/** * 统一返回结果处理 * @param <T> * @return */ public static <T> FlowableTransformer<GankHttpResponse<T>, T> handleResult() { //compose判断结果 return new FlowableTransformer<GankHttpResponse<T>, T>() { @Override public Flowable<T> apply(Flowable<GankHttpResponse<T>> httpResponseFlowable) { return httpResponseFlowable.flatMap(new Function<GankHttpResponse<T>, Flowable<T>>() { @Override public Flowable<T> apply(GankHttpResponse<T> tGankHttpResponse) { if(!tGankHttpResponse.getError()) { return createData(tGankHttpResponse.getResults()); } else { return Flowable.error(new ApiException("服务器返回error")); } } }); } }; }
/** * 统一返回结果处理 * @param <T> * @return */ public static <T> FlowableTransformer<WXHttpResponse<T>, T> handleWXResult() { //compose判断结果 return new FlowableTransformer<WXHttpResponse<T>, T>() { @Override public Flowable<T> apply(Flowable<WXHttpResponse<T>> httpResponseFlowable) { return httpResponseFlowable.flatMap(new Function<WXHttpResponse<T>, Flowable<T>>() { @Override public Flowable<T> apply(WXHttpResponse<T> tWXHttpResponse) { if(tWXHttpResponse.getCode() == 200) { return createData(tWXHttpResponse.getNewslist()); } else { return Flowable.error(new ApiException("服务器返回error")); } } }); } }; }
/** * 统一返回结果处理 * @param <T> * @return */ public static <T> FlowableTransformer<MyHttpResponse<T>, T> handleMyResult() { //compose判断结果 return new FlowableTransformer<MyHttpResponse<T>, T>() { @Override public Flowable<T> apply(Flowable<MyHttpResponse<T>> httpResponseFlowable) { return httpResponseFlowable.flatMap(new Function<MyHttpResponse<T>, Flowable<T>>() { @Override public Flowable<T> apply(MyHttpResponse<T> tMyHttpResponse) { if(tMyHttpResponse.getCode() == 200) { return createData(tMyHttpResponse.getData()); } else { return Flowable.error(new ApiException("服务器返回error")); } } }); } }; }
/** * 统一返回结果处理 * @param <T> * @return */ public static <T> FlowableTransformer<GoldHttpResponse<T>, T> handleGoldResult() { //compose判断结果 return new FlowableTransformer<GoldHttpResponse<T>, T>() { @Override public Flowable<T> apply(Flowable<GoldHttpResponse<T>> httpResponseFlowable) { return httpResponseFlowable.flatMap(new Function<GoldHttpResponse<T>, Flowable<T>>() { @Override public Flowable<T> apply(GoldHttpResponse<T> tGoldHttpResponse) { if(tGoldHttpResponse.getResults() != null) { return createData(tGoldHttpResponse.getResults()); } else { return Flowable.error(new ApiException("服务器返回error")); } } }); } }; }
/** * 统一返回结果处理 * @param <T> * @return */ public static <T> FlowableTransformer<WXHttpResponse<T>, T> handleWXResult() { //compose判断结果 return new FlowableTransformer<WXHttpResponse<T>, T>() { @Override public Flowable<T> apply(Flowable<WXHttpResponse<T>> httpResponseFlowable) { return httpResponseFlowable.flatMap(new Function<WXHttpResponse<T>, Flowable<T>>() { @Override public Flowable<T> apply(WXHttpResponse<T> tWXHttpResponse) { if(tWXHttpResponse.getCode() == 200) { return createData(tWXHttpResponse.getNewslist()); } else { return Flowable.error(new ApiException(tWXHttpResponse.getMsg(), tWXHttpResponse.getCode())); } } }); } }; }
/** * 统一返回结果处理 * @param <T> * @return */ public static <T> FlowableTransformer<MyHttpResponse<T>, T> handleMyResult() { //compose判断结果 return new FlowableTransformer<MyHttpResponse<T>, T>() { @Override public Flowable<T> apply(Flowable<MyHttpResponse<T>> httpResponseFlowable) { return httpResponseFlowable.flatMap(new Function<MyHttpResponse<T>, Flowable<T>>() { @Override public Flowable<T> apply(MyHttpResponse<T> tMyHttpResponse) { if(tMyHttpResponse.getCode() == 200) { return createData(tMyHttpResponse.getData()); } else { return Flowable.error(new ApiException(tMyHttpResponse.getMessage(), tMyHttpResponse.getCode())); } } }); } }; }
/**
 * Builds a transformer that attaches logging for the lifecycle events selected in
 * {@code bitMask}.
 *
 * <p>Each {@code LOG_*} flag present in the mask composes the matching {@code fLog*}
 * transformer onto the stream, in the order subscribe, terminate, error, complete, next.
 * For next-signals, {@code LOG_NEXT_DATA} takes precedence over {@code LOG_NEXT_EVENT}.
 *
 * @param msg message passed to every logging transformer
 * @param bitMask bitmask of the events to log
 * @param <T> element type of the stream
 * @return the logging transformer
 */
public static <T> FlowableTransformer<T, T> logFlowable(final String msg, final int bitMask) {
    return upstream -> {
        Flowable<T> logged = upstream;
        if ((bitMask & LOG_SUBSCRIBE) > 0) {
            logged = logged.compose(fLogSubscribe(msg));
        }
        if ((bitMask & LOG_TERMINATE) > 0) {
            logged = logged.compose(fLogTerminate(msg));
        }
        if ((bitMask & LOG_ERROR) > 0) {
            logged = logged.compose(fLogError(msg));
        }
        if ((bitMask & LOG_COMPLETE) > 0) {
            logged = logged.compose(fLogComplete(msg));
        }
        // The two next-loggers are mutually exclusive; the data variant wins if both are set.
        if ((bitMask & LOG_NEXT_DATA) > 0) {
            logged = logged.compose(fLogNext(msg));
        } else if ((bitMask & LOG_NEXT_EVENT) > 0) {
            logged = logged.compose(fLogNextEvent(msg));
        }
        return logged;
    };
}
/**
 * Error-handling transformation for API models.
 *
 * <p>Each model is inspected and converted into a {@code NetError} stream error when it is
 * missing or flags a problem; otherwise it is passed through unchanged:
 * <ul>
 *   <li>{@code null} model or {@code isNull()} - {@code NetError.NoDataError}</li>
 *   <li>{@code isAuthError()} - {@code NetError.AuthError}</li>
 *   <li>{@code isBizError()} - {@code NetError.BusinessError}</li>
 * </ul>
 *
 * @param <T> model type
 * @return the transformer applying the checks above
 */
public static <T extends IModel> FlowableTransformer<T, T> getApiTransformer() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            return upstream.flatMap(new Function<T, Publisher<T>>() {
                @Override
                public Publisher<T> apply(T model) throws Exception {
                    if (model == null) {
                        // There is no model to ask for a message; the previous code
                        // dereferenced it here and threw a NullPointerException instead of
                        // reporting NoDataError. The typed local keeps the String overload
                        // of the NetError constructor unambiguous.
                        final String noMessage = null;
                        return Flowable.error(new NetError(noMessage, NetError.NoDataError));
                    }
                    if (model.isNull()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.NoDataError));
                    }
                    if (model.isAuthError()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.AuthError));
                    }
                    if (model.isBizError()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.BusinessError));
                    }
                    return Flowable.just(model);
                }
            });
        }
    };
}
/**
 * Builds the flowable, optionally transforms it, and subscribes the given callbacks.
 *
 * <p>Missing callbacks get defaults: a no-op for {@code onNext} and {@code onCompleted}, and
 * an {@code onError} that throws {@code OnErrorNotImplementedException}. When a queuer is set
 * and the queue safety check is enabled, {@code onNext} is wrapped via
 * {@code RxBusUtil.wrapQueueConsumer}. The resulting disposable is registered with
 * {@code RxDisposableManager} when a bound object is present.
 *
 * @param onNext consumer for items, may be {@code null}
 * @param onError consumer for errors, may be {@code null}
 * @param onCompleted completion action, may be {@code null}
 * @param transformer optional transformer applied before subscribing, may be {@code null}
 * @param <R> item type after the transformer is applied
 * @return the disposable of the subscription
 */
public <R> Disposable subscribe(Consumer<R> onNext, Consumer<Throwable> onError, Action onCompleted, FlowableTransformer<T, R> transformer) {
    // Raw type kept on purpose: the element type changes once the transformer is composed.
    Flowable flowable = build(false);
    if (transformer != null) {
        flowable = flowable.compose(transformer);
    }

    final Consumer<R> nextHandler = (onNext != null) ? onNext : data -> {};
    final Consumer<Throwable> errorHandler = (onError != null)
            ? onError
            : error -> { throw new OnErrorNotImplementedException(error); };
    final Action completeHandler = (onCompleted != null) ? onCompleted : () -> {};

    Consumer<R> actualOnNext = nextHandler;
    if (mQueuer != null && mQueueSubscriptionSafetyCheckEnabled) {
        actualOnNext = RxBusUtil.wrapQueueConsumer(nextHandler, mQueuer);
    }

    flowable = applySchedular(flowable);
    final Disposable disposable = flowable.subscribe(actualOnNext, errorHandler, completeHandler);
    if (mBoundObject != null) {
        RxDisposableManager.addDisposable(mBoundObject, disposable);
    }
    return disposable;
}
/**
 * Builds the flowable, optionally transforms it, and subscribes the given subscriber.
 *
 * <p>When a queuer is set and the queue safety check is enabled, the subscriber is wrapped via
 * {@code RxBusUtil.wrapSubscriber}. The object returned by {@code subscribeWith} is cast to
 * {@code DisposableSubscriber} and registered with {@code RxDisposableManager} when a bound
 * object is present.
 *
 * @param subscriber subscriber receiving the stream
 * @param transformer optional transformer applied before subscribing, may be {@code null}
 * @param <R> item type after the transformer is applied
 * @return the disposable of the subscription
 */
public <R> Disposable subscribe(DisposableSubscriber<R> subscriber, FlowableTransformer<T, R> transformer) {
    // Raw type kept on purpose: the element type changes once the transformer is composed.
    Flowable flowable = build(false);
    if (transformer != null) {
        flowable = flowable.compose(transformer);
    }

    Subscriber<R> target = subscriber;
    if (mQueuer != null && mQueueSubscriptionSafetyCheckEnabled) {
        target = RxBusUtil.wrapSubscriber(subscriber, mQueuer);
    }

    flowable = applySchedular(flowable);
    // NOTE(review): the cast assumes the (possibly wrapped) subscriber is a DisposableSubscriber.
    final Disposable disposable = (DisposableSubscriber) flowable.subscribeWith(target);
    if (mBoundObject != null) {
        RxDisposableManager.addDisposable(mBoundObject, disposable);
    }
    return disposable;
}
/**
 * Creates a transformer that runs the upstream on the io scheduler, delivers on the Android
 * main thread, and toggles the view's loading indicator around the stream.
 *
 * <p>{@code view.showLoading()} is invoked from {@code doOnSubscribe} and
 * {@code view.hideLoading()} from {@code doOnTerminate}.
 *
 * @param view view whose loading indicator is shown and hidden
 * @param <T> element type of the stream
 * @return the scheduling transformer
 */
public static <T> FlowableTransformer<T, T> applySchedules(final IView view) {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            final Consumer<Subscription> showLoading = new Consumer<Subscription>() {
                @Override
                public void accept(Subscription subscription) throws Exception {
                    view.showLoading();
                }
            };
            final Action hideLoading = new Action() {
                @Override
                public void run() throws Exception {
                    view.hideLoading();
                }
            };
            return upstream
                    .subscribeOn(Schedulers.io())
                    .doOnSubscribe(showLoading)
                    // This second subscribeOn sits downstream of doOnSubscribe on purpose.
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnTerminate(hideLoading);
        }
    };
}
/**
 * Creates a transformer that turns resolvable {@code StatusException}s back into items.
 *
 * <p>When the upstream fails with a {@code StatusException} whose status reports
 * {@code hasResolution()}, the exception's result is emitted as an item instead of the error.
 * Every other error is propagated unchanged.
 *
 * @param <R> result type of the stream
 * @return the error-resuming transformer
 */
public static <R extends Result> FlowableTransformer<R, R> forFlowable() {
    return upstream -> upstream.onErrorResumeNext(throwable -> {
        if (throwable instanceof StatusException) {
            final StatusException resolvable = (StatusException) throwable;
            if (resolvable.getStatus().hasResolution()) {
                return Flowable.just((R) resolvable.getResult());
            }
        }
        // Not a StatusException, or nothing to resolve: keep the original failure.
        return Flowable.error(throwable);
    });
}
/**
 * Creates a transformer that pairs every upstream item with the running {@code Statistics}
 * accumulated from {@code function} applied to the items seen so far.
 *
 * @param function extracts the number added to the statistics for each item
 * @param <T> element type of the upstream
 * @param <R> numeric type produced by {@code function}
 * @return a transformer emitting one {@code Pair} of item and updated statistics per item
 */
public static <T, R extends Number> FlowableTransformer<T, Pair<T, Statistics>> collectStats(
        final Function<? super T, ? extends R> function) {
    return new FlowableTransformer<T, Pair<T, Statistics>>() {
        @Override
        public Flowable<Pair<T, Statistics>> apply(Flowable<T> source) {
            // Seed with no item and fresh statistics; it is dropped again by skip(1) below.
            final Pair<T, Statistics> seed = Pair.create((T) null, Statistics.create());
            final BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>> accumulate =
                    new BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
                        @Override
                        public Pair<T, Statistics> apply(Pair<T, Statistics> previous, T item)
                                throws Exception {
                            final Statistics updated = previous.b().add(function.apply(item));
                            return Pair.create(item, updated);
                        }
                    };
            return source.scan(seed, accumulate).skip(1);
        }
    };
}
/**
 * Creates a transformer that constrains upstream requests to lie between {@code minRequest}
 * and {@code maxRequest}.
 *
 * <p>When both bounds are equal and the first request is constrained too, this delegates to
 * {@code Flowable.rebatchRequests}; otherwise it composes {@code Transformers.minRequest} and
 * {@code Transformers.maxRequest}.
 *
 * @param minRequest minimum request size
 * @param maxRequest maximum request size, must not be smaller than {@code minRequest}
 * @param constrainFirstRequestMin whether the first request is also held to {@code minRequest};
 *        when {@code false} the first request uses a minimum of 1
 * @param <T> element type of the stream
 * @return the request-rebatching transformer
 */
public static <T> FlowableTransformer<T, T> rebatchRequests(final int minRequest, final long maxRequest,
        final boolean constrainFirstRequestMin) {
    Preconditions.checkArgument(minRequest <= maxRequest, "minRequest cannot be greater than maxRequest");
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> source) {
            if (constrainFirstRequestMin && minRequest == maxRequest) {
                return source.rebatchRequests(minRequest);
            }
            final int firstRequestMin = constrainFirstRequestMin ? minRequest : 1;
            return source
                    .compose(Transformers.<T>minRequest(firstRequestMin, minRequest))
                    .compose(Transformers.<T>maxRequest(maxRequest));
        }
    };
}
@Test public void testPassThroughWithCustomCompletion() { FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() // .initialState("") // .transition(PASS_THROUGH_TRANSITION) // .completion(new Completion2<String, Integer>() { @Override public void accept(String state, Emitter<Integer> emitter) { emitter.onComplete_(); } }) // .requestBatchSize(1) // .build(); Flowable.just(1, 2, 3, 4, 5, 6) // .compose(sm) // .test() // .assertValues(1, 2, 3, 4, 5, 6) // .assertComplete(); }
@Test public void testPassThroughEmitterCompletesTwice() { FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() // .initialState("") // .transition(PASS_THROUGH_TRANSITION) // .completion(new Completion2<String, Integer>() { @Override public void accept(String state, Emitter<Integer> emitter) { emitter.onComplete_(); emitter.onComplete_(); } }) // .requestBatchSize(1) // .build(); Flowable.just(1, 2, 3, 4, 5, 6) // .compose(sm) // .test() // .assertValues(1, 2, 3, 4, 5, 6) // .assertComplete(); }
@Test public void testPassThroughEmitterOnNextAfterCompletion() { FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() // .initialState("") // .transition(PASS_THROUGH_TRANSITION) // .completion(new Completion2<String, Integer>() { @Override public void accept(String state, Emitter<Integer> emitter) { emitter.onComplete_(); emitter.onNext_(8); } }) // .requestBatchSize(1) // .build(); Flowable.just(1, 2, 3, 4, 5, 6) // .compose(sm) // .test() // .assertValues(1, 2, 3, 4, 5, 6) // .assertComplete(); }
@Test public void testCompletionThrows() { FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() // .initialState("") // .transition(PASS_THROUGH_TRANSITION) // .completion(new Completion2<String, Integer>() { @Override public void accept(String state, Emitter<Integer> emitter) { throw new ThrowingException(); } }) // .requestBatchSize(1) // .build(); Flowable.just(1) // .compose(sm) // .test() // .assertValues(1) // .assertError(ThrowingException.class); }
@Test public void testStateFactoryReturnsNull() { FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() // .initialStateFactory(new Callable<String>() { @Override public String call() throws Exception { return null; } }) // .transition(PASS_THROUGH_TRANSITION) // .build(); Flowable.just(1) // .compose(sm) // .test() // .assertNoValues() // .assertError(NullPointerException.class); }
@Test public void testStateFactoryReturnsNullFromErrorStream() { FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() // .initialStateFactory(new Callable<String>() { @Override public String call() throws Exception { return null; } }) // .transition(PASS_THROUGH_TRANSITION) // .build(); Flowable.<Integer> error(new ThrowingException()) // .compose(sm) // .test() // .assertNoValues() // .assertError(NullPointerException.class); }
@Test public void testOnNextThrowsWithBurstSource() { FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() // .initialState("") // .transition(new Transition2<String, Integer, Integer>() { @Override public String apply(String state, Integer value, Emitter<Integer> emitter) { throw new ThrowingException(); } }) // .requestBatchSize(10) // .build(); Burst.items(1, 2, 3).create() // .compose(sm) // .test() // .assertNoValues() // .assertError(ThrowingException.class); }
@Test public void testCancelFromTransition() { FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() // .initialState("") // .transition(new Transition2<String, Integer, Integer>() { @Override public String apply(String state, Integer value, Emitter<Integer> emitter) { emitter.cancel_(); return state; } }) // .requestBatchSize(10) // .build(); Burst.items(1, 2, 3).create() // .compose(sm) // .test() // .assertNoValues() // .assertNotTerminated(); }
@Test public void testOnNextThrowsWithBurstSourceThatTerminatesWithError() { List<Throwable> list = new CopyOnWriteArrayList<Throwable>(); try { RxJavaPlugins.setErrorHandler(Consumers.addTo(list)); FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() // .initialState("") // .transition(new Transition2<String, Integer, Integer>() { @Override public String apply(String state, Integer value, Emitter<Integer> emitter) { throw new ThrowingException(); } }) // .requestBatchSize(10) // .build(); Burst.item(1).error(new RuntimeException()) // .compose(sm) // .test() // .assertNoValues() // .assertError(ThrowingException.class); assertEquals(1, list.size()); } finally { RxJavaPlugins.reset(); } }
@Test public void testStateFactoryReturnsNullOnEmptySource() { FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() // .initialStateFactory(new Callable<String>() { @Override public String call() throws Exception { return null; } }) // .transition(PASS_THROUGH_TRANSITION) // .build(); Flowable.<Integer> empty() // .compose(sm) // .test() // .assertNoValues() // .assertError(NullPointerException.class); }
@Test public void errorActionThrows() { FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() // .initialState("") // .transition(PASS_THROUGH_TRANSITION) // .errored(new Errored<String, Integer>() { @Override public void accept(String state, Throwable error, Emitter<Integer> emitter) { throw new ThrowingException(); } }) // .build(); Flowable.<Integer> error(new RuntimeException()) // .compose(sm) // .test() // .assertNoValues() // .assertError(ThrowingException.class); }
@Test public void errorActionPassThrough() { FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() // .initialState("") // .transition(PASS_THROUGH_TRANSITION) // .errored(new Errored<String, Integer>() { @Override public void accept(String state, Throwable error, Emitter<Integer> emitter) { emitter.onError_(error); } }) // .build(); Flowable.<Integer> error(new ThrowingException()) // .compose(sm) // .test() // .assertNoValues() // .assertError(ThrowingException.class); }
@Test public void noActionTransition() { FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() // .initialState("") // .transition(new Transition2<String, Integer, Integer>() { @Override public String apply(String state, Integer value, Emitter<Integer> emitter) { return state; } }) // .build(); Flowable.just(1, 2) // .compose(sm) // .test() // .assertNoValues() // .assertComplete(); }
/**
 * Lets a {@link Flowable} be restarted (repeated) through {@link Flowable#repeatWhen} with a
 * fixed delay and an upper bound on the number of restarts.
 *
 * <p>Every completion of the upstream is zipped with a counter running from 1 to
 * {@code maxRestarts} and turned into a {@link Flowable#timer} of the configured delay. The
 * timer signals are passed through {@link Flowable#takeUntil} with
 * {@code stopRestartingPredicate}, which can short-circuit further restarts.
 *
 * @param stopRestartingPredicate decides whether restarting should stop, so that the stream
 *        being composed completes instead of being repeated
 * @param restartDelay how long to wait between restarts
 * @param restartDelayUnit the time unit of {@code restartDelay}
 * @param maxRestarts the maximum number of restarts
 * @param <T> the element type ({@link FlowableTransformer}'s upstream and downstream are the
 *        same type)
 * @return a {@link FlowableTransformer} that can be given to {@link Flowable#compose}
 */
<T> FlowableTransformer<T, T> repeatWhenWithDelayAndUntil(
        Predicate<Long> stopRestartingPredicate,
        long restartDelay,
        TimeUnit restartDelayUnit,
        int maxRestarts
) {
    return upstream -> upstream.repeatWhen(completions -> {
        // Pair each completion with its restart number; the range caps the restarts.
        final Flowable<Integer> restartNumbers = completions.zipWith(
                Flowable.range(1, maxRestarts),
                (completion, restartNumber) -> restartNumber);
        return restartNumbers
                .flatMap(restartNumber -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("stream_repeater_delay delay={} {}, restarts={} max_restarts={}",
                                restartDelay, restartDelayUnit.toString().toLowerCase(),
                                restartNumber, maxRestarts);
                    }
                    return Flowable.timer(restartDelay, restartDelayUnit);
                })
                .takeUntil(stopRestartingPredicate);
    });
}
/**
 * Creates a transformer that resolves each NTP pool host name into all of its addresses.
 *
 * <p>Resolution happens after {@code observeOn(Schedulers.io())}. Each host is looked up with
 * {@link InetAddress#getAllByName(String)} and every returned address is emitted; an
 * {@link UnknownHostException} is turned into a stream error.
 *
 * @return the resolving transformer
 */
private FlowableTransformer<String, InetAddress> resolveNtpPoolToIpAddresses() {
    return new FlowableTransformer<String, InetAddress>() {
        @Override
        public Publisher<InetAddress> apply(Flowable<String> ntpPoolFlowable) {
            final Function<String, Flowable<InetAddress>> resolveHost =
                    new Function<String, Flowable<InetAddress>>() {
                        @Override
                        public Flowable<InetAddress> apply(String ntpPoolAddress) {
                            TrueLog.d(TAG, "---- resolving ntpHost : " + ntpPoolAddress);
                            InetAddress[] addresses;
                            try {
                                addresses = InetAddress.getAllByName(ntpPoolAddress);
                            } catch (UnknownHostException e) {
                                return Flowable.error(e);
                            }
                            return Flowable.fromArray(addresses);
                        }
                    };
            return ntpPoolFlowable
                    .observeOn(Schedulers.io())
                    .flatMap(resolveHost);
        }
    };
}
/**
 * Login / registration state check.
 *
 * <p>Takes the first {@code LoginInfo} of each emitted list and compares its code with the
 * expected one. A mismatch throws a {@code LoginException} carrying the server's code.
 *
 * @param code the code the first entry is expected to carry
 * @return a transformer emitting the first entry of each list when its code matches
 */
public static FlowableTransformer<List<LoginInfo>, LoginInfo> rxStateCheck(final int code) {
    return new FlowableTransformer<List<LoginInfo>, LoginInfo>() {
        @Override
        public Publisher<LoginInfo> apply(Flowable<List<LoginInfo>> upstream) {
            final Function<List<LoginInfo>, LoginInfo> firstIfCodeMatches =
                    new Function<List<LoginInfo>, LoginInfo>() {
                        @Override
                        public LoginInfo apply(List<LoginInfo> loginInfos) throws Exception {
                            final LoginInfo first = loginInfos.get(0);
                            final int serverCode = first.getCode();
                            if (serverCode == code) {
                                return first;
                            }
                            throw new LoginException(serverCode);
                        }
                    };
            return upstream.map(firstIfCodeMatches);
        }
    };
}
private <T> FlowableTransformer<T, T> autoClose(final QueryResultIterable<T> iterable) { return new FlowableTransformer<T, T>() { @Override public Publisher<T> apply(Flowable<T> upstream) { return upstream.doOnTerminate(new Action() { @Override public void run() throws Exception { // Stream terminates (completed or on error): close the cursor iterable.close(); } }).doOnCancel(new Action() { @Override public void run() throws Exception { // Cancelled subscription (manual unsubscribe or via some operator such as take()): close the cursor iterable.close(); } }); } }; }
public static <T> FlowableTransformer<T, ImmutableList<T>> toImmutableList() { return upstream -> upstream.collect(ImmutableList::<T>builder, ImmutableList.Builder::add) .map(ImmutableList.Builder::build) .toFlowable(); // must turn Single into Flowable }
/**
 * Creates a transformer that delivers items on the Android main thread.
 *
 * @param <T> element type of the stream
 * @return a transformer applying {@code observeOn(AndroidSchedulers.mainThread())}
 */
public <T> FlowableTransformer<T, T> applyFlowableMainThread() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            final Flowable<T> onMainThread = upstream.observeOn(AndroidSchedulers.mainThread());
            return onMainThread;
        }
    };
}
/**
 * Creates a transformer that subscribes to the upstream on the io scheduler and delivers
 * items on the Android main thread.
 *
 * <p>The previous body was identical to {@code applyFlowableMainThread()} and only applied
 * {@code observeOn}, so the upstream was subscribed on the caller's thread.
 *
 * @param <T> element type of the stream
 * @return a transformer applying {@code subscribeOn(Schedulers.io())} followed by
 *         {@code observeOn(AndroidSchedulers.mainThread())}
 */
public <T> FlowableTransformer<T, T> applyFlowableAsysnc() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> flowable) {
            return flowable
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
/**
 * Creates a transformer that subscribes to the upstream on the computation scheduler and
 * delivers items on the Android main thread.
 *
 * <p>The previous body was identical to {@code applyFlowableMainThread()} and only applied
 * {@code observeOn}, so the upstream was subscribed on the caller's thread.
 *
 * @param <T> element type of the stream
 * @return a transformer applying {@code subscribeOn(Schedulers.computation())} followed by
 *         {@code observeOn(AndroidSchedulers.mainThread())}
 */
public <T> FlowableTransformer<T, T> applyFlowableCompute() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> flowable) {
            return flowable
                    .subscribeOn(Schedulers.computation())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
/** * 统一线程处理 * @param <T> * @return */ public static <T> FlowableTransformer<T, T> rxSchedulerHelper() { //compose简化线程 return new FlowableTransformer<T, T>() { @Override public Flowable<T> apply(Flowable<T> observable) { return observable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); } }; }
/**
 * Creates a transformer that retries the upstream according to
 * {@code retry(hint, retryCount, attempt, error)}.
 *
 * @param hint passed through to the {@code retry} decision helper
 * @param retryCount passed through to the {@code retry} decision helper
 * @param <U> element type of the stream
 * @return the retrying transformer
 */
public static <U> FlowableTransformer<U, U> retry2(final String hint, final int retryCount) {
    return new FlowableTransformer<U, U>() {
        @Override
        public Publisher<U> apply(Flowable<U> upstream) {
            final BiPredicate<Integer, Throwable> shouldRetry = new BiPredicate<Integer, Throwable>() {
                @Override
                public boolean test(Integer attempt, Throwable error) throws Exception {
                    return retry(hint, retryCount, attempt, error);
                }
            };
            return upstream.retry(shouldRetry);
        }
    };
}