/**
 * Creates a transformer that logs the selected lifecycle events of a {@code Completable}.
 *
 * <p>Each flag set in {@code bitMask} composes the matching logging transformer onto the
 * upstream, in this fixed order: subscribe, error, complete, dispose. When no flag is set
 * the upstream is returned untouched.
 *
 * @param msg     message included in every log line
 * @param bitMask bitwise OR of the {@code LOG_*} flags naming the events to log
 * @return transformer applying the requested logging operators
 */
public static CompletableTransformer logCompletable(final String msg, final int bitMask) {
    return upstream -> {
        // Flags are tested with "!= 0" rather than "> 0": a flag occupying the sign bit
        // produces a negative result and would otherwise be silently ignored.
        if ((bitMask & LOG_SUBSCRIBE) != 0) {
            upstream = upstream.compose(cLogSubscribe(msg));
        }
        if ((bitMask & LOG_ERROR) != 0) {
            upstream = upstream.compose(cLogError(msg));
        }
        if ((bitMask & LOG_COMPLETE) != 0) {
            upstream = upstream.compose(cLogComplete(msg));
        }
        if ((bitMask & LOG_DISPOSE) != 0) {
            upstream = upstream.compose(cLogDispose(msg));
        }
        return upstream;
    };
}
/**
 * Creates the use case and prepares the transformer that moves a {@code Completable}
 * onto the schedulers supplied by the given executor and post-execution thread.
 *
 * @param useCaseExecutor     provides the scheduler used for {@code subscribeOn}
 * @param postExecutionThread provides the scheduler used for {@code observeOn}
 */
public CompletableUseCase(final UseCaseExecutor useCaseExecutor,
                          final PostExecutionThread postExecutionThread) {
    super(useCaseExecutor, postExecutionThread);
    schedulersTransformer = new CompletableTransformer() {
        @Override
        public Completable apply(Completable source) {
            // Schedulers are looked up on every application, not cached at construction time.
            final Completable subscribed = source.subscribeOn(useCaseExecutor.getScheduler());
            return subscribed.observeOn(postExecutionThread.getScheduler());
        }
    };
}
/**
 * Returns the transformer stored in {@code schedulersTransformer}.
 *
 * @return the stored schedulers transformer
 */
private CompletableTransformer getSchedulersTransformer() {
    return this.schedulersTransformer;
}
/**
 * Builds a transformer that logs upstream errors through {@code Timber.e}.
 *
 * <p>The log line carries the throwable's message, or the simple name of its class
 * when the message is {@code null}.
 *
 * @param msg message included in the log line
 * @return transformer adding the error-logging side effect
 */
private static CompletableTransformer cLogError(final String msg) {
    return upstream -> upstream.doOnError(e -> {
        String detail = e.getMessage();
        if (detail == null) {
            // No message available: fall back to the exception type.
            detail = e.getClass().getSimpleName();
        }
        Timber.e("[onError] %s - %s", msg, detail);
    });
}
/**
 * Builds a transformer that logs completion through {@code Timber.v}, including the
 * name of the thread on which the completion callback runs.
 *
 * @param msg message included in the log line
 * @return transformer adding the completion-logging side effect
 */
private static CompletableTransformer cLogComplete(final String msg) {
    return source -> source.doOnComplete(() -> {
        final String threadName = Thread.currentThread().getName();
        Timber.v("[onComplete] %s [Thread:%s]", msg, threadName);
    });
}
/**
 * Builds a transformer that logs subscription through {@code Timber.v}, including the
 * name of the thread on which the subscribe callback runs.
 *
 * @param msg message included in the log line
 * @return transformer adding the subscription-logging side effect
 */
private static CompletableTransformer cLogSubscribe(final String msg) {
    return source -> source.doOnSubscribe(ignored -> {
        final String threadName = Thread.currentThread().getName();
        Timber.v("[subscribe] %s [Thread:%s]", msg, threadName);
    });
}
/**
 * Builds a transformer that logs disposal through {@code Timber.v}.
 *
 * @param msg message included in the log line
 * @return transformer adding the dispose-logging side effect
 */
private static CompletableTransformer cLogDispose(final String msg) {
    return source -> source.doOnDispose(() -> {
        Timber.v("[dispose] %s", msg);
    });
}
/**
 * Returns a transformer that subscribes on {@code Schedulers.io()} and observes on
 * {@code AndroidSchedulers.mainThread()}.
 *
 * @return transformer applying both schedulers
 */
public CompletableTransformer applySchedulersToCompletable() {
    return source -> {
        return source
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
    };
}
/**
 * Returns a transformer that subscribes on {@code Schedulers.io()} and observes on the
 * scheduler returned by {@code Injection.provideMainThreadScheduler()}.
 *
 * @return transformer applying both schedulers
 */
public static CompletableTransformer applyCompletableIOToMainSchedulers() {
    return source -> {
        // The main-thread scheduler is resolved each time the transformer is applied.
        return source
                .subscribeOn(Schedulers.io())
                .observeOn(Injection.provideMainThreadScheduler());
    };
}
/**
 * Creates a {@code DelayCompletableTransformer} driven by the given observable.
 *
 * <p>The argument is first passed to {@code Preconditions.checkNotNull}; presumably a
 * {@code null} value is rejected there with an exception (confirm against that helper).
 *
 * @param pauseLifecycle observable handed to the {@code DelayCompletableTransformer};
 *                       its exact semantics are defined by that class
 * @return the newly created transformer
 */
@Nonnull
@CheckReturnValue
public static CompletableTransformer delayCompletable(@Nonnull Observable<Boolean> pauseLifecycle) {
    Preconditions.checkNotNull(pauseLifecycle, "pauseLifecycle == null");
    final CompletableTransformer delaying = new DelayCompletableTransformer(pauseLifecycle);
    return delaying;
}
/**
 * Supplies the service transformation by delegating to
 * {@code applyCompletableIoSchedulers()}.
 *
 * @return the transformer returned by {@code applyCompletableIoSchedulers()}
 */
@CheckResult
@NonNull
@Override
public CompletableTransformer applyCompletableServiceTransformation() {
    final CompletableTransformer ioTransformation = applyCompletableIoSchedulers();
    return ioTransformation;
}
/**
 * Returns a transformer that subscribes on {@code Schedulers.io()} and observes on
 * {@code AndroidSchedulers.mainThread()}.
 *
 * @return transformer applying both schedulers
 */
public static CompletableTransformer applyCompletableSchedulers() {
    return source -> {
        return source
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
    };
}
/**
 * Returns a transformer that subscribes on {@code executor} and observes on
 * {@code notifier}.
 *
 * @return transformer applying both schedulers
 */
public CompletableTransformer forCompletable() {
    return source -> {
        return source.subscribeOn(executor).observeOn(notifier);
    };
}
/**
 * Provides the {@link Completable} transformation applied to service calls.
 *
 * <p>Implementations decide what the transformation does; for example, it could be
 * used to sign out the user when a service error is received.
 *
 * @return the {@link Completable} transformation to apply to service calls
 */
@CheckResult @NonNull CompletableTransformer applyCompletableServiceTransformation();
/**
 * Provides the IO scheduling {@link Completable} transformation.
 *
 * <p>The returned transformer is the one held in {@code completableIoTransformer}; it is
 * intended to subscribe the stream on IO-bound {@link Schedulers} and observe it on the
 * Android main thread (the actual configuration lives where that field is assigned).
 *
 * @return the transformer that applies the scheduling transformation
 */
@CheckResult
@NonNull
protected CompletableTransformer applyCompletableIoSchedulers() {
    return this.completableIoTransformer;
}