/** * Creates transform operator, which logs defined events in observable's lifecycle * @param msg message * @param bitMask bitmask of events which you want to log * @param <T> type * @return transformer */ public static <T> SingleTransformer<T, T> logSingle(final String msg, final int bitMask) { return upstream -> { if ((bitMask & LOG_SUBSCRIBE) > 0) { upstream = upstream.compose(sLogSubscribe(msg)); } if ((bitMask & LOG_ERROR) > 0) { upstream = upstream.compose(sLogError(msg)); } if ((bitMask & LOG_NEXT_DATA) > 0) { upstream = upstream.compose(sLogSuccess(msg)); } else if ((bitMask & LOG_NEXT_EVENT) > 0) { upstream = upstream.compose(sLogSuccessEvent(msg)); } if ((bitMask & LOG_DISPOSE) > 0) { upstream = upstream.compose(sLogDispose(msg)); } return upstream; }; }
private SingleTransformer<TmpResult, TmpResult> storeToCacheAndReturn() { return single -> single // собственно, вставка .flatMap(result -> this.messagesInteractor .insertMessages(result.getAccountId(), result.collectDtos()) .andThen(refreshChangedDialogs(result)) .andThen(Single.just(result))) .flatMap(result -> { // собственно, получение из локальной базы List<Integer> ids = collectIds(result.getData(), msg -> true); return this.messagesInteractor .findCachedMessages(result.getAccountId(), ids) .map(result::appendModel); }); }
private SingleTransformer<List<PostEntity>, List<Post>> dbos2models(int accountId) { return single -> single .flatMap(dbos -> { final VKOwnIds ids = new VKOwnIds(); Entity2Model.fillOwnerIds(ids, dbos); return ownersInteractor .findBaseOwnersDataAsBundle(accountId, ids.getAll(), IOwnersInteractor.MODE_ANY) .map(owners -> { List<Post> posts = new ArrayList<>(dbos.size()); for (PostEntity dbo : dbos) { posts.add(Entity2Model.buildPostFromDbo(dbo, owners)); } return posts; }); }); }
private SingleTransformer<List<MessageEntity>, List<Message>> toMessageModels(int accountId) { return single -> single .flatMap(dbos -> { VKOwnIds ownIds = new VKOwnIds(); Entity2Model.fillOwnerIds(ownIds, dbos); return this.ownersInteractor .findBaseOwnersDataAsBundle(accountId, ownIds.getAll(), IOwnersInteractor.MODE_ANY) .map(owners -> { final List<Message> messages = new ArrayList<>(dbos.size()); for (MessageEntity dbo : dbos) { messages.add(Entity2Model.buildMessageFromDbo(accountId, dbo, owners)); } return messages; }); }); }
private SingleTransformer<List<CommentEntity>, List<Comment>> dbos2models(int accountId) { return single -> single.flatMap(dbos -> { VKOwnIds ownids = new VKOwnIds(); for (CommentEntity c : dbos) { Entity2Model.fillCommentOwnerIds(ownids, c); } return ownersInteractor .findBaseOwnersDataAsBundle(accountId, ownids.getAll(), IOwnersInteractor.MODE_ANY) .map(owners -> { List<Comment> comments = new ArrayList<>(dbos.size()); for (CommentEntity dbo : dbos) { comments.add(Entity2Model.buildCommentFromDbo(dbo, owners)); } return comments; }); }); }
public <R> SingleTransformer<? super R, ? extends R> composeSingle() { return new SingleTransformer<R, R>() { @Override public SingleSource<R> apply(@NonNull Single<R> upstream) { return upstream .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .retryWhen(new RetryWithDelay(maxRetry, todoBeforeRetry).forSingle) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { AbstractPresenter.this.addDisposable(disposable); } }); } }; }
public static <R extends Result> SingleTransformer<R, R> forSingle() { return upstream -> upstream.onErrorResumeNext(throwable -> { if(throwable instanceof StatusException) { StatusException statusException = (StatusException) throwable; if(statusException.getStatus().hasResolution()) { return Single.just((R) statusException.getResult()); } else { return Single.error(throwable); } } else { return Single.error(throwable); } }); }
/** * Filters the given list against whether their Title or Subtitle matches the filter text. * * @return Filtered list. */ public SingleTransformer<List<? extends RecyclerViewModel>, List<? extends RecyclerViewModel>> filterByFilterText() { return untransformed -> (Single) untransformed.flattenAsObservable(items -> items) .filter(item -> item.getSubtitle().toLowerCase().contains(filterText) || item.getTitle().toLowerCase().contains(filterText)) .toList(); }
/** * Filters the list to items that are listed as For Sale. * * @return Filtered list. */ public SingleTransformer<List<Listing>, List<Listing>> filterForSale() { return listingsSingle -> listingsSingle.flattenAsObservable(listings -> listings) .filter(listing -> listing.getStatus().equals("For Sale")) .toList(); }
@Provides SchedulerSingleTransformer provideSchedulerSingleTransformer() { return new SchedulerSingleTransformer() { @SuppressWarnings("unchecked") @Override public <T> SingleTransformer<T, T> transformer() { return upstream -> upstream.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); } }; }
@SuppressWarnings("unchecked") @Override public <T> SingleTransformer<T, T> transformer() { return (SingleTransformer<T, T>) new SingleTransformer() { @Override public SingleSource apply(Single upstream) { return upstream.subscribeOn(Schedulers.trampoline()) .observeOn(Schedulers.trampoline()); } }; }
private SingleTransformer<TmpResult, TmpResult> getAndStore() { return single -> single .flatMap(result -> { // если в исходных данных недостаточно инфы - получаем нужные данные с api List<Integer> needGetFromNet = collectIds(result.getData(), msg -> isNull(msg.getDto())); if (needGetFromNet.isEmpty()) { return Single.just(result); } return networker.vkDefault(result.getAccountId()) .messages() .getById(needGetFromNet) .map(result::appendDtos); }) .map(result -> { // отсеиваем сообщения, которые имеют отношение к обмену ключами removeIf(result.getData(), msg -> KeyExchangeService.intercept(app, result.getAccountId(), msg.getDto())); return result; }) .flatMap(result -> { if (result.getData().isEmpty()) { return Single.just(result); } // идентифицируем доолнительные необходимые данные, которых не хватает в локальной базе // например, информация о пользователях, группах или чатах // получаем и сохраняем, если необходимо return identifyMissingObjectsGetAndStore(result) .andThen(Single.just(result)) // сохраняем сообщения в локальную базу и получаем оттуда "тяжелые" обьекты сообщений .compose(storeToCacheAndReturn()); }); }
private SingleTransformer<PostEntity, Post> dbo2model(int accountId) { return single -> single .flatMap(dbo -> { final VKOwnIds ids = new VKOwnIds(); Entity2Model.fillPostOwnerIds(ids, dbo); return ownersInteractor .findBaseOwnersDataAsBundle(accountId, ids.getAll(), IOwnersInteractor.MODE_ANY) .map(owners -> { return Entity2Model.buildPostFromDbo(dbo, owners); }); }); }
public static <T> SingleTransformer<T, T> applyCommonSchedulersSingle() { return new SingleTransformer<T, T>() { @Override public SingleSource<T> apply(@NonNull Single<T> upstream) { return upstream.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); } }; }
private <T> SingleTransformer<T, T> retry() { return new SingleTransformer<T, T>() { @Override public SingleSource<T> apply(Single<T> upstream) { return upstream.retryWhen(new Function<Flowable<Throwable>, Publisher<Object>>() { private final int MAX_COUNT = 3; private int count = 0; private final int DELAY_SECOND = 10; @Override public Publisher<Object> apply(Flowable<Throwable> throwableFlowable) throws Exception { return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() { @Override public Publisher<?> apply(Throwable throwable) throws Exception { if (count++ < MAX_COUNT && throwable instanceof HttpException) { final HttpException httpException = (HttpException) throwable; if (httpException.code() == 403) { return Flowable.timer(DELAY_SECOND, TimeUnit.SECONDS); } } return Flowable.error(throwable); } }); } }); } }; }
public <R> SingleTransformer<? super R, ? extends R> compose() { return new SingleTransformer<R, R>() { @Override public SingleSource<R> apply(@NonNull Single<R> upstream) { return upstream .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnSubscribe(AbstractPresenter.this::call); } }; }
/** * Provides the Io schedule {@link Single} transformation. * Subscribes the stream to Io bound {@link Schedulers} and observes it in the {Android main thread. * * @return The stream with the schedule transformation */ @CheckResult @NonNull protected <T> SingleTransformer<T, T> applySingleIoSchedulers() { //noinspection unchecked return (SingleTransformer<T, T>) singleIoTransformer; }
public SingleUseCase(final UseCaseExecutor useCaseExecutor, final PostExecutionThread postExecutionThread) { super(useCaseExecutor, postExecutionThread); schedulersTransformer = new SingleTransformer<R, R>() { @Override public Single<R> apply(Single<R> single) { return single.subscribeOn(useCaseExecutor.getScheduler()) .observeOn(postExecutionThread.getScheduler()); } }; }
public static <T> SingleTransformer<T, T> applySingle(LifecycleOwner lifecycleOwner) { return single -> { LiveData<T> liveData = LiveDataReactiveStreams.fromPublisher(single.toFlowable()); return Flowable.fromPublisher(LiveDataReactiveStreams .toPublisher(lifecycleOwner, liveData)).singleOrError(); }; }
public <T> SingleTransformer<Response<T>, T> process() { return oResponse -> oResponse .flatMap(response -> { if (response.isSuccessful()) return Single.just(response.body()); try { String error = errorAdapter.adapt(response.errorBody().string()); return Single.error(new NetworkException(error)); } catch (java.lang.Exception exception) { return Single.error(new RuntimeException(exception)); } }); }
public <T> SingleTransformer<T, T> safely() { return single -> single .subscribeOn(backgroundThread) .<T>observeOn(mainThread) .<T>compose(lifecycle) .<T>onErrorResumeNext(error -> { if (error instanceof CancellationException) return Single.never(); return Single.error((Throwable) error); }); }
public <T> SingleTransformer<T, T> reportOnSnackBar() { return single -> single .<T>doOnError(throwable -> { Single<String> formattedError = exceptionFormatter.format(throwable); notifications.showSnackBar(formattedError); }) .<T>onErrorResumeNext(error -> Single.never()); }
public <T> SingleTransformer<T, T> reportOnToast() { return single -> single .<T>doOnError(throwable -> { Single<String> formattedError = exceptionFormatter.format(throwable); notifications.showToast(formattedError); }) .<T>onErrorResumeNext(throwable -> Single.never()); }
@Override public <U> SingleTransformer<U, U> forSingle() { return new SingleTransformer<U, U>() { @Override public SingleSource<U> apply(io.reactivex.Single<U> source) { rx.Single<U> rxSourceSingle = RxJavaInterop.toV1Single(source); rx.Single<T> rxBoundSingle = rxSourceSingle .compose((Single.Transformer<? super U, ? extends T>) rxSingleTransformer); return (io.reactivex.Single<U>) RxJavaInterop.toV2Single(rxBoundSingle); } }; }
private static <T> SingleTransformer<Response<T>, T> queryResponseDataTransformer() { return upstream -> upstream.flatMap(response -> { if (response.errors().isEmpty()) { return Single.just(response.data()); } else { String errorMessage = fold(new StringBuilder(), response.errors(), (builder, error) -> builder.append(error.message()).append("\n")).toString(); return Single.error(new RuntimeException(errorMessage)); } }); }
private static <T extends AbstractResponse<T>> SingleTransformer<GraphResponse<T>, T> queryResponseDataTransformer() { return upstream -> upstream.flatMap(response -> { if (response.errors().isEmpty()) { return Single.just(response.data()); } else { String errorMessage = fold(new StringBuilder(), response.errors(), (builder, error) -> builder.append(error.message()).append("\n")).toString(); return Single.error(new RuntimeException(errorMessage)); } }); }
public static <T> SingleTransformer<Collection<T>, Collection<T>> toUnmodifiable() { return singleObserver -> singleObserver.map(Collections::unmodifiableCollection); }
private static <T> SingleTransformer<T, T> sLogSuccess(final String msg) { return upstream -> upstream.doOnSuccess(data -> Timber.d("[onSuccess] %s %s [Thread:%s]", msg, data, Thread.currentThread().getName())); }
private static <T> SingleTransformer<T, T> sLogSuccessEvent(final String msg) { return upstream -> upstream.doOnSuccess(x -> Timber.d("[onSuccess] %s [Thread:%s]", msg, Thread.currentThread().getName())); }
private static <T> SingleTransformer<T, T> sLogError(final String msg) { final Function<Throwable, String> message = e -> e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName(); return upstream -> upstream.doOnError(e -> Timber.e("[onError] %s - %s", msg, message.apply(e))); }
private static <T> SingleTransformer<T, T> sLogSubscribe(final String msg) { return upstream -> upstream.doOnSubscribe(disposable -> Timber.v("[subscribe] %s [Thread:%s]", msg, Thread.currentThread().getName())); }
private static <T> SingleTransformer<T, T> sLogDispose(final String msg) { return upstream -> upstream.doOnDispose(() -> Timber.v("[dispose] %s", msg)); }
public static <T> SingleTransformer<T,T> fromNotificationThreadToMain(){ return single -> single .subscribeOn(INSTANCE) .observeOn(Injection.provideMainThreadScheduler()); }
public static <T> SingleTransformer<T, T> applySingleIOToMainSchedulers() { return upstream -> upstream .subscribeOn(Schedulers.io()) .observeOn(Injection.provideMainThreadScheduler()); }
public static <T> SingleTransformer<T, T> applySingleComputationToMainSchedulers() { return upstream -> upstream .subscribeOn(Schedulers.computation()) .observeOn(Injection.provideMainThreadScheduler()); }
public static SingleTransformer<Wallet, Wallet> savePassword( PasswordStore passwordStore, WalletRepositoryType walletRepository, String password) { return new SavePasswordOperator(passwordStore, walletRepository, password); }
@Nonnull @CheckReturnValue public static <T> SingleTransformer<T, T> delaySingle(@Nonnull Observable<Boolean> pauseLifecycle) { Preconditions.checkNotNull(pauseLifecycle, "pauseLifecycle == null"); return new DelaySingleTransformer<T>(pauseLifecycle); }
@CheckResult @NonNull @Override public <S> SingleTransformer<S, S> applySingleServiceTransformation() { return applySingleIoSchedulers(); }