Java 类io.reactivex.FlowableTransformer 实例源码

项目:GitHub    文件:RxUtil.java   
/**
 * Builds a transformer that unwraps a stream of {@code GankHttpResponse} envelopes.
 * <p>
 * A response whose {@code getError()} flag is false is replaced by the flowable that
 * {@code createData} produces from {@code getResults()}; any other response is replaced
 * by an error signal carrying an {@code ApiException}.
 *
 * @param <T> payload type carried inside the response envelope
 * @return transformer meant to be applied with {@code compose} to check each result
 */
public static <T> FlowableTransformer<GankHttpResponse<T>, T> handleResult() {
    // Stateless mapping from one envelope to either its payload or an error signal.
    final Function<GankHttpResponse<T>, Flowable<T>> unwrap =
            new Function<GankHttpResponse<T>, Flowable<T>>() {
                @Override
                public Flowable<T> apply(GankHttpResponse<T> response) {
                    if (response.getError()) {
                        return Flowable.error(new ApiException("服务器返回error"));
                    }
                    return createData(response.getResults());
                }
            };

    return new FlowableTransformer<GankHttpResponse<T>, T>() {
        @Override
        public Flowable<T> apply(Flowable<GankHttpResponse<T>> responses) {
            return responses.flatMap(unwrap);
        }
    };
}
项目:GitHub    文件:RxUtil.java   
/**
 * Builds a transformer that unwraps a stream of {@code WXHttpResponse} envelopes.
 * <p>
 * A response whose {@code getCode()} equals 200 is replaced by the flowable that
 * {@code createData} produces from {@code getNewslist()}; any other response is
 * replaced by an error signal carrying an {@code ApiException}.
 *
 * @param <T> payload type carried inside the response envelope
 * @return transformer meant to be applied with {@code compose} to check each result
 */
public static <T> FlowableTransformer<WXHttpResponse<T>, T> handleWXResult() {
    // Stateless mapping from one envelope to either its payload or an error signal.
    final Function<WXHttpResponse<T>, Flowable<T>> unwrap =
            new Function<WXHttpResponse<T>, Flowable<T>>() {
                @Override
                public Flowable<T> apply(WXHttpResponse<T> response) {
                    if (response.getCode() != 200) {
                        return Flowable.error(new ApiException("服务器返回error"));
                    }
                    return createData(response.getNewslist());
                }
            };

    return new FlowableTransformer<WXHttpResponse<T>, T>() {
        @Override
        public Flowable<T> apply(Flowable<WXHttpResponse<T>> responses) {
            return responses.flatMap(unwrap);
        }
    };
}
项目:GitHub    文件:RxUtil.java   
/**
 * Builds a transformer that unwraps a stream of {@code MyHttpResponse} envelopes.
 * <p>
 * A response whose {@code getCode()} equals 200 is replaced by the flowable that
 * {@code createData} produces from {@code getData()}; any other response is replaced
 * by an error signal carrying an {@code ApiException}.
 *
 * @param <T> payload type carried inside the response envelope
 * @return transformer meant to be applied with {@code compose} to check each result
 */
public static <T> FlowableTransformer<MyHttpResponse<T>, T> handleMyResult() {
    // Stateless mapping from one envelope to either its payload or an error signal.
    final Function<MyHttpResponse<T>, Flowable<T>> unwrap =
            new Function<MyHttpResponse<T>, Flowable<T>>() {
                @Override
                public Flowable<T> apply(MyHttpResponse<T> response) {
                    if (response.getCode() != 200) {
                        return Flowable.error(new ApiException("服务器返回error"));
                    }
                    return createData(response.getData());
                }
            };

    return new FlowableTransformer<MyHttpResponse<T>, T>() {
        @Override
        public Flowable<T> apply(Flowable<MyHttpResponse<T>> responses) {
            return responses.flatMap(unwrap);
        }
    };
}
项目:GitHub    文件:RxUtil.java   
/**
 * Builds a transformer that unwraps a stream of {@code GoldHttpResponse} envelopes.
 * <p>
 * A response whose {@code getResults()} is non-null is replaced by the flowable that
 * {@code createData} produces from those results; a response without results is
 * replaced by an error signal carrying an {@code ApiException}.
 *
 * @param <T> payload type carried inside the response envelope
 * @return transformer meant to be applied with {@code compose} to check each result
 */
public static <T> FlowableTransformer<GoldHttpResponse<T>, T> handleGoldResult() {
    // Stateless mapping from one envelope to either its payload or an error signal.
    final Function<GoldHttpResponse<T>, Flowable<T>> unwrap =
            new Function<GoldHttpResponse<T>, Flowable<T>>() {
                @Override
                public Flowable<T> apply(GoldHttpResponse<T> response) {
                    if (response.getResults() == null) {
                        return Flowable.error(new ApiException("服务器返回error"));
                    }
                    return createData(response.getResults());
                }
            };

    return new FlowableTransformer<GoldHttpResponse<T>, T>() {
        @Override
        public Flowable<T> apply(Flowable<GoldHttpResponse<T>> responses) {
            return responses.flatMap(unwrap);
        }
    };
}
项目:GitHub    文件:RxUtil.java   
/**
 * Unified result handling for {@code GankHttpResponse} streams.
 * <p>
 * Each response is flat-mapped: when {@code getError()} is false the payload from
 * {@code getResults()} is handed to {@code createData}; otherwise the stream is
 * switched to an error carrying an {@code ApiException}.
 *
 * @param <T> type of the payload inside the response
 * @return transformer to be used with {@code compose}
 */
public static <T> FlowableTransformer<GankHttpResponse<T>, T> handleResult() {
    return new FlowableTransformer<GankHttpResponse<T>, T>() {
        @Override
        public Flowable<T> apply(Flowable<GankHttpResponse<T>> source) {
            return source.flatMap(new Function<GankHttpResponse<T>, Flowable<T>>() {
                @Override
                public Flowable<T> apply(GankHttpResponse<T> envelope) {
                    // Failure first, so the success path reads as the fall-through.
                    if (envelope.getError()) {
                        return Flowable.error(new ApiException("服务器返回error"));
                    }
                    return createData(envelope.getResults());
                }
            });
        }
    };
}
项目:GitHub    文件:RxUtil.java   
/**
 * Unified result handling for {@code WXHttpResponse} streams.
 * <p>
 * Each response is flat-mapped: when {@code getCode()} is 200 the payload from
 * {@code getNewslist()} is handed to {@code createData}; otherwise the stream is
 * switched to an error carrying an {@code ApiException} built from the response's
 * {@code getMsg()} and {@code getCode()}.
 *
 * @param <T> type of the payload inside the response
 * @return transformer to be used with {@code compose}
 */
public static <T> FlowableTransformer<WXHttpResponse<T>, T> handleWXResult() {
    return new FlowableTransformer<WXHttpResponse<T>, T>() {
        @Override
        public Flowable<T> apply(Flowable<WXHttpResponse<T>> source) {
            return source.flatMap(new Function<WXHttpResponse<T>, Flowable<T>>() {
                @Override
                public Flowable<T> apply(WXHttpResponse<T> envelope) {
                    // Failure first, so the success path reads as the fall-through.
                    if (envelope.getCode() != 200) {
                        return Flowable.error(new ApiException(envelope.getMsg(), envelope.getCode()));
                    }
                    return createData(envelope.getNewslist());
                }
            });
        }
    };
}
项目:GitHub    文件:RxUtil.java   
/**
 * Unified result handling for {@code MyHttpResponse} streams.
 * <p>
 * Each response is flat-mapped: when {@code getCode()} is 200 the payload from
 * {@code getData()} is handed to {@code createData}; otherwise the stream is switched
 * to an error carrying an {@code ApiException} built from the response's
 * {@code getMessage()} and {@code getCode()}.
 *
 * @param <T> type of the payload inside the response
 * @return transformer to be used with {@code compose}
 */
public static <T> FlowableTransformer<MyHttpResponse<T>, T> handleMyResult() {
    return new FlowableTransformer<MyHttpResponse<T>, T>() {
        @Override
        public Flowable<T> apply(Flowable<MyHttpResponse<T>> source) {
            return source.flatMap(new Function<MyHttpResponse<T>, Flowable<T>>() {
                @Override
                public Flowable<T> apply(MyHttpResponse<T> envelope) {
                    // Failure first, so the success path reads as the fall-through.
                    if (envelope.getCode() != 200) {
                        return Flowable.error(new ApiException(envelope.getMessage(), envelope.getCode()));
                    }
                    return createData(envelope.getData());
                }
            });
        }
    };
}
项目:GitHub    文件:RxUtil.java   
/**
 * Unified result handling for {@code GoldHttpResponse} streams.
 * <p>
 * Each response is flat-mapped: when {@code getResults()} is non-null it is handed to
 * {@code createData}; otherwise the stream is switched to an error carrying an
 * {@code ApiException}.
 *
 * @param <T> type of the payload inside the response
 * @return transformer to be used with {@code compose}
 */
public static <T> FlowableTransformer<GoldHttpResponse<T>, T> handleGoldResult() {
    return new FlowableTransformer<GoldHttpResponse<T>, T>() {
        @Override
        public Flowable<T> apply(Flowable<GoldHttpResponse<T>> source) {
            return source.flatMap(new Function<GoldHttpResponse<T>, Flowable<T>>() {
                @Override
                public Flowable<T> apply(GoldHttpResponse<T> envelope) {
                    // Missing results are treated as a server-side failure.
                    if (envelope.getResults() == null) {
                        return Flowable.error(new ApiException("服务器返回error"));
                    }
                    return createData(envelope.getResults());
                }
            });
        }
    };
}
项目:RxLog    文件:RxLog.java   
/**
 * Creates a transformer that attaches the logging transformers selected by a bitmask.
 * <p>
 * The subscribe, terminate, error and complete loggers are attached independently, in
 * that order. For item logging {@code LOG_NEXT_DATA} wins over {@code LOG_NEXT_EVENT}:
 * the event logger is only attached when the data flag is not set.
 *
 * @param msg     message passed to every attached logging transformer
 * @param bitMask combination of the {@code LOG_*} flags choosing what to log
 * @param <T>     element type of the stream
 * @return transformer that composes the selected loggers onto the upstream
 */
public static <T> FlowableTransformer<T, T> logFlowable(final String msg, final int bitMask) {
    // The mask is immutable, so decode it once instead of on every application.
    final boolean logSubscribe = (bitMask & LOG_SUBSCRIBE) > 0;
    final boolean logTerminate = (bitMask & LOG_TERMINATE) > 0;
    final boolean logError = (bitMask & LOG_ERROR) > 0;
    final boolean logComplete = (bitMask & LOG_COMPLETE) > 0;
    final boolean logNextData = (bitMask & LOG_NEXT_DATA) > 0;
    final boolean logNextEvent = (bitMask & LOG_NEXT_EVENT) > 0;

    return source -> {
        if (logSubscribe) {
            source = source.compose(fLogSubscribe(msg));
        }
        if (logTerminate) {
            source = source.compose(fLogTerminate(msg));
        }
        if (logError) {
            source = source.compose(fLogError(msg));
        }
        if (logComplete) {
            source = source.compose(fLogComplete(msg));
        }
        if (logNextData) {
            source = source.compose(fLogNext(msg));
        } else if (logNextEvent) {
            source = source.compose(fLogNextEvent(msg));
        }
        return source;
    };
}
项目:Renrentou    文件:XApi.java   
/**
 * Builds a transformer that turns error-carrying models into error signals.
 * <p>
 * Every model is checked in this order:
 * <ol>
 *   <li>{@code null} model or {@code isNull()}: {@code NetError.NoDataError}</li>
 *   <li>{@code isAuthError()}: {@code NetError.AuthError}</li>
 *   <li>{@code isBizError()}: {@code NetError.BusinessError}</li>
 *   <li>otherwise the model is re-emitted unchanged</li>
 * </ol>
 * Each error is a {@code NetError} carrying the model's {@code getErrorMsg()}, except
 * for a {@code null} model, which has no message to read and yields a {@code null} one.
 *
 * @param <T> model type flowing through the stream
 * @return transformer to be used with {@code compose}
 */
public static <T extends IModel> FlowableTransformer<T, T> getApiTransformer() {

    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            return upstream.flatMap(new Function<T, Publisher<T>>() {
                @Override
                public Publisher<T> apply(T model) throws Exception {

                    if (model == null) {
                        // Previously this branch called model.getErrorMsg() and threw a
                        // NullPointerException instead of reporting "no data".
                        // A typed variable (not a bare null literal) keeps the
                        // constructor call unambiguous.
                        final String noMessage = null;
                        return Flowable.error(new NetError(noMessage, NetError.NoDataError));
                    } else if (model.isNull()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.NoDataError));
                    } else if (model.isAuthError()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.AuthError));
                    } else if (model.isBizError()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.BusinessError));
                    } else {
                        return Flowable.just(model);
                    }
                }
            });
        }
    };
}
项目:RxBus2    文件:RxBusBuilder.java   
/**
 * Subscribes callback-style to the flowable returned by {@code build(false)}.
 * <p>
 * Steps, in order: apply {@code transformer} when one is given; replace missing
 * callbacks with defaults (no-op for next/complete, rethrow as
 * {@code OnErrorNotImplementedException} for errors); wrap the next-callback through
 * {@code RxBusUtil.wrapQueueConsumer} when a queuer is set and the queue safety check
 * is enabled; run the flowable through {@code applySchedular}; subscribe; and register
 * the disposable with {@code RxDisposableManager} when a bound object is present.
 *
 * @param onNext      item callback, may be {@code null}
 * @param onError     error callback, may be {@code null}
 * @param onCompleted completion callback, may be {@code null}
 * @param transformer optional transformer applied before subscribing, may be {@code null}
 * @param <R>         item type delivered to {@code onNext}
 * @return the disposable of the created subscription
 */
public <R> Disposable subscribe(Consumer<R> onNext, Consumer<Throwable> onError, Action onCompleted, FlowableTransformer<T, R> transformer)
{
    Flowable source = build(false);
    if (transformer != null)
    {
        source = source.compose(transformer);
    }

    // Fall back to defaults for every callback the caller left out.
    final Consumer<R> nextHandler = onNext != null ? onNext : data -> {};
    final Consumer<Throwable> errorHandler = onError != null
            ? onError
            : error -> { throw new OnErrorNotImplementedException(error); };
    final Action completeHandler = onCompleted != null ? onCompleted : () -> {};

    final boolean guardQueue = mQueuer != null && mQueueSubscriptionSafetyCheckEnabled;
    Consumer<R> actualOnNext = nextHandler;
    if (guardQueue)
    {
        actualOnNext = RxBusUtil.wrapQueueConsumer(nextHandler, mQueuer);
    }

    source = applySchedular(source);
    final Disposable subscription = source.subscribe(actualOnNext, errorHandler, completeHandler);
    if (mBoundObject != null)
    {
        RxDisposableManager.addDisposable(mBoundObject, subscription);
    }
    return subscription;
}
项目:RxBus2    文件:RxBusBuilder.java   
/**
 * Subscribes the given {@code DisposableSubscriber} to the flowable returned by
 * {@code build(false)}.
 * <p>
 * Steps, in order: apply {@code transformer} when one is given; wrap the subscriber
 * through {@code RxBusUtil.wrapSubscriber} when a queuer is set and the queue safety
 * check is enabled; run the flowable through {@code applySchedular}; subscribe; and
 * register the disposable with {@code RxDisposableManager} when a bound object is present.
 *
 * @param subscriber  subscriber receiving the events
 * @param transformer optional transformer applied before subscribing, may be {@code null}
 * @param <R>         item type delivered to the subscriber
 * @return the subscriber actually subscribed, as a disposable
 */
public <R> Disposable subscribe(DisposableSubscriber<R> subscriber, FlowableTransformer<T, R> transformer)
{
    Flowable source = build(false);
    if (transformer != null)
    {
        source = source.compose(transformer);
    }

    final boolean guardQueue = mQueuer != null && mQueueSubscriptionSafetyCheckEnabled;
    Subscriber<R> target = subscriber;
    if (guardQueue)
    {
        target = RxBusUtil.wrapSubscriber(subscriber, mQueuer);
    }

    source = applySchedular(source);
    // subscribeWith hands back its argument; the cast exposes it as a Disposable.
    final Disposable subscription = (DisposableSubscriber)source.subscribeWith(target);
    if (mBoundObject != null)
    {
        RxDisposableManager.addDisposable(mBoundObject, subscription);
    }
    return subscription;
}
项目:EazyBaseMVP    文件:RxUtils.java   
/**
 * Builds a transformer that applies IO/main-thread scheduling and toggles the view's
 * loading indicator.
 * <p>
 * The chain is: {@code subscribeOn(io)}, {@code doOnSubscribe} calling
 * {@code view.showLoading()}, {@code subscribeOn(mainThread)},
 * {@code observeOn(mainThread)}, then {@code doOnTerminate} calling
 * {@code view.hideLoading()}.
 * NOTE(review): the second {@code subscribeOn} presumably exists so that
 * {@code showLoading()} runs on the main thread; confirm against RxJava semantics.
 *
 * @param view view whose loading indicator is shown and hidden
 * @param <T>  element type of the stream
 * @return transformer to be used with {@code compose}
 */
public static <T> FlowableTransformer<T, T> applySchedules(final IView view) {
    final Consumer<Subscription> showLoading = new Consumer<Subscription>() {
        @Override
        public void accept(Subscription ignored) throws Exception {
            view.showLoading();
        }
    };
    final Action hideLoading = new Action() {
        @Override
        public void run() throws Exception {
            view.hideLoading();
        }
    };

    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> source) {
            Flowable<T> withLoading = source
                    .subscribeOn(Schedulers.io())
                    .doOnSubscribe(showLoading)
                    .subscribeOn(AndroidSchedulers.mainThread());
            return withLoading
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnTerminate(hideLoading);
        }
    };
}
项目:RxGps    文件:StatusExceptionResumeNextTransformer.java   
/**
 * Builds a transformer that recovers from resolvable {@code StatusException}s.
 * <p>
 * When the upstream fails with a {@code StatusException} whose status reports
 * {@code hasResolution()}, the exception's result is emitted as a single item instead
 * of the error. Every other error is passed on unchanged.
 *
 * @param <R> result type of the stream
 * @return transformer to be used with {@code compose}
 */
public static <R extends Result> FlowableTransformer<R, R> forFlowable() {
    return upstream -> upstream.onErrorResumeNext(error -> {
        if (!(error instanceof StatusException)) {
            return Flowable.error(error);
        }

        StatusException statusException = (StatusException) error;
        if (!statusException.getStatus().hasResolution()) {
            return Flowable.error(error);
        }

        // Resolvable status: hand the result downstream so the caller can act on it.
        return Flowable.just((R) statusException.getResult());
    });
}
项目:XDroidMvp    文件:XApi.java   
/**
 * Builds a transformer that turns error-carrying models into error signals.
 * <p>
 * Every model is checked in this order:
 * <ol>
 *   <li>{@code null} model or {@code isNull()}: {@code NetError.NoDataError}</li>
 *   <li>{@code isAuthError()}: {@code NetError.AuthError}</li>
 *   <li>{@code isBizError()}: {@code NetError.BusinessError}</li>
 *   <li>otherwise the model is re-emitted unchanged</li>
 * </ol>
 * Each error is a {@code NetError} carrying the model's {@code getErrorMsg()}, except
 * for a {@code null} model, which has no message to read and yields a {@code null} one.
 *
 * @param <T> model type flowing through the stream
 * @return transformer to be used with {@code compose}
 */
public static <T extends IModel> FlowableTransformer<T, T> getApiTransformer() {

    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            return upstream.flatMap(new Function<T, Publisher<T>>() {
                @Override
                public Publisher<T> apply(T model) throws Exception {

                    if (model == null) {
                        // Previously this branch called model.getErrorMsg() and threw a
                        // NullPointerException instead of reporting "no data".
                        // A typed variable (not a bare null literal) keeps the
                        // constructor call unambiguous.
                        final String noMessage = null;
                        return Flowable.error(new NetError(noMessage, NetError.NoDataError));
                    } else if (model.isNull()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.NoDataError));
                    } else if (model.isAuthError()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.AuthError));
                    } else if (model.isBizError()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.BusinessError));
                    } else {
                        return Flowable.just(model);
                    }
                }
            });
        }
    };
}
项目:rxjava2-extras    文件:Transformers.java   
/**
 * Builds a transformer that pairs every upstream item with running statistics.
 * <p>
 * The stream is scanned starting from a seed pair holding a {@code null} item and
 * {@code Statistics.create()}. For each item the previous statistics are extended with
 * {@code function.apply(item)}, and the pair of item and extended statistics is
 * emitted. The seed pair itself is dropped with {@code skip(1)}.
 *
 * @param function extracts the number fed into the statistics from each item
 * @param <T>      item type
 * @param <R>      numeric type produced by {@code function}
 * @return transformer emitting one pair per upstream item
 */
public static <T, R extends Number> FlowableTransformer<T, Pair<T, Statistics>> collectStats(
        final Function<? super T, ? extends R> function) {
    final BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>> accumulate =
            new BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
                @Override
                public Pair<T, Statistics> apply(Pair<T, Statistics> previous, T item) throws Exception {
                    return Pair.create(item, previous.b().add(function.apply(item)));
                }
            };

    return new FlowableTransformer<T, Pair<T, Statistics>>() {

        @Override
        public Flowable<Pair<T, Statistics>> apply(Flowable<T> source) {
            // The seed only bootstraps the scan; skip(1) keeps it from reaching downstream.
            Pair<T, Statistics> seed = Pair.create((T) null, Statistics.create());
            return source.scan(seed, accumulate).skip(1);
        }
    };
}
项目:rxjava2-extras    文件:Transformers.java   
/**
 * Builds a transformer that constrains upstream request sizes to a range.
 * <p>
 * When {@code minRequest} equals {@code maxRequest} and the first request is
 * constrained too, this delegates to {@code Flowable.rebatchRequests(minRequest)}.
 * Otherwise it composes {@code Transformers.minRequest} (first request minimum is
 * {@code minRequest} if constrained, else 1; later minimum is {@code minRequest})
 * followed by {@code Transformers.maxRequest(maxRequest)}.
 *
 * @param minRequest               minimum request size
 * @param maxRequest               maximum request size, must not be below {@code minRequest}
 * @param constrainFirstRequestMin whether the minimum also applies to the first request
 * @param <T>                      element type of the stream
 * @return transformer to be used with {@code compose}
 */
public static <T> FlowableTransformer<T, T> rebatchRequests(final int minRequest, final long maxRequest,
        final boolean constrainFirstRequestMin) {
    Preconditions.checkArgument(minRequest <= maxRequest, "minRequest cannot be greater than maxRequest");
    return new FlowableTransformer<T, T>() {

        @Override
        public Publisher<T> apply(Flowable<T> source) {
            final boolean fixedBatchSize = minRequest == maxRequest && constrainFirstRequestMin;
            if (fixedBatchSize) {
                return source.rebatchRequests(minRequest);
            }

            final int firstRequestMin = constrainFirstRequestMin ? minRequest : 1;
            return source
                    .compose(Transformers.<T>minRequest(firstRequestMin, minRequest))
                    .compose(Transformers.<T>maxRequest(maxRequest));
        }
    };
}
项目:rxjava2-extras    文件:FlowableStateMachineTest.java   
/**
 * With a completion action that only calls {@code onComplete_()}, the values 1..6 are
 * expected downstream followed by completion.
 */
@Test
public void testPassThroughWithCustomCompletion() {
    Completion2<String, Integer> completeOnly = new Completion2<String, Integer>() {
        @Override
        public void accept(String state, Emitter<Integer> emitter) {
            emitter.onComplete_();
        }
    };
    FlowableTransformer<Integer, Integer> transformer = StateMachine2.builder() //
            .initialState("") //
            .transition(PASS_THROUGH_TRANSITION) //
            .completion(completeOnly) //
            .requestBatchSize(1) //
            .build();
    Flowable.just(1, 2, 3, 4, 5, 6) //
            .compose(transformer) //
            .test() //
            .assertValues(1, 2, 3, 4, 5, 6) //
            .assertComplete();
}
项目:rxjava2-extras    文件:FlowableStateMachineTest.java   
/**
 * A completion action that calls {@code onComplete_()} twice is still expected to
 * yield the values 1..6 followed by completion.
 */
@Test
public void testPassThroughEmitterCompletesTwice() {
    Completion2<String, Integer> completeTwice = new Completion2<String, Integer>() {
        @Override
        public void accept(String state, Emitter<Integer> emitter) {
            emitter.onComplete_();
            emitter.onComplete_();
        }
    };
    FlowableTransformer<Integer, Integer> transformer = StateMachine2.builder() //
            .initialState("") //
            .transition(PASS_THROUGH_TRANSITION) //
            .completion(completeTwice) //
            .requestBatchSize(1) //
            .build();
    Flowable.just(1, 2, 3, 4, 5, 6) //
            .compose(transformer) //
            .test() //
            .assertValues(1, 2, 3, 4, 5, 6) //
            .assertComplete();
}
项目:rxjava2-extras    文件:FlowableStateMachineTest.java   
/**
 * A value emitted via {@code onNext_(8)} after {@code onComplete_()} is expected not
 * to reach downstream: only 1..6 are asserted, followed by completion.
 */
@Test
public void testPassThroughEmitterOnNextAfterCompletion() {
    Completion2<String, Integer> emitAfterComplete = new Completion2<String, Integer>() {
        @Override
        public void accept(String state, Emitter<Integer> emitter) {
            emitter.onComplete_();
            emitter.onNext_(8);
        }
    };
    FlowableTransformer<Integer, Integer> transformer = StateMachine2.builder() //
            .initialState("") //
            .transition(PASS_THROUGH_TRANSITION) //
            .completion(emitAfterComplete) //
            .requestBatchSize(1) //
            .build();
    Flowable.just(1, 2, 3, 4, 5, 6) //
            .compose(transformer) //
            .test() //
            .assertValues(1, 2, 3, 4, 5, 6) //
            .assertComplete();
}
项目:rxjava2-extras    文件:FlowableStateMachineTest.java   
/**
 * A completion action that throws {@code ThrowingException} is expected to surface
 * that exception as the stream's error, after the value 1 has been delivered.
 */
@Test
public void testCompletionThrows() {
    Completion2<String, Integer> throwingCompletion = new Completion2<String, Integer>() {
        @Override
        public void accept(String state, Emitter<Integer> emitter) {
            throw new ThrowingException();
        }
    };
    FlowableTransformer<Integer, Integer> transformer = StateMachine2.builder() //
            .initialState("") //
            .transition(PASS_THROUGH_TRANSITION) //
            .completion(throwingCompletion) //
            .requestBatchSize(1) //
            .build();
    Flowable.just(1) //
            .compose(transformer) //
            .test() //
            .assertValues(1) //
            .assertError(ThrowingException.class);
}
项目:rxjava2-extras    文件:FlowableStateMachineTest.java   
/**
 * An initial-state factory returning {@code null} is expected to produce a
 * {@code NullPointerException} error with no values emitted.
 */
@Test
public void testStateFactoryReturnsNull() {
    Callable<String> nullStateFactory = new Callable<String>() {
        @Override
        public String call() throws Exception {
            return null;
        }
    };
    FlowableTransformer<Integer, Integer> transformer = StateMachine2.builder() //
            .initialStateFactory(nullStateFactory) //
            .transition(PASS_THROUGH_TRANSITION) //
            .build();
    Flowable.just(1) //
            .compose(transformer) //
            .test() //
            .assertNoValues() //
            .assertError(NullPointerException.class);
}
项目:rxjava2-extras    文件:FlowableStateMachineTest.java   
/**
 * With a {@code null}-returning initial-state factory and an upstream that fails with
 * {@code ThrowingException}, the asserted error is {@code NullPointerException} and no
 * values are expected.
 */
@Test
public void testStateFactoryReturnsNullFromErrorStream() {
    Callable<String> nullStateFactory = new Callable<String>() {
        @Override
        public String call() throws Exception {
            return null;
        }
    };
    FlowableTransformer<Integer, Integer> transformer = StateMachine2.builder() //
            .initialStateFactory(nullStateFactory) //
            .transition(PASS_THROUGH_TRANSITION) //
            .build();
    Flowable.<Integer> error(new ThrowingException()) //
            .compose(transformer) //
            .test() //
            .assertNoValues() //
            .assertError(NullPointerException.class);
}
项目:rxjava2-extras    文件:FlowableStateMachineTest.java   
/**
 * A transition that throws {@code ThrowingException}, fed from a {@code Burst} source
 * of three items, is expected to yield that error and no values.
 */
@Test
public void testOnNextThrowsWithBurstSource() {
    Transition2<String, Integer, Integer> throwingTransition = new Transition2<String, Integer, Integer>() {

        @Override
        public String apply(String state, Integer value, Emitter<Integer> emitter) {
            throw new ThrowingException();
        }
    };
    FlowableTransformer<Integer, Integer> transformer = StateMachine2.builder() //
            .initialState("") //
            .transition(throwingTransition) //
            .requestBatchSize(10) //
            .build();
    Burst.items(1, 2, 3).create() //
            .compose(transformer) //
            .test() //
            .assertNoValues() //
            .assertError(ThrowingException.class);
}
项目:rxjava2-extras    文件:FlowableStateMachineTest.java   
/**
 * A transition that calls {@code cancel_()} on the emitter is expected to leave the
 * downstream with no values and no terminal event.
 */
@Test
public void testCancelFromTransition() {
    Transition2<String, Integer, Integer> cancellingTransition = new Transition2<String, Integer, Integer>() {

        @Override
        public String apply(String state, Integer value, Emitter<Integer> emitter) {
            emitter.cancel_();
            return state;
        }
    };
    FlowableTransformer<Integer, Integer> transformer = StateMachine2.builder() //
            .initialState("") //
            .transition(cancellingTransition) //
            .requestBatchSize(10) //
            .build();
    Burst.items(1, 2, 3).create() //
            .compose(transformer) //
            .test() //
            .assertNoValues() //
            .assertNotTerminated();
}
项目:rxjava2-extras    文件:FlowableStateMachineTest.java   
/**
 * A throwing transition fed from a {@code Burst} source that emits one item and then
 * an error is expected to yield {@code ThrowingException} downstream. Exactly one
 * throwable is expected in the list collected by the global RxJava error handler
 * installed for this test. The plugins are reset afterwards in all cases.
 */
@Test
public void testOnNextThrowsWithBurstSourceThatTerminatesWithError() {
    List<Throwable> undelivered = new CopyOnWriteArrayList<Throwable>();
    try {
        RxJavaPlugins.setErrorHandler(Consumers.addTo(undelivered));
        Transition2<String, Integer, Integer> throwingTransition = new Transition2<String, Integer, Integer>() {

            @Override
            public String apply(String state, Integer value, Emitter<Integer> emitter) {
                throw new ThrowingException();
            }
        };
        FlowableTransformer<Integer, Integer> transformer = StateMachine2.builder() //
                .initialState("") //
                .transition(throwingTransition) //
                .requestBatchSize(10) //
                .build();
        Burst.item(1).error(new RuntimeException()) //
                .compose(transformer) //
                .test() //
                .assertNoValues() //
                .assertError(ThrowingException.class);
        assertEquals(1, undelivered.size());
    } finally {
        RxJavaPlugins.reset();
    }
}
项目:rxjava2-extras    文件:FlowableStateMachineTest.java   
/**
 * With a {@code null}-returning initial-state factory and an empty upstream, a
 * {@code NullPointerException} error and no values are expected.
 */
@Test
public void testStateFactoryReturnsNullOnEmptySource() {
    Callable<String> nullStateFactory = new Callable<String>() {
        @Override
        public String call() throws Exception {
            return null;
        }
    };
    FlowableTransformer<Integer, Integer> transformer = StateMachine2.builder() //
            .initialStateFactory(nullStateFactory) //
            .transition(PASS_THROUGH_TRANSITION) //
            .build();
    Flowable.<Integer> empty() //
            .compose(transformer) //
            .test() //
            .assertNoValues() //
            .assertError(NullPointerException.class);
}
项目:rxjava2-extras    文件:FlowableStateMachineTest.java   
/**
 * When the upstream fails with a {@code RuntimeException} and the error action itself
 * throws {@code ThrowingException}, the latter is the asserted downstream error.
 */
@Test
public void errorActionThrows() {
    Errored<String, Integer> throwingErrorAction = new Errored<String, Integer>() {
        @Override
        public void accept(String state, Throwable error, Emitter<Integer> emitter) {
            throw new ThrowingException();
        }
    };
    FlowableTransformer<Integer, Integer> transformer = StateMachine2.builder() //
            .initialState("") //
            .transition(PASS_THROUGH_TRANSITION) //
            .errored(throwingErrorAction) //
            .build();
    Flowable.<Integer> error(new RuntimeException()) //
            .compose(transformer) //
            .test() //
            .assertNoValues() //
            .assertError(ThrowingException.class);
}
项目:rxjava2-extras    文件:FlowableStateMachineTest.java   
/**
 * An error action that forwards the upstream error through {@code onError_} is
 * expected to deliver that same {@code ThrowingException} downstream with no values.
 */
@Test
public void errorActionPassThrough() {
    Errored<String, Integer> forwardError = new Errored<String, Integer>() {
        @Override
        public void accept(String state, Throwable error, Emitter<Integer> emitter) {
            emitter.onError_(error);
        }
    };
    FlowableTransformer<Integer, Integer> transformer = StateMachine2.builder() //
            .initialState("") //
            .transition(PASS_THROUGH_TRANSITION) //
            .errored(forwardError) //
            .build();
    Flowable.<Integer> error(new ThrowingException()) //
            .compose(transformer) //
            .test() //
            .assertNoValues() //
            .assertError(ThrowingException.class);
}
项目:rxjava2-extras    文件:FlowableStateMachineTest.java   
/**
 * A transition that returns the state without touching the emitter is expected to
 * produce no values and then complete.
 */
@Test
public void noActionTransition() {
    Transition2<String, Integer, Integer> silentTransition = new Transition2<String, Integer, Integer>() {
        @Override
        public String apply(String state, Integer value, Emitter<Integer> emitter) {
            return state;
        }
    };
    FlowableTransformer<Integer, Integer> transformer = StateMachine2.builder() //
            .initialState("") //
            .transition(silentTransition) //
            .build();
    Flowable.just(1, 2) //
            .compose(transformer) //
            .test() //
            .assertNoValues() //
            .assertComplete();
}
项目:nakadi-java    文件:StreamConnectionRestart.java   
/**
 * Allows a {@link Flowable} to be restarted (repeated) via {@link Flowable#repeatWhen},
 * waiting a fixed delay before each restart and allowing at most a configured number
 * of restarts.
 * <p>
 * Each repeat signal is zipped with a counter from {@code Flowable.range(1, maxRestarts)}
 * and mapped to a {@link Flowable#timer} of the configured delay. The timer emissions
 * are passed through {@link Flowable#takeUntil} with {@code stopRestartingPredicate},
 * which can short-circuit further restarts.
 *
 * @param stopRestartingPredicate decides whether the repeat should be applied or the
 * observer we're composing with should onComplete
 * @param restartDelay how long to wait between repeats
 * @param restartDelayUnit the time unit for repeats
 * @param maxRestarts the maximum number of repeats
 * @param <T> the type we're composing with ({@link FlowableTransformer}'s upstream and
 * downstream are the same type)
 * @return a {@link FlowableTransformer} that can be given to {@link Flowable#compose}
 */
<T> FlowableTransformer<T, T> repeatWhenWithDelayAndUntil(
    Predicate<Long> stopRestartingPredicate,
    long restartDelay,
    TimeUnit restartDelayUnit,
    int maxRestarts
) {
  return upstream -> upstream.repeatWhen(repeatSignals -> {
    // Number the repeat signals 1..maxRestarts; the range running out ends the zip.
    Flowable<Integer> restartNumbers = repeatSignals.zipWith(
        Flowable.range(1, maxRestarts),
        (signal, number) -> number
    );

    Flowable<Long> delayedRestarts = restartNumbers.flatMap(restartNumber -> {
      if(logger.isDebugEnabled()) {
        logger.debug("stream_repeater_delay delay={} {}, restarts={} max_restarts={}",
            restartDelay, restartDelayUnit.toString().toLowerCase(), restartNumber,
            maxRestarts);
      }
      return Flowable.timer(restartDelay, restartDelayUnit);
    });

    return delayedRestarts.takeUntil(stopRestartingPredicate);
  });
}
项目:truetime-android    文件:TrueTimeRx.java   
/**
 * Builds a transformer that expands each NTP pool host name into its IP addresses.
 * <p>
 * Items are observed on {@code Schedulers.io()}. Every host name is logged and then
 * resolved with {@link InetAddress#getAllByName(String)}, and each resulting address
 * is emitted. An {@link UnknownHostException} is turned into an error signal.
 *
 * @return transformer from host names to resolved addresses
 */
private FlowableTransformer<String, InetAddress> resolveNtpPoolToIpAddresses() {
    final Function<String, Flowable<InetAddress>> resolveHost = new Function<String, Flowable<InetAddress>>() {
        @Override
        public Flowable<InetAddress> apply(String ntpPoolAddress) {
            InetAddress[] addresses;
            try {
                TrueLog.d(TAG, "---- resolving ntpHost : " + ntpPoolAddress);
                addresses = InetAddress.getAllByName(ntpPoolAddress);
            } catch (UnknownHostException e) {
                return Flowable.error(e);
            }
            return Flowable.fromArray(addresses);
        }
    };

    return new FlowableTransformer<String, InetAddress>() {
        @Override
        public Publisher<InetAddress> apply(Flowable<String> ntpPoolFlowable) {
            return ntpPoolFlowable
                  .observeOn(Schedulers.io())
                  .flatMap(resolveHost);
        }
    };
}
项目:4Fun    文件:RxUtil.java   
/**
 * Login/registration status check.
 * <p>
 * Maps each emitted list to its first element. When that element's {@code getCode()}
 * differs from the expected {@code code}, a {@code LoginException} carrying the server
 * code is thrown instead.
 * NOTE(review): {@code get(0)} is called unconditionally, so the list is assumed to be
 * non-empty; confirm against the callers.
 *
 * @param code the status code that counts as success
 * @return transformer from a list of login infos to its verified first element
 */
public static FlowableTransformer<List<LoginInfo>, LoginInfo> rxStateCheck(final int code) {
    final Function<List<LoginInfo>, LoginInfo> pickVerified = new Function<List<LoginInfo>, LoginInfo>() {
        @Override
        public LoginInfo apply(List<LoginInfo> loginInfos) throws Exception {
            LoginInfo first = loginInfos.get(0);
            int serverCode = first.getCode();
            if (serverCode == code) {
                return first;
            }
            throw new LoginException(serverCode);
        }
    };

    return new FlowableTransformer<List<LoginInfo>, LoginInfo>() {
        @Override
        public Publisher<LoginInfo> apply(Flowable<List<LoginInfo>> upstream) {
            return upstream.map(pickVerified);
        }
    };
}
项目:RxCupboard    文件:RxDatabase.java   
/**
 * Builds a transformer that closes the given query result iterable when the stream
 * ends.
 * <p>
 * {@code iterable.close()} is hooked into both {@code doOnTerminate} (completion or
 * error) and {@code doOnCancel} (manual unsubscribe, or cancellation by an operator
 * such as {@code take()}).
 *
 * @param iterable the query result to close
 * @param <T>      element type of the stream
 * @return transformer to be used with {@code compose}
 */
private <T> FlowableTransformer<T, T> autoClose(final QueryResultIterable<T> iterable) {
    // Both hooks do exactly the same thing, so a single action serves them.
    final Action closeIterable = new Action() {
        @Override
        public void run() throws Exception {
            iterable.close();
        }
    };

    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> source) {
            return source
                    .doOnTerminate(closeIterable)
                    .doOnCancel(closeIterable);
        }
    };
}
项目:Learning-RxJava    文件:Ch9_3.java   
/**
 * Builds a transformer that collects all upstream items into one {@code ImmutableList}.
 * <p>
 * Items are added to an {@code ImmutableList.Builder}, the builder is built once the
 * collection finishes, and the resulting single value is converted to a flowable.
 *
 * @param <T> element type of the stream
 * @return transformer emitting a single immutable list
 */
public static <T> FlowableTransformer<T, ImmutableList<T>>
toImmutableList() {
    return upstream ->
            upstream.collect(() -> ImmutableList.<T>builder(),
                    (builder, item) -> builder.add(item))
                    .map(builder -> builder.build())
                    .toFlowable(); // collect yields a Single, so convert it back
}
项目:GitHub    文件:RxSchedulers.java   
/**
 * Builds a transformer that observes the stream on the Android main thread.
 *
 * @param <T> element type of the stream
 * @return transformer applying {@code observeOn(AndroidSchedulers.mainThread())}
 */
public <T> FlowableTransformer<T, T> applyFlowableMainThread() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> source) {
            Flowable<T> onMainThread = source.observeOn(AndroidSchedulers.mainThread());
            return onMainThread;
        }
    };
}
项目:GitHub    文件:RxSchedulers.java   
/**
 * Builds a transformer that observes the stream on the Android main thread.
 * <p>
 * NOTE(review): despite the "async" name, the visible code only calls
 * {@code observeOn(AndroidSchedulers.mainThread())} and never {@code subscribeOn};
 * confirm whether a background subscription scheduler was intended.
 *
 * @param <T> element type of the stream
 * @return transformer applying {@code observeOn(AndroidSchedulers.mainThread())}
 */
public <T> FlowableTransformer<T, T> applyFlowableAsysnc() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> source) {
            Flowable<T> onMainThread = source.observeOn(AndroidSchedulers.mainThread());
            return onMainThread;
        }
    };
}
项目:GitHub    文件:RxSchedulers.java   
/**
 * Builds a transformer that observes the stream on the Android main thread.
 * <p>
 * NOTE(review): despite the "compute" name, the visible code only calls
 * {@code observeOn(AndroidSchedulers.mainThread())} and selects no computation
 * scheduler; confirm whether that is intended.
 *
 * @param <T> element type of the stream
 * @return transformer applying {@code observeOn(AndroidSchedulers.mainThread())}
 */
public <T> FlowableTransformer<T, T> applyFlowableCompute() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> source) {
            Flowable<T> onMainThread = source.observeOn(AndroidSchedulers.mainThread());
            return onMainThread;
        }
    };
}
项目:GitHub    文件:RxUtil.java   
/**
 * Unified thread handling: subscribes on {@code Schedulers.io()} and observes on the
 * Android main thread.
 *
 * @param <T> element type of the stream
 * @return transformer to be used with {@code compose} to shorten scheduler setup
 */
public static <T> FlowableTransformer<T, T> rxSchedulerHelper() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Flowable<T> apply(Flowable<T> source) {
            Flowable<T> subscribedOnIo = source.subscribeOn(Schedulers.io());
            return subscribedOnIo.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
项目:GitHub    文件:Utils.java   
/**
 * Builds a transformer that retries the upstream according to the enclosing class's
 * {@code retry(hint, retryCount, attempt, throwable)} helper.
 * <p>
 * On every failure the helper receives the hint, the configured retry count, the
 * attempt number supplied by RxJava and the throwable; its boolean result decides
 * whether to resubscribe.
 *
 * @param hint       value forwarded to the {@code retry} helper
 * @param retryCount value forwarded to the {@code retry} helper
 * @param <U>        element type of the stream
 * @return transformer to be used with {@code compose}
 */
public static <U> FlowableTransformer<U, U> retry2(final String hint, final int retryCount) {
    final BiPredicate<Integer, Throwable> shouldRetry = new BiPredicate<Integer, Throwable>() {
        @Override
        public boolean test(Integer attempt, Throwable failure) throws Exception {
            return retry(hint, retryCount, attempt, failure);
        }
    };

    return new FlowableTransformer<U, U>() {
        @Override
        public Publisher<U> apply(Flowable<U> source) {
            return source.retry(shouldRetry);
        }
    };
}