/**
 * Wraps a {@link Query} in a {@link Single} that emits one query result as a List.
 *
 * <p>The query is subscribed in "single" mode when the returned Single is subscribed to.
 * Disposing the Single cancels the underlying {@link DataSubscription}.
 *
 * @param query the query whose result should be emitted
 * @param <T>   the element type of the query result
 * @return a Single emitting the result list
 */
public static <T> Single<List<T>> single(final Query<T> query) {
    final SingleOnSubscribe<List<T>> source = new SingleOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
            // Forwards the result only while the downstream is still interested.
            final DataObserver<List<T>> resultObserver = new DataObserver<List<T>>() {
                @Override
                public void onData(List<T> data) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onSuccess(data);
                }
            };

            final DataSubscription subscription =
                    query.subscribe().single().observer(resultObserver);

            // Tie the lifetime of the data subscription to the lifetime of the Single.
            emitter.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    subscription.cancel();
                }
            });
        }
    };
    return Single.create(source);
}
/**
 * Demo: creates a Single whose emitter is signalled with "1", "2", "3" and "4" in turn,
 * and logs every value that reaches the success consumer.
 *
 * <p>NOTE(review): a Single is expected to deliver only one terminal signal, so presumably
 * only the first value is logged - confirm against the RxJava documentation.
 */
private void single() {
    SingleOnSubscribe<String> source = new SingleOnSubscribe<String>() {
        @Override
        public void subscribe(SingleEmitter<String> emitter) throws Exception {
            // Signals success four times with the values "1" through "4".
            for (int value = 1; value <= 4; value++) {
                emitter.onSuccess(String.valueOf(value));
            }
        }
    };

    Consumer<String> onValue = new Consumer<String>() {
        @Override
        public void accept(String value) throws Exception {
            Log.e(TAG, "accept: success " + value);
        }
    };

    // Errors are intentionally ignored in this demo.
    Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable ignored) throws Exception {
        }
    };

    Single.create(source).subscribe(onValue, onFailure);
}
/**
 * Creates a session around a running process.
 *
 * <p>Two cached reactive handles are prepared here:
 * <ul>
 *   <li>{@code destroy} - disposes the given disposable on the io scheduler and completes;</li>
 *   <li>{@code waitFor} - blocks on {@link Process#waitFor()} on the io scheduler and emits
 *       the exit code.</li>
 * </ul>
 * Both log through Timber when {@code RXSDebug.isDebug()} is enabled.
 *
 * @param process    the process this session represents
 * @param disposable the resource that is disposed when the session is destroyed
 */
public Session(Process process, Disposable disposable) {
    this.process = process;
    this.disposable = disposable;

    this.destroy = Completable
            .create(emitter -> {
                disposable.dispose();
                emitter.onComplete();
            })
            .subscribeOn(Schedulers.io())
            .doOnComplete(() -> {
                if (RXSDebug.isDebug()) {
                    Timber.tag(TAG).v("destroy():doOnComplete");
                }
            })
            .doOnError(error -> {
                if (RXSDebug.isDebug()) {
                    Timber.tag(TAG).v(error, "destroy():doOnError");
                }
            })
            .cache();

    this.waitFor = Single
            .create((SingleOnSubscribe<Integer>) emitter -> {
                if (RXSDebug.isDebug()) {
                    Timber.tag(TAG).d("Waiting for %s to exit.", process);
                }
                // Blocks the subscribing (io) thread until the process has terminated.
                int exitCode = process.waitFor();
                if (RXSDebug.isDebug()) {
                    Timber.tag(TAG).d("Exitcode: %d, Process: %s", exitCode, process);
                }
                emitter.onSuccess(exitCode);
            })
            .subscribeOn(Schedulers.io())
            .doOnSuccess(code -> {
                if (RXSDebug.isDebug()) {
                    Timber.tag(TAG).v("waitFor():doOnSuccess %s", code);
                }
            })
            .doOnError(error -> {
                if (RXSDebug.isDebug()) {
                    Timber.tag(TAG).v(error, "waitFor():doOnError");
                }
            })
            .cache();
}
/**
 * Submits a command for processing.
 *
 * <p>On subscription the command is wrapped in a {@code QueueCmd}. While holding the
 * processor lock it is either appended to the command queue or, if the processor is
 * already dead, answered immediately with {@code Cmd.ExitCode.SHELL_DIED}.
 *
 * @param cmd the command to run
 * @return a Single emitting the command's result
 */
public Single<Cmd.Result> submit(Cmd cmd) {
    return Single
            .create((SingleOnSubscribe<Cmd.Result>) emitter -> {
                QueueCmd queued = new QueueCmd(cmd, emitter);
                synchronized (CmdProcessor.this) {
                    if (!dead) {
                        if (RXSDebug.isDebug()) {
                            Timber.tag(TAG).d("Submitted: %s", cmd);
                        }
                        cmdQueue.add(queued);
                    } else {
                        if (RXSDebug.isDebug()) {
                            Timber.tag(TAG).w("Processor wasn't running: %s", cmd);
                        }
                        // Nothing will ever pick this command up, so finish it right here.
                        queued.exitCode(Cmd.ExitCode.SHELL_DIED);
                        queued.emit();
                    }
                }
            })
            .doOnSuccess(result -> {
                if (!RXSDebug.isDebug()) {
                    return;
                }
                // Results that carry error output are logged at a higher priority.
                boolean hasErrors = result.getErrors() != null && result.getErrors().size() > 0;
                int priority = hasErrors ? Log.WARN : Log.INFO;
                Timber.tag(TAG).log(priority, "Processed: %s", result);
            });
}
/**
 * Saves a bitmap to external storage on the io scheduler and reports the outcome to the
 * view on the main thread.
 *
 * <p>The quality passed to the save call is 100 when the {@code Constants.SP_HD_SCREENSHOT}
 * preference is set, otherwise 90. The resulting disposable is added to {@code mComposite}.
 *
 * @param bitmap the bitmap to persist
 * @param title  the title handed to {@code IOUtils.saveBitmapToExternalStorage}
 */
public void saveSnapshotToStorage(final Bitmap bitmap, final String title) {
    final int quality;
    if (SPUtils.getBoolean(Constants.SP_HD_SCREENSHOT)) {
        quality = 100;
    } else {
        quality = 90;
    }

    Single<String> saveTask = Single.create(new SingleOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull SingleEmitter<String> emitter) throws Exception {
            emitter.onSuccess(IOUtils.saveBitmapToExternalStorage(bitmap, title, quality));
        }
    });

    Consumer<String> onSaved = new Consumer<String>() {
        @Override
        public void accept(String address) throws Exception {
            mBaseView.onSnapshotSavedSucceeded(address);
        }
    };

    Consumer<Throwable> onFailed = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable error) throws Exception {
            mBaseView.onSnapshotSavedFailed(error);
        }
    };

    mComposite.add(saveTask
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onSaved, onFailed));
}
/**
 * Loads the list for the given filter type.
 *
 * <p>On subscription a cursor is obtained via {@code getCursorByType}; a {@code null}
 * cursor is reported as an error. Otherwise the cursor is converted with {@code fillMap}
 * and the map's values are emitted as a new list.
 *
 * @param mFilterType the filter type used to select and map the rows
 * @param orderBy     the ordering passed to {@code getCursorByType}
 * @param limit       not used by this method body
 * @param skip        not used by this method body
 * @return a Single emitting the mapped items
 */
public Single<List<CList>> getList(final int mFilterType, final String orderBy, final String limit, final String skip) {
    final SingleOnSubscribe<List<CList>> source = new SingleOnSubscribe<List<CList>>() {
        @Override
        public void subscribe(SingleEmitter<List<CList>> emitter) throws Exception {
            Cursor cursor = getCursorByType(mFilterType, orderBy);
            if (cursor != null) {
                String message = "|Start Filter Type = " + mFilterType + " "
                        + new Date(System.currentTimeMillis()).toString()
                        + "\n Cursor Count" + cursor.getCount();
                Log.d(TAG, message);
                emitter.onSuccess(new ArrayList<>(fillMap(cursor, mFilterType).values()));
            } else {
                emitter.onError(new Exception("Cursor is null"));
            }
        }
    };
    return Single.create(source);
}
/************************************************************/ public Single<Void> deleteCollBookInRx(CollBookBean bean) { return Single.create(new SingleOnSubscribe<Void>() { @Override public void subscribe(SingleEmitter<Void> e) throws Exception { //查看文本中是否存在删除的数据 deleteBook(bean.get_id()); //删除任务 deleteDownloadTask(bean.get_id()); //删除目录 deleteBookChapter(bean.get_id()); //删除CollBook mCollBookDao.delete(bean); e.onSuccess(new Void()); } }); }
/**
 * Adapts this call to a {@link Single}.
 *
 * <p>Each subscription invokes {@code enqueue}; the response callback is forwarded as the
 * success value and the error callback as the error signal.
 *
 * @return a Single emitting the response data
 */
public Single<T> toSingle() {
    final SingleOnSubscribe<T> source = new SingleOnSubscribe<T>() {
        @Override
        public void subscribe(final @NonNull SingleEmitter<T> emitter) throws Exception {
            Callback<T> onResponse = new Callback<T>() {
                @Override
                public void onResponse(@NonNull T data) {
                    emitter.onSuccess(data);
                }
            };
            ErrorCallback onFailure = new ErrorCallback() {
                @Override
                public void onError(@NonNull Throwable error) {
                    emitter.onError(error);
                }
            };
            enqueue(onResponse, onFailure);
        }
    };
    return Single.create(source);
}
/**
 * Renames the file behind a recording and updates the stored record.
 *
 * <p>The work runs on the io scheduler; the result is observed on the Android main thread.
 * An error is emitted when a non-directory file with the target name already exists or
 * when the rename itself fails.
 *
 * @param recordingItem   the recording to rename; its name and path are updated on success
 * @param adapterPosition the value emitted on success
 * @param name            the new file name, resolved inside the "/SoundRecorder/" directory
 * @return a Single emitting {@code adapterPosition} on success
 */
private Single<Integer> rename(RecordingItem recordingItem, int adapterPosition, String name) {
    return Single
            .create((SingleOnSubscribe<Integer>) emitter -> {
                String targetPath = Environment.getExternalStorageDirectory().getAbsolutePath()
                        + "/SoundRecorder/" + name;
                File target = new File(targetPath);

                // Refuse to overwrite an existing regular file.
                if (target.exists() && !target.isDirectory()) {
                    emitter.onError(new Exception("File with same name already exists"));
                    return;
                }

                File current = new File(recordingItem.getFilePath());
                if (!current.renameTo(target)) {
                    emitter.onError(new Throwable("Cannot Rename file. Please try again"));
                    return;
                }

                recordingItem.setName(name);
                recordingItem.setFilePath(target.getPath());
                recordItemDataSource.updateRecordItem(recordingItem);
                emitter.onSuccess(adapterPosition);
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Bridges a callback-style action to a {@link Single}.
 *
 * <p>One {@code RxActionDelegate} is created per call of this method. On subscription it is
 * pointed at the emitter and handed to {@code subscribe}; when the Single is disposed the
 * delegate is cancelled.
 *
 * @param subscribe the action that starts the work and reports through the delegate
 * @param <T>       the result type
 * @return a Single emitting the action's result or its error
 */
public static <T> Single<T> single(final OnSubscribeAction<T> subscribe) {
    final RxActionDelegate<T> delegate = new RxActionDelegate<>();

    final SingleOnSubscribe<T> source = new SingleOnSubscribe<T>() {
        @Override
        public void subscribe(@NonNull final SingleEmitter<T> emitter) throws Exception {
            ActionDelegate<T> forwarder = new ActionDelegate<T>() {
                @Override
                public void onSuccess(T result) {
                    emitter.onSuccess(result);
                }

                @Override
                public void onError(Exception e) {
                    emitter.onError(e);
                }
            };
            delegate.setDelegate(forwarder);
            subscribe.subscribe(delegate);
        }
    };

    final Action cancelDelegate = new Action() {
        @Override
        public void run() throws Exception {
            delegate.cancel();
        }
    };

    return Single.<T>create(source).doOnDispose(cancelDelegate);
}
/**
 * Bridges a callback-style action to a {@link Single} that never signals an error from the
 * action: both outcomes are wrapped in a {@code Result} and emitted as the success value.
 *
 * <p>When the Single is disposed the shared {@code RxActionDelegate} is cancelled.
 *
 * @param subscribe the action that starts the work and reports through the delegate
 * @param <T>       the result type
 * @return a Single emitting a {@code Result} holding either the value or the exception
 */
public static <T> Single<Result<T>> singleWrapped(final OnSubscribeAction<T> subscribe) {
    final RxActionDelegate<T> delegate = new RxActionDelegate<>();

    final SingleOnSubscribe<Result<T>> source = new SingleOnSubscribe<Result<T>>() {
        @Override
        public void subscribe(@NonNull final SingleEmitter<Result<T>> emitter) throws Exception {
            ActionDelegate<T> forwarder = new ActionDelegate<T>() {
                @Override
                public void onSuccess(T result) {
                    emitter.onSuccess(new Result<T>(result));
                }

                @Override
                public void onError(Exception e) {
                    // Failures travel downstream as a regular value, not as an error signal.
                    emitter.onSuccess(new Result<T>(e));
                }
            };
            delegate.setDelegate(forwarder);
            subscribe.subscribe(delegate);
        }
    };

    final Action cancelDelegate = new Action() {
        @Override
        public void run() throws Exception {
            delegate.cancel();
        }
    };

    return Single.<Result<T>>create(source).doOnDispose(cancelDelegate);
}
/**
 * Turns an operation that reports through a {@link MaybeConsumer} into an RxJava
 * {@link Single} that emits the value handed to that consumer.
 *
 * Example:
 * <pre>
 *     // log the name of the experiment with a given id
 *     DataController dc = getDataController();
 *     MaybeConsumers.buildSingle(mc -> dc.getExperimentById(id, mc))
 *                   .subscribe(experiment -> log("Name: " + experiment.getName()));
 * </pre>
 *
 * @param c   the operation to run on subscription; it receives the consumer to report to
 * @param <T> the value type
 * @return a Single emitting the reported value, or the reported exception as an error
 */
public static <T> Single<T> buildSingle(io.reactivex.functions.Consumer<MaybeConsumer<T>> c) {
    return Single.create(emitter -> {
        MaybeConsumer<T> bridge = new MaybeConsumer<T>() {
            @Override
            public void success(T value) {
                emitter.onSuccess(value);
            }

            @Override
            public void fail(Exception e) {
                emitter.onError(e);
            }
        };
        c.accept(bridge);
    });
}
@Test public void verifyCancellation() { final AtomicInteger i = new AtomicInteger(); //noinspection unchecked because Java Single<Integer> source = Single.create(new SingleOnSubscribe<Integer>() { @Override public void subscribe(SingleEmitter<Integer> e) { e.setCancellable(new Cancellable() { @Override public void cancel() { i.incrementAndGet(); } }); } }); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(); assertThat(i.get()).isEqualTo(0); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onSuccess(0); // Verify cancellation was called assertThat(i.get()).isEqualTo(1); assertThat(lifecycle.hasObservers()).isFalse(); }
/**
 * Connects to the billing service.
 *
 * <p>If no {@code serviceConnection} exists yet, one is created and {@code bindService()} is
 * called; the Single then succeeds from {@code onServiceConnected} or fails from
 * {@code onServiceDisconnected}. If a connection object already exists, the Single succeeds
 * immediately.
 *
 * <p>The service connection outlives the subscription that created it, so a disconnect can
 * arrive after the Single has succeeded or been disposed. The error is only forwarded while
 * the emitter is still active; signalling a disposed emitter would otherwise be reported as
 * an undeliverable error through the global RxJava error handler.
 *
 * @return a Single emitting {@code Ignore.Get} once the service is considered connected
 */
private Single<Ignore> connectService() {
    return Single.create(new SingleOnSubscribe<Ignore>() {
        @Override
        public void subscribe(final SingleEmitter<Ignore> emitter) throws Exception {
            if (serviceConnection != null) {
                // A connection object already exists, so report success right away.
                emitter.onSuccess(Ignore.Get);
                return;
            }

            serviceConnection = new ServiceConnection() {
                @Override
                public void onServiceDisconnected(ComponentName name) {
                    rxBillingServiceLogger.log(getTargetClassName(), "Service disconnected");
                    appBillingService = null;
                    // Only signal the failure if somebody is still waiting for the connection.
                    if (!emitter.isDisposed()) {
                        emitter.onError(
                                new RxBillingServiceException(RxBillingServiceError.SERVICE_DISCONNECTED));
                    }
                }

                @Override
                public void onServiceConnected(ComponentName name, final IBinder service) {
                    rxBillingServiceLogger.log(getTargetClassName(), "Service connected");
                    appBillingService = IInAppBillingService.Stub.asInterface(service);
                    emitter.onSuccess(Ignore.Get);
                }
            };
            bindService();
        }
    });
}
/**
 * Reads the value of a query once.
 *
 * <p>On subscription a listener built by {@code listener(emit)} is registered with
 * {@code addListenerForSingleValueEvent}; disposing the Single removes that listener again.
 *
 * @param query the query to read
 * @return a Single driven by the listener created for its emitter
 */
@NonNull
@CheckReturnValue
public static Single<DataSnapshot> single(@NonNull final Query query) {
    final SingleOnSubscribe<DataSnapshot> source = new SingleOnSubscribe<DataSnapshot>() {
        @Override
        public void subscribe(@NonNull final SingleEmitter<DataSnapshot> emit) throws Exception {
            final ValueEventListener valueListener = listener(emit);

            // Register the clean-up before the listener is attached.
            final Cancellable detach = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    query.removeEventListener(valueListener);
                }
            };
            emit.setCancellable(detach);

            query.addListenerForSingleValueEvent(valueListener);
        }
    };
    return Single.create(source);
}
/**
 * Reads the stored list under the read lock.
 *
 * @return a Single emitting the stored list, or an empty list when the file does not exist
 *         or the converter returns {@code null}
 */
@Override
@NonNull
public Single<List<T>> get() {
    final SingleOnSubscribe<List<T>> source = new SingleOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
            final ThrowingRunnable readTask = new ThrowingRunnable() {
                @Override
                public void run() throws Exception {
                    List<T> stored = null;
                    if (file.exists()) {
                        stored = converter.read(file, type);
                    }
                    // A missing file and a null read both map to an empty list.
                    if (stored == null) {
                        stored = Collections.emptyList();
                    }
                    emitter.onSuccess(stored);
                }
            };
            runInReadLock(readWriteLock, readTask);
        }
    };
    return Single.create(source);
}
/**
 * Replaces the stored list under the write lock.
 *
 * <p>The file is created if it does not exist. After writing, the list is emitted to the
 * subscriber and then published on {@code updateSubject}.
 *
 * @param list the list to store; checked with {@code assertNotNull} when this method is called
 * @return a Single emitting the list that was written
 */
@Override
@NonNull
public Single<List<T>> observePut(@NonNull final List<T> list) {
    assertNotNull(list, "list");

    final SingleOnSubscribe<List<T>> source = new SingleOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
            final ThrowingRunnable writeTask = new ThrowingRunnable() {
                @Override
                public void run() throws Exception {
                    boolean fileReady = file.exists() || file.createNewFile();
                    if (!fileReady) {
                        throw new IOException("Could not create file for store.");
                    }

                    converterWrite(list, converter, type, file);
                    emitter.onSuccess(list);
                    updateSubject.onNext(list);
                }
            };
            runInWriteLock(readWriteLock, writeTask);
        }
    };
    return Single.create(source);
}
/**
 * Clears the store under the write lock by deleting its file, if there is one.
 *
 * <p>An empty list is emitted to the subscriber and then published on
 * {@code updateSubject}.
 *
 * @return a Single emitting an empty list once the store has been cleared
 */
@Override
@NonNull
public Single<List<T>> observeClear() {
    final SingleOnSubscribe<List<T>> source = new SingleOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
            final ThrowingRunnable clearTask = new ThrowingRunnable() {
                @Override
                public void run() throws Exception {
                    // A file that does not exist counts as already cleared.
                    boolean cleared = !file.exists() || file.delete();
                    if (!cleared) {
                        throw new IOException("Clear operation on store failed.");
                    }

                    List<T> empty = Collections.emptyList();
                    emitter.onSuccess(empty);
                    updateSubject.onNext(empty);
                }
            };
            runInWriteLock(readWriteLock, clearTask);
        }
    };
    return Single.create(source);
}
/**
 * Appends a value to the stored list under the write lock.
 *
 * <p>The file is created if it does not exist. The current content is read (a {@code null}
 * read counts as empty), copied into a new list with the value appended, and written back.
 * The new list is emitted to the subscriber and then published on {@code updateSubject}.
 *
 * @param value the value to append; checked with {@code assertNotNull} when this method is
 *              called
 * @return a Single emitting the list after the addition
 */
@Override
@NonNull
public Single<List<T>> observeAdd(@NonNull final T value) {
    assertNotNull(value, "value");

    final SingleOnSubscribe<List<T>> source = new SingleOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
            final ThrowingRunnable appendTask = new ThrowingRunnable() {
                @Override
                public void run() throws Exception {
                    boolean fileReady = file.exists() || file.createNewFile();
                    if (!fileReady) {
                        throw new IOException("Could not create file for store.");
                    }

                    List<T> current = converter.read(file, type);
                    if (current == null) {
                        current = Collections.emptyList();
                    }

                    // Build a fresh list so the list returned by the converter is not mutated.
                    List<T> updated = new ArrayList<T>(current.size() + 1);
                    updated.addAll(current);
                    updated.add(value);

                    converterWrite(updated, converter, type, file);
                    emitter.onSuccess(updated);
                    updateSubject.onNext(updated);
                }
            };
            runInWriteLock(readWriteLock, appendTask);
        }
    };
    return Single.create(source);
}
/**
 * Removes the element at the given index from the stored list under the write lock.
 *
 * <p>The current content is read (a {@code null} read counts as empty), copied, modified and
 * written back. The modified list is emitted to the subscriber and then published on
 * {@code updateSubject}.
 *
 * @param position index of the element to remove
 * @return a Single emitting the list after the removal
 */
@Override
@NonNull
public Single<List<T>> observeRemove(final int position) {
    final SingleOnSubscribe<List<T>> source = new SingleOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
            final ThrowingRunnable removeTask = new ThrowingRunnable() {
                @Override
                public void run() throws Exception {
                    List<T> current = converter.read(file, type);
                    if (current == null) {
                        current = Collections.emptyList();
                    }

                    // Work on a copy so the list returned by the converter is not mutated.
                    List<T> updated = new ArrayList<T>(current);
                    updated.remove(position);

                    converterWrite(updated, converter, type, file);
                    emitter.onSuccess(updated);
                    updateSubject.onNext(updated);
                }
            };
            runInWriteLock(readWriteLock, removeTask);
        }
    };
    return Single.create(source);
}
/**
 * Stores a single value under the write lock.
 *
 * <p>The file is created if it does not exist. After writing, the value is emitted to the
 * subscriber and then published on {@code updateSubject} wrapped in a {@code ValueUpdate}.
 *
 * @param value the value to store; checked with {@code assertNotNull} when this method is
 *              called
 * @return a Single emitting the value that was written
 */
@Override
@NonNull
public Single<T> observePut(@NonNull final T value) {
    assertNotNull(value, "value");

    final SingleOnSubscribe<T> source = new SingleOnSubscribe<T>() {
        @Override
        public void subscribe(final SingleEmitter<T> emitter) throws Exception {
            final ThrowingRunnable writeTask = new ThrowingRunnable() {
                @Override
                public void run() throws Exception {
                    boolean fileReady = file.exists() || file.createNewFile();
                    if (!fileReady) {
                        throw new IOException("Could not create file for store.");
                    }

                    converterWrite(value, converter, type, file);
                    emitter.onSuccess(value);
                    updateSubject.onNext(new ValueUpdate<T>(value));
                }
            };
            runInWriteLock(readWriteLock, writeTask);
        }
    };
    return Single.create(source);
}
/**
 * Handles a click on the journey search button.
 *
 * <p>Nothing happens unless {@code isDataValid()} holds. Otherwise the view is notified, the
 * selected stations are logged, and {@code saveRecentJourney()} is run on the io scheduler.
 * The result of the save is not consumed.
 *
 * @param departureStationName not used by this method body
 * @param arrivalStationName   not used by this method body
 */
@Override
public void onJourneySearchButtonClick(String departureStationName, String arrivalStationName) {
    if (!isDataValid()) {
        return;
    }

    view.onValidJourneySearchParameters();
    Log.d("Searching for: " + departureStation.toString(), arrivalStation.toString());

    // Fire-and-forget: persist the journey off the main thread.
    Single
            .create((SingleOnSubscribe<Boolean>) emitter -> {
                emitter.onSuccess(saveRecentJourney());
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}
/**
 * Fetches a random submission when the returned Single is subscribed to.
 *
 * @param redditClient the client used to fetch the submission
 * @return a Single emitting the submission, or the exception thrown while fetching it
 */
public static Single<Submission> getSubmission(final RedditClient redditClient) {
    final SingleOnSubscribe<Submission> source = new SingleOnSubscribe<Submission>() {
        @Override
        public void subscribe(SingleEmitter<Submission> emitter) throws Exception {
            try {
                emitter.onSuccess(redditClient.getRandomSubmission());
            } catch (Exception failure) {
                // Hand the failure to the subscriber as an error signal.
                emitter.onError(failure);
            }
        }
    };
    return Single.create(source);
}
protected @OnClick(R.id.tv_save_image) void saveImage() { BitmapPool pool = BitmapPool.getPool(); final String fileName = MD5.generate(mUrl); BitmapWrapper bitmapWrapper = pool.get(fileName, true, true); final Bitmap bitmap = bitmapWrapper.getBitmap(); mDisposable = Single.create(new SingleOnSubscribe<String>() { @Override public void subscribe(@NonNull SingleEmitter<String> e) throws Exception { String address = IOUtils.saveBitmapToExternalStorage(bitmap, fileName, 100); e.onSuccess(address); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { ToastUtils.showShortToast(getString(R.string.image_saved) + IOUtils.PIC_DIR); MediaScannerConnection.scanFile(Aequorea.getApp() .getApplicationContext(), new String[]{Environment.getExternalStorageDirectory() .getPath().concat(IOUtils.PIC_DIR)}, null, new MediaScannerConnection.OnScanCompletedListener() { @Override public void onScanCompleted(final String path, Uri uri) { // do nothing. } }); dismiss(); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { ToastUtils.showShortToast(getString(R.string.save_failed) + throwable.getMessage()); dismiss(); } }); }
/**
 * Checks connectivity to the given host when the returned Single is subscribed to.
 *
 * <p>The arguments are validated with {@code checkGeneralPreconditions} when this method is
 * called, before the Single is created.
 *
 * @param host         the host passed to {@code isConnected}
 * @param port         the port passed to {@code isConnected}
 * @param timeoutInMs  the timeout passed to {@code isConnected}
 * @param errorHandler the error handler passed to {@code isConnected}
 * @return a Single emitting the result of {@code isConnected}
 */
@Override
public Single<Boolean> checkInternetConnectivity(final String host, final int port,
        final int timeoutInMs, final ErrorHandler errorHandler) {
    checkGeneralPreconditions(host, port, timeoutInMs, errorHandler);

    final SingleOnSubscribe<Boolean> connectivityCheck = new SingleOnSubscribe<Boolean>() {
        @Override
        public void subscribe(@NonNull SingleEmitter<Boolean> subscriber) throws Exception {
            subscriber.onSuccess(isConnected(host, port, timeoutInMs, errorHandler));
        }
    };
    return Single.create(connectivityCheck);
}
/**
 * Runs a transaction on the data at the given location that adds {@code transactionValue}
 * to the numeric value stored there. If no value is stored yet, the location is set to
 * {@code transactionValue}.
 *
 * <p>The stored value is read as a {@code Long} so that it matches the {@code long}
 * increment; reading it as an {@code Integer} would fail once the value no longer fits into
 * 32 bits.
 *
 * @param ref              reference represents a particular location in your database.
 * @param fireLocalEvents  boolean which allow to receive calls of your transaction in your
 *                         local device.
 * @param transactionValue value of the transaction.
 * @return a {@link Single} which emits the final {@link DataSnapshot} value if the
 *         transaction success, or a {@code RxFirebaseDataException} if it completes with a
 *         database error.
 */
@NonNull
public static Single<DataSnapshot> runTransaction(@NonNull final DatabaseReference ref,
                                                  @NonNull final boolean fireLocalEvents,
                                                  @NonNull final long transactionValue) {
    return Single.create(new SingleOnSubscribe<DataSnapshot>() {
        @Override
        public void subscribe(final SingleEmitter<DataSnapshot> emitter) throws Exception {
            ref.runTransaction(new Transaction.Handler() {
                @Override
                public Transaction.Result doTransaction(MutableData mutableData) {
                    Long currentValue = mutableData.getValue(Long.class);
                    if (currentValue == null) {
                        // Nothing stored yet: start from the increment itself.
                        mutableData.setValue(transactionValue);
                    } else {
                        mutableData.setValue(currentValue + transactionValue);
                    }
                    return Transaction.success(mutableData);
                }

                @Override
                public void onComplete(DatabaseError databaseError, boolean b, DataSnapshot dataSnapshot) {
                    if (databaseError != null) {
                        emitter.onError(new RxFirebaseDataException(databaseError));
                    } else {
                        emitter.onSuccess(dataSnapshot);
                    }
                }
            }, fireLocalEvents);
        }
    });
}
public static Single<List<File>> getSDTxtFile(){ //外部存储卡路径 String rootPath = Environment.getExternalStorageDirectory().getPath(); return Single.create(new SingleOnSubscribe<List<File>>() { @Override public void subscribe(SingleEmitter<List<File>> e) throws Exception { List<File> files = getTxtFiles(rootPath,0); e.onSuccess(files); } }); }
/**
 * Runs the given query when the returned Single is subscribed to.
 *
 * @param builder the query to execute
 * @param <T>     the entity type
 * @return a Single emitting the query result, or a new empty list if the result is
 *         {@code null}
 */
private <T> Single<List<T>> queryToRx(QueryBuilder<T> builder) {
    return Single.create(emitter -> {
        List<T> rows = builder.list();
        // Substitute an empty list for a null result.
        emitter.onSuccess(rows != null ? rows : new ArrayList<T>(1));
    });
}
/**
 * Queries the chapters of a book when the returned Single is subscribed to.
 *
 * @param bookId the id matched against {@code BookChapterBeanDao.Properties.BookId}
 * @return a Single emitting the matching chapters
 */
public Single<List<BookChapterBean>> getBookChaptersInRx(String bookId) {
    return Single.create(emitter -> {
        List<BookChapterBean> chapters = mSession
                .getBookChapterBeanDao()
                .queryBuilder()
                .where(BookChapterBeanDao.Properties.BookId.eq(bookId))
                .list();
        emitter.onSuccess(chapters);
    });
}
/**
 * Deletes the file behind a recording and removes the recording from the data source and
 * from {@code recordingItems}.
 *
 * <p>The work runs on the io scheduler; the result is observed on the Android main thread.
 *
 * @param recordingItem the recording whose file should be deleted
 * @param position      index removed from {@code recordingItems} and emitted on success
 * @return a Single emitting {@code position}, or an error if the file could not be deleted
 */
private Single<Integer> removeFile(RecordingItem recordingItem, int position) {
    return Single
            .create((SingleOnSubscribe<Integer>) emitter -> {
                File target = new File(recordingItem.getFilePath());
                if (!target.delete()) {
                    emitter.onError(new Exception("File deletion failed"));
                    return;
                }
                // Only drop the bookkeeping once the file itself is gone.
                recordItemDataSource.deleteRecordItem(recordingItem);
                recordingItems.remove(position);
                emitter.onSuccess(position);
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Executes a Realm transaction on the write scheduler, fire-and-forget.
 *
 * <p>A default Realm instance is opened for the duration of the transaction and closed
 * afterwards. The emitter is never signalled by this code, so the Single itself does not
 * emit a value.
 *
 * @param transaction the transaction to execute
 */
private void doWrite(Realm.Transaction transaction) {
    Single
            .create((SingleOnSubscribe<Void>) emitter -> {
                // try-with-resources closes the Realm instance once the transaction is done.
                try (Realm realm = Realm.getDefaultInstance()) {
                    realm.executeTransaction(transaction);
                }
            })
            .subscribeOn(writeScheduler.getScheduler())
            .subscribe();
}
/**
 * Reads the airplane mode state and exposes it as a Single.
 *
 * <p>The context is checked for {@code null} when this method is called; the state itself is
 * read when the returned Single is subscribed to.
 *
 * @param context of the Application or Activity
 * @return RxJava2 Single with Boolean value indicating state of the airplane mode
 */
public Single<Boolean> get(final Context context) {
    checkContextIsNotNull(context);

    final SingleOnSubscribe<Boolean> stateReader = new SingleOnSubscribe<Boolean>() {
        @Override
        public void subscribe(@NonNull SingleEmitter<Boolean> subscriber) throws Exception {
            subscriber.onSuccess(isAirplaneModeOn(context));
        }
    };
    return Single.create(stateReader);
}
/**
 * Adapts a {@link Task} to a {@link Single}.
 *
 * <p>On subscription, a completion listener built by {@code listener(emit)} is added to the
 * task.
 *
 * @param task the task to observe
 * @param <R>  the result type of the task
 * @return a Single driven by the listener created for its emitter
 */
@CheckReturnValue
@NonNull
public static <R> Single<R> single(@NonNull final Task<R> task) {
    final SingleOnSubscribe<R> source = new SingleOnSubscribe<R>() {
        @Override
        public void subscribe(@NonNull final SingleEmitter<R> emit) throws Exception {
            task.addOnCompleteListener(listener(emit));
        }
    };
    return Single.create(source);
}
/**
 * Search.
 *
 * <p>First computes the request signature (SED and SIG) from the request parameters, then
 * calls the search endpoint with it. The {@code retry()} transformer is applied to the
 * whole chain.
 *
 * @param recherche the search text; spaces are replaced by "+" for the signature
 * @param filter    the filter values
 * @param count     the value sent as the count parameter
 * @param page      the value sent as the page parameter
 * @return a Single emitting the service response
 */
public Single<AllocineResponse> search(final String recherche, final List<String> filter, final int count, final int page) {
    // Step 1: build the (sed, sig) pair that signs the request.
    final Single<Pair<String, String>> signature = Single.create(new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(false,
                    AllocineService.Q, recherche.replace(" ", "+"),
                    AllocineService.FILTER, filter,
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.PAGE, String.valueOf(page)
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(params, sed)));
        }
    });

    // Step 2: perform the signed call.
    final Function<Pair<String, String>, SingleSource<? extends AllocineResponse>> signedCall =
            new Function<Pair<String, String>, SingleSource<? extends AllocineResponse>>() {
                @Override
                public SingleSource<? extends AllocineResponse> apply(Pair<String, String> sedAndSig) throws Exception {
                    return allocineService.search(recherche, ServiceSecurity.applatir(filter), count, page,
                            sedAndSig.first, sedAndSig.second);
                }
            };

    return signature
            .flatMap(signedCall)
            .compose(this.<AllocineResponse>retry());
}
/**
 * Search, using the {@code searchSmall} endpoint.
 *
 * <p>First computes the request signature (SED and SIG) from the request parameters, then
 * calls the endpoint with it. The {@code retry()} transformer is applied to the whole chain.
 *
 * @param recherche the search text; spaces are replaced by "+" for the signature
 * @param filter    the filter values
 * @param count     the value sent as the count parameter
 * @param page      the value sent as the page parameter
 * @return a Single emitting the service response
 */
public Single<AllocineResponseSmall> searchSmall(final String recherche, final List<String> filter, final int count, final int page) {
    // Step 1: build the (sed, sig) pair that signs the request.
    final Single<Pair<String, String>> signature = Single.create(new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(false,
                    AllocineService.Q, recherche.replace(" ", "+"),
                    AllocineService.FILTER, filter,
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.PAGE, String.valueOf(page)
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(params, sed)));
        }
    });

    // Step 2: perform the signed call.
    final Function<Pair<String, String>, SingleSource<AllocineResponseSmall>> signedCall =
            new Function<Pair<String, String>, SingleSource<AllocineResponseSmall>>() {
                @Override
                public SingleSource<AllocineResponseSmall> apply(Pair<String, String> sedAndSig) throws Exception {
                    return allocineService.searchSmall(recherche, ServiceSecurity.applatir(filter), count, page,
                            sedAndSig.first, sedAndSig.second);
                }
            };

    return signature
            .flatMap(signedCall)
            .compose(this.<AllocineResponseSmall>retry());
}
/**
 * Information about a movie.
 *
 * <p>First computes the request signature (SED and SIG) from the request parameters, then
 * calls the movie endpoint with it and extracts the movie from the response. The
 * {@code retry()} transformer is applied to the whole chain. The filter is always
 * {@code FILTER_MOVIE}.
 *
 * @param idFilm  the movie code
 * @param profile the profile whose value is sent with the request
 * @return a Single emitting the movie
 */
public Single<Movie> movie(final String idFilm, final Profile profile) {
    final String filter = FILTER_MOVIE;

    // Step 1: build the (sed, sig) pair that signs the request.
    final Single<Pair<String, String>> signature = Single.create(new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(false,
                    AllocineService.CODE, idFilm,
                    AllocineService.PROFILE, profile.getValue(),
                    AllocineService.FILTER, filter
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(params, sed)));
        }
    });

    // Pulls the movie out of the generic response envelope.
    final Function<AllocineResponse, Movie> extractMovie = new Function<AllocineResponse, Movie>() {
        @Override
        public Movie apply(AllocineResponse response) throws Exception {
            return response.getMovie();
        }
    };

    // Step 2: perform the signed call.
    final Function<Pair<String, String>, SingleSource<? extends Movie>> signedCall =
            new Function<Pair<String, String>, SingleSource<? extends Movie>>() {
                @Override
                public SingleSource<? extends Movie> apply(Pair<String, String> sedAndSig) throws Exception {
                    return allocineService
                            .movie(idFilm, profile.getValue(), filter, sedAndSig.first, sedAndSig.second)
                            .map(extractMovie);
                }
            };

    return signature
            .flatMap(signedCall)
            .compose(this.<Movie>retry());
}
/**
 * Information about a theater.
 *
 * <p>First computes the request signature (SED and SIG) from the request parameters, then
 * calls the theater endpoint with it and extracts the theater from the response. The
 * {@code retry()} transformer is applied to the whole chain.
 *
 * @param idCinema the theater code
 * @param profile  the profile value sent with the request
 * @param filter   the filter value sent with the request
 * @return a Single emitting the theater
 */
public Single<Theater> theater(final String idCinema, final String profile, final String filter) {
    // Step 1: build the (sed, sig) pair that signs the request.
    final Single<Pair<String, String>> signature = Single.create(new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(false,
                    AllocineService.CODE, idCinema,
                    AllocineService.PROFILE, profile,
                    AllocineService.FILTER, filter
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(params, sed)));
        }
    });

    // Pulls the theater out of the generic response envelope.
    final Function<AllocineResponse, Theater> extractTheater = new Function<AllocineResponse, Theater>() {
        @Override
        public Theater apply(AllocineResponse response) throws Exception {
            return response.getTheater();
        }
    };

    // Step 2: perform the signed call.
    final Function<Pair<String, String>, SingleSource<? extends Theater>> signedCall =
            new Function<Pair<String, String>, SingleSource<? extends Theater>>() {
                @Override
                public SingleSource<? extends Theater> apply(Pair<String, String> sedAndSig) throws Exception {
                    return allocineService
                            .theater(idCinema, profile, filter, sedAndSig.first, sedAndSig.second)
                            .map(extractTheater);
                }
            };

    return signature
            .flatMap(signedCall)
            .compose(this.<Theater>retry());
}
/**
 * Information about a person.
 *
 * <p>First computes the request signature (SED and SIG) from the request parameters, then
 * calls the person endpoint with it and extracts the person from the response. The
 * {@code retry()} transformer is applied to the whole chain.
 *
 * @param idPerson the person code
 * @param profile  the profile value sent with the request
 * @param filter   the filter value sent with the request
 * @return a Single emitting the person
 */
public Single<PersonFull> person(final String idPerson, final String profile, final String filter) {
    // Step 1: build the (sed, sig) pair that signs the request.
    final Single<Pair<String, String>> signature = Single.create(new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String params = ServiceSecurity.construireParams(false,
                    AllocineService.CODE, idPerson,
                    AllocineService.PROFILE, profile,
                    AllocineService.FILTER, filter
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(params, sed)));
        }
    });

    // Pulls the person out of the generic response envelope.
    final Function<AllocineResponse, PersonFull> extractPerson = new Function<AllocineResponse, PersonFull>() {
        @Override
        public PersonFull apply(AllocineResponse response) throws Exception {
            return response.getPerson();
        }
    };

    // Step 2: perform the signed call.
    final Function<Pair<String, String>, SingleSource<PersonFull>> signedCall =
            new Function<Pair<String, String>, SingleSource<PersonFull>>() {
                @Override
                public SingleSource<PersonFull> apply(Pair<String, String> sedAndSig) throws Exception {
                    return allocineService
                            .person(idPerson, profile, filter, sedAndSig.first, sedAndSig.second)
                            .map(extractPerson);
                }
            };

    return signature
            .flatMap(signedCall)
            .compose(this.<PersonFull>retry());
}