Java 类io.reactivex.FlowableTransformer 实例源码
项目:GitHub
文件:RxUtil.java
/**
* 统一返回结果处理
* @param <T>
* @return
*/
public static <T> FlowableTransformer<GankHttpResponse<T>, T> handleResult() { //compose判断结果
return new FlowableTransformer<GankHttpResponse<T>, T>() {
@Override
public Flowable<T> apply(Flowable<GankHttpResponse<T>> httpResponseFlowable) {
return httpResponseFlowable.flatMap(new Function<GankHttpResponse<T>, Flowable<T>>() {
@Override
public Flowable<T> apply(GankHttpResponse<T> tGankHttpResponse) {
if(!tGankHttpResponse.getError()) {
return createData(tGankHttpResponse.getResults());
} else {
return Flowable.error(new ApiException("服务器返回error"));
}
}
});
}
};
}
项目:GitHub
文件:RxUtil.java
/**
* 统一返回结果处理
* @param <T>
* @return
*/
public static <T> FlowableTransformer<WXHttpResponse<T>, T> handleWXResult() { //compose判断结果
return new FlowableTransformer<WXHttpResponse<T>, T>() {
@Override
public Flowable<T> apply(Flowable<WXHttpResponse<T>> httpResponseFlowable) {
return httpResponseFlowable.flatMap(new Function<WXHttpResponse<T>, Flowable<T>>() {
@Override
public Flowable<T> apply(WXHttpResponse<T> tWXHttpResponse) {
if(tWXHttpResponse.getCode() == 200) {
return createData(tWXHttpResponse.getNewslist());
} else {
return Flowable.error(new ApiException("服务器返回error"));
}
}
});
}
};
}
项目:GitHub
文件:RxUtil.java
/**
* 统一返回结果处理
* @param <T>
* @return
*/
public static <T> FlowableTransformer<MyHttpResponse<T>, T> handleMyResult() { //compose判断结果
return new FlowableTransformer<MyHttpResponse<T>, T>() {
@Override
public Flowable<T> apply(Flowable<MyHttpResponse<T>> httpResponseFlowable) {
return httpResponseFlowable.flatMap(new Function<MyHttpResponse<T>, Flowable<T>>() {
@Override
public Flowable<T> apply(MyHttpResponse<T> tMyHttpResponse) {
if(tMyHttpResponse.getCode() == 200) {
return createData(tMyHttpResponse.getData());
} else {
return Flowable.error(new ApiException("服务器返回error"));
}
}
});
}
};
}
项目:GitHub
文件:RxUtil.java
/**
* 统一返回结果处理
* @param <T>
* @return
*/
public static <T> FlowableTransformer<GoldHttpResponse<T>, T> handleGoldResult() { //compose判断结果
return new FlowableTransformer<GoldHttpResponse<T>, T>() {
@Override
public Flowable<T> apply(Flowable<GoldHttpResponse<T>> httpResponseFlowable) {
return httpResponseFlowable.flatMap(new Function<GoldHttpResponse<T>, Flowable<T>>() {
@Override
public Flowable<T> apply(GoldHttpResponse<T> tGoldHttpResponse) {
if(tGoldHttpResponse.getResults() != null) {
return createData(tGoldHttpResponse.getResults());
} else {
return Flowable.error(new ApiException("服务器返回error"));
}
}
});
}
};
}
项目:GitHub
文件:RxUtil.java
/**
* 统一返回结果处理
* @param <T>
* @return
*/
public static <T> FlowableTransformer<GankHttpResponse<T>, T> handleResult() { //compose判断结果
return new FlowableTransformer<GankHttpResponse<T>, T>() {
@Override
public Flowable<T> apply(Flowable<GankHttpResponse<T>> httpResponseFlowable) {
return httpResponseFlowable.flatMap(new Function<GankHttpResponse<T>, Flowable<T>>() {
@Override
public Flowable<T> apply(GankHttpResponse<T> tGankHttpResponse) {
if(!tGankHttpResponse.getError()) {
return createData(tGankHttpResponse.getResults());
} else {
return Flowable.error(new ApiException("服务器返回error"));
}
}
});
}
};
}
项目:GitHub
文件:RxUtil.java
/**
* 统一返回结果处理
* @param <T>
* @return
*/
public static <T> FlowableTransformer<WXHttpResponse<T>, T> handleWXResult() { //compose判断结果
return new FlowableTransformer<WXHttpResponse<T>, T>() {
@Override
public Flowable<T> apply(Flowable<WXHttpResponse<T>> httpResponseFlowable) {
return httpResponseFlowable.flatMap(new Function<WXHttpResponse<T>, Flowable<T>>() {
@Override
public Flowable<T> apply(WXHttpResponse<T> tWXHttpResponse) {
if(tWXHttpResponse.getCode() == 200) {
return createData(tWXHttpResponse.getNewslist());
} else {
return Flowable.error(new ApiException(tWXHttpResponse.getMsg(), tWXHttpResponse.getCode()));
}
}
});
}
};
}
项目:GitHub
文件:RxUtil.java
/**
* 统一返回结果处理
* @param <T>
* @return
*/
public static <T> FlowableTransformer<MyHttpResponse<T>, T> handleMyResult() { //compose判断结果
return new FlowableTransformer<MyHttpResponse<T>, T>() {
@Override
public Flowable<T> apply(Flowable<MyHttpResponse<T>> httpResponseFlowable) {
return httpResponseFlowable.flatMap(new Function<MyHttpResponse<T>, Flowable<T>>() {
@Override
public Flowable<T> apply(MyHttpResponse<T> tMyHttpResponse) {
if(tMyHttpResponse.getCode() == 200) {
return createData(tMyHttpResponse.getData());
} else {
return Flowable.error(new ApiException(tMyHttpResponse.getMessage(), tMyHttpResponse.getCode()));
}
}
});
}
};
}
项目:GitHub
文件:RxUtil.java
/**
* 统一返回结果处理
* @param <T>
* @return
*/
public static <T> FlowableTransformer<GoldHttpResponse<T>, T> handleGoldResult() { //compose判断结果
return new FlowableTransformer<GoldHttpResponse<T>, T>() {
@Override
public Flowable<T> apply(Flowable<GoldHttpResponse<T>> httpResponseFlowable) {
return httpResponseFlowable.flatMap(new Function<GoldHttpResponse<T>, Flowable<T>>() {
@Override
public Flowable<T> apply(GoldHttpResponse<T> tGoldHttpResponse) {
if(tGoldHttpResponse.getResults() != null) {
return createData(tGoldHttpResponse.getResults());
} else {
return Flowable.error(new ApiException("服务器返回error"));
}
}
});
}
};
}
项目:RxLog
文件:RxLog.java
/**
* Creates transform operator, which logs defined events in observable's lifecycle
* @param msg message
* @param bitMask bitmask of events which you want to log
* @param <T> type
* @return transformer
*/
public static <T> FlowableTransformer<T, T> logFlowable(final String msg, final int bitMask) {
return upstream -> {
if ((bitMask & LOG_SUBSCRIBE) > 0) {
upstream = upstream.compose(fLogSubscribe(msg));
}
if ((bitMask & LOG_TERMINATE) > 0) {
upstream = upstream.compose(fLogTerminate(msg));
}
if ((bitMask & LOG_ERROR) > 0) {
upstream = upstream.compose(fLogError(msg));
}
if ((bitMask & LOG_COMPLETE) > 0) {
upstream = upstream.compose(fLogComplete(msg));
}
if ((bitMask & LOG_NEXT_DATA) > 0) {
upstream = upstream.compose(fLogNext(msg));
} else if ((bitMask & LOG_NEXT_EVENT) > 0) {
upstream = upstream.compose(fLogNextEvent(msg));
}
return upstream;
};
}
项目:Renrentou
文件:XApi.java
/**
* 异常处理变换
*
* @return
*/
public static <T extends IModel> FlowableTransformer<T, T> getApiTransformer() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream.flatMap(new Function<T, Publisher<T>>() {
@Override
public Publisher<T> apply(T model) throws Exception {
if (model == null || model.isNull()) {
return Flowable.error(new NetError(model.getErrorMsg(), NetError.NoDataError));
} else if (model.isAuthError()) {
return Flowable.error(new NetError(model.getErrorMsg(), NetError.AuthError));
} else if (model.isBizError()) {
return Flowable.error(new NetError(model.getErrorMsg(), NetError.BusinessError));
} else {
return Flowable.just(model);
}
}
});
}
};
}
项目:RxBus2
文件:RxBusBuilder.java
public <R> Disposable subscribe(Consumer<R> onNext, Consumer<Throwable> onError, Action onCompleted, FlowableTransformer<T, R> transformer)
{
Flowable flowable = build(false);
if (transformer != null)
flowable = flowable.compose(transformer);
if (onNext == null)
onNext = data -> {};
if (onError == null)
onError = error -> { throw new OnErrorNotImplementedException(error); };
if (onCompleted == null)
onCompleted = () -> {};
Consumer<R> actualOnNext = onNext;
if (mQueuer != null && mQueueSubscriptionSafetyCheckEnabled)
actualOnNext = RxBusUtil.wrapQueueConsumer(onNext, mQueuer);
flowable = applySchedular(flowable);
Disposable disposable = flowable.subscribe(actualOnNext, onError, onCompleted);
if (mBoundObject != null)
RxDisposableManager.addDisposable(mBoundObject, disposable);
return disposable;
}
项目:RxBus2
文件:RxBusBuilder.java
public <R> Disposable subscribe(DisposableSubscriber<R> subscriber, FlowableTransformer<T, R> transformer)
{
Flowable flowable = build(false);
if (transformer != null)
flowable = flowable.compose(transformer);
Subscriber<R> actualSubscriber = subscriber;
if (mQueuer != null && mQueueSubscriptionSafetyCheckEnabled)
actualSubscriber = RxBusUtil.wrapSubscriber(subscriber, mQueuer);
flowable = applySchedular(flowable);
Disposable disposable = (DisposableSubscriber)flowable.subscribeWith(actualSubscriber);
if (mBoundObject != null)
RxDisposableManager.addDisposable(mBoundObject, disposable);
return disposable;
}
项目:EazyBaseMVP
文件:RxUtils.java
public static <T> FlowableTransformer<T, T> applySchedules(final IView view) {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription subscription) throws Exception {
view.showLoading();
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
view.hideLoading();
}
});
}
};
}
项目:RxGps
文件:StatusExceptionResumeNextTransformer.java
public static <R extends Result> FlowableTransformer<R, R> forFlowable() {
return upstream -> upstream.onErrorResumeNext(throwable -> {
if(throwable instanceof StatusException) {
StatusException statusException = (StatusException) throwable;
if(statusException.getStatus().hasResolution()) {
return Flowable.just((R) statusException.getResult());
} else {
return Flowable.error(throwable);
}
} else {
return Flowable.error(throwable);
}
});
}
项目:XDroidMvp
文件:XApi.java
/**
* 异常处理变换
*
* @return
*/
public static <T extends IModel> FlowableTransformer<T, T> getApiTransformer() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream.flatMap(new Function<T, Publisher<T>>() {
@Override
public Publisher<T> apply(T model) throws Exception {
if (model == null || model.isNull()) {
return Flowable.error(new NetError(model.getErrorMsg(), NetError.NoDataError));
} else if (model.isAuthError()) {
return Flowable.error(new NetError(model.getErrorMsg(), NetError.AuthError));
} else if (model.isBizError()) {
return Flowable.error(new NetError(model.getErrorMsg(), NetError.BusinessError));
} else {
return Flowable.just(model);
}
}
});
}
};
}
项目:rxjava2-extras
文件:Transformers.java
public static <T, R extends Number> FlowableTransformer<T, Pair<T, Statistics>> collectStats(
final Function<? super T, ? extends R> function) {
return new FlowableTransformer<T, Pair<T, Statistics>>() {
@Override
public Flowable<Pair<T, Statistics>> apply(Flowable<T> source) {
return source.scan(Pair.create((T) null, Statistics.create()),
new BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
@Override
public Pair<T, Statistics> apply(Pair<T, Statistics> pair, T t) throws Exception {
return Pair.create(t, pair.b().add(function.apply(t)));
}
}).skip(1);
}
};
}
项目:rxjava2-extras
文件:Transformers.java
public static <T> FlowableTransformer<T, T> rebatchRequests(final int minRequest, final long maxRequest,
final boolean constrainFirstRequestMin) {
Preconditions.checkArgument(minRequest <= maxRequest, "minRequest cannot be greater than maxRequest");
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> source) {
if (minRequest == maxRequest && constrainFirstRequestMin) {
return source.rebatchRequests(minRequest);
} else {
return source
.compose(Transformers.<T>minRequest(constrainFirstRequestMin ? minRequest : 1, minRequest))
.compose(Transformers.<T>maxRequest(maxRequest));
}
}
};
}
项目:rxjava2-extras
文件:FlowableStateMachineTest.java
@Test
public void testPassThroughWithCustomCompletion() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(PASS_THROUGH_TRANSITION) //
.completion(new Completion2<String, Integer>() {
@Override
public void accept(String state, Emitter<Integer> emitter) {
emitter.onComplete_();
}
}) //
.requestBatchSize(1) //
.build();
Flowable.just(1, 2, 3, 4, 5, 6) //
.compose(sm) //
.test() //
.assertValues(1, 2, 3, 4, 5, 6) //
.assertComplete();
}
项目:rxjava2-extras
文件:FlowableStateMachineTest.java
@Test
public void testPassThroughEmitterCompletesTwice() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(PASS_THROUGH_TRANSITION) //
.completion(new Completion2<String, Integer>() {
@Override
public void accept(String state, Emitter<Integer> emitter) {
emitter.onComplete_();
emitter.onComplete_();
}
}) //
.requestBatchSize(1) //
.build();
Flowable.just(1, 2, 3, 4, 5, 6) //
.compose(sm) //
.test() //
.assertValues(1, 2, 3, 4, 5, 6) //
.assertComplete();
}
项目:rxjava2-extras
文件:FlowableStateMachineTest.java
@Test
public void testPassThroughEmitterOnNextAfterCompletion() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(PASS_THROUGH_TRANSITION) //
.completion(new Completion2<String, Integer>() {
@Override
public void accept(String state, Emitter<Integer> emitter) {
emitter.onComplete_();
emitter.onNext_(8);
}
}) //
.requestBatchSize(1) //
.build();
Flowable.just(1, 2, 3, 4, 5, 6) //
.compose(sm) //
.test() //
.assertValues(1, 2, 3, 4, 5, 6) //
.assertComplete();
}
项目:rxjava2-extras
文件:FlowableStateMachineTest.java
@Test
public void testCompletionThrows() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(PASS_THROUGH_TRANSITION) //
.completion(new Completion2<String, Integer>() {
@Override
public void accept(String state, Emitter<Integer> emitter) {
throw new ThrowingException();
}
}) //
.requestBatchSize(1) //
.build();
Flowable.just(1) //
.compose(sm) //
.test() //
.assertValues(1) //
.assertError(ThrowingException.class);
}
项目:rxjava2-extras
文件:FlowableStateMachineTest.java
@Test
public void testStateFactoryReturnsNull() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialStateFactory(new Callable<String>() {
@Override
public String call() throws Exception {
return null;
}
}) //
.transition(PASS_THROUGH_TRANSITION) //
.build();
Flowable.just(1) //
.compose(sm) //
.test() //
.assertNoValues() //
.assertError(NullPointerException.class);
}
项目:rxjava2-extras
文件:FlowableStateMachineTest.java
@Test
public void testStateFactoryReturnsNullFromErrorStream() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialStateFactory(new Callable<String>() {
@Override
public String call() throws Exception {
return null;
}
}) //
.transition(PASS_THROUGH_TRANSITION) //
.build();
Flowable.<Integer> error(new ThrowingException()) //
.compose(sm) //
.test() //
.assertNoValues() //
.assertError(NullPointerException.class);
}
项目:rxjava2-extras
文件:FlowableStateMachineTest.java
@Test
public void testOnNextThrowsWithBurstSource() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(new Transition2<String, Integer, Integer>() {
@Override
public String apply(String state, Integer value, Emitter<Integer> emitter) {
throw new ThrowingException();
}
}) //
.requestBatchSize(10) //
.build();
Burst.items(1, 2, 3).create() //
.compose(sm) //
.test() //
.assertNoValues() //
.assertError(ThrowingException.class);
}
项目:rxjava2-extras
文件:FlowableStateMachineTest.java
@Test
public void testCancelFromTransition() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(new Transition2<String, Integer, Integer>() {
@Override
public String apply(String state, Integer value, Emitter<Integer> emitter) {
emitter.cancel_();
return state;
}
}) //
.requestBatchSize(10) //
.build();
Burst.items(1, 2, 3).create() //
.compose(sm) //
.test() //
.assertNoValues() //
.assertNotTerminated();
}
项目:rxjava2-extras
文件:FlowableStateMachineTest.java
@Test
public void testOnNextThrowsWithBurstSourceThatTerminatesWithError() {
List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
try {
RxJavaPlugins.setErrorHandler(Consumers.addTo(list));
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(new Transition2<String, Integer, Integer>() {
@Override
public String apply(String state, Integer value, Emitter<Integer> emitter) {
throw new ThrowingException();
}
}) //
.requestBatchSize(10) //
.build();
Burst.item(1).error(new RuntimeException()) //
.compose(sm) //
.test() //
.assertNoValues() //
.assertError(ThrowingException.class);
assertEquals(1, list.size());
} finally {
RxJavaPlugins.reset();
}
}
项目:rxjava2-extras
文件:FlowableStateMachineTest.java
@Test
public void testStateFactoryReturnsNullOnEmptySource() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialStateFactory(new Callable<String>() {
@Override
public String call() throws Exception {
return null;
}
}) //
.transition(PASS_THROUGH_TRANSITION) //
.build();
Flowable.<Integer> empty() //
.compose(sm) //
.test() //
.assertNoValues() //
.assertError(NullPointerException.class);
}
项目:rxjava2-extras
文件:FlowableStateMachineTest.java
@Test
public void errorActionThrows() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(PASS_THROUGH_TRANSITION) //
.errored(new Errored<String, Integer>() {
@Override
public void accept(String state, Throwable error, Emitter<Integer> emitter) {
throw new ThrowingException();
}
}) //
.build();
Flowable.<Integer> error(new RuntimeException()) //
.compose(sm) //
.test() //
.assertNoValues() //
.assertError(ThrowingException.class);
}
项目:rxjava2-extras
文件:FlowableStateMachineTest.java
@Test
public void errorActionPassThrough() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(PASS_THROUGH_TRANSITION) //
.errored(new Errored<String, Integer>() {
@Override
public void accept(String state, Throwable error, Emitter<Integer> emitter) {
emitter.onError_(error);
}
}) //
.build();
Flowable.<Integer> error(new ThrowingException()) //
.compose(sm) //
.test() //
.assertNoValues() //
.assertError(ThrowingException.class);
}
项目:rxjava2-extras
文件:FlowableStateMachineTest.java
@Test
public void noActionTransition() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(new Transition2<String, Integer, Integer>() {
@Override
public String apply(String state, Integer value, Emitter<Integer> emitter) {
return state;
}
}) //
.build();
Flowable.just(1, 2) //
.compose(sm) //
.test() //
.assertNoValues() //
.assertComplete();
}
项目:nakadi-java
文件:StreamConnectionRestart.java
/**
* Allow a {@link Flowable} to be restarted (repeated) via {@link Flowable#repeatWhen} using
* a fixed delay for a configurable number of restarts. After the delay, the original observer
* will be repeated unless the stopRepeatingPredicate function, which is called via {@link
* Flowable#takeUntil}, short circuits the attempt.
*
* @param stopRestartingPredicate decide if the repeat should be applied or the observer we're
* composing with should onComplete
* @param restartDelay how long to wait between repeats
* @param restartDelayUnit the time unit for repeats
* @param maxRestarts the maximum number of repeats
* @param <T> the type we're composing with ({@link FlowableTransformer}'s upstream and downstream
* are the same type)
* @return an {@link FlowableTransformer} that can be given to {@link Flowable#compose}
*/
<T> FlowableTransformer<T, T> repeatWhenWithDelayAndUntil(
Predicate<Long> stopRestartingPredicate,
long restartDelay,
TimeUnit restartDelayUnit,
int maxRestarts
) {
return upstream -> upstream.repeatWhen(
flowable -> flowable.zipWith(
Flowable.range(1, maxRestarts),
(obj, count) -> count
).flatMap(
attemptCount -> {
if(logger.isDebugEnabled()) {
logger.debug("stream_repeater_delay delay={} {}, restarts={} max_restarts={}",
restartDelay, restartDelayUnit.toString().toLowerCase(), attemptCount,
maxRestarts);
}
return Flowable.timer(restartDelay, restartDelayUnit);
}
).takeUntil(
stopRestartingPredicate
));
}
项目:truetime-android
文件:TrueTimeRx.java
private FlowableTransformer<String, InetAddress> resolveNtpPoolToIpAddresses() {
return new FlowableTransformer<String, InetAddress>() {
@Override
public Publisher<InetAddress> apply(Flowable<String> ntpPoolFlowable) {
return ntpPoolFlowable
.observeOn(Schedulers.io())
.flatMap(new Function<String, Flowable<InetAddress>>() {
@Override
public Flowable<InetAddress> apply(String ntpPoolAddress) {
try {
TrueLog.d(TAG, "---- resolving ntpHost : " + ntpPoolAddress);
return Flowable.fromArray(InetAddress.getAllByName(ntpPoolAddress));
} catch (UnknownHostException e) {
return Flowable.error(e);
}
}
});
}
};
}
项目:4Fun
文件:RxUtil.java
/**
* 登录注册状态校验
*
* @param code
* @return
*/
public static FlowableTransformer<List<LoginInfo>, LoginInfo> rxStateCheck(final int code) {
return new FlowableTransformer<List<LoginInfo>, LoginInfo>() {
@Override
public Publisher<LoginInfo> apply(Flowable<List<LoginInfo>> upstream) {
return upstream
.map(new Function<List<LoginInfo>, LoginInfo>() {
@Override
public LoginInfo apply(List<LoginInfo> loginInfos) throws Exception {
int serverCode = loginInfos.get(0).getCode();
if (serverCode != code) {
throw new LoginException(serverCode);
}
return loginInfos.get(0);
}
});
}
};
}
项目:RxCupboard
文件:RxDatabase.java
private <T> FlowableTransformer<T, T> autoClose(final QueryResultIterable<T> iterable) {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
// Stream terminates (completed or on error): close the cursor
iterable.close();
}
}).doOnCancel(new Action() {
@Override
public void run() throws Exception {
// Cancelled subscription (manual unsubscribe or via some operator such as take()): close the cursor
iterable.close();
}
});
}
};
}
项目:Learning-RxJava
文件:Ch9_3.java
public static <T> FlowableTransformer<T, ImmutableList<T>>
toImmutableList() {
return upstream ->
upstream.collect(ImmutableList::<T>builder,
ImmutableList.Builder::add)
.map(ImmutableList.Builder::build)
.toFlowable(); // must turn Single into Flowable
}
项目:GitHub
文件:RxSchedulers.java
public <T> FlowableTransformer<T, T> applyFlowableMainThread() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> flowable) {
return flowable.observeOn(AndroidSchedulers.mainThread());
}
};
}
项目:GitHub
文件:RxSchedulers.java
public <T> FlowableTransformer<T, T> applyFlowableAsysnc() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> flowable) {
return flowable.observeOn(AndroidSchedulers.mainThread());
}
};
}
项目:GitHub
文件:RxSchedulers.java
public <T> FlowableTransformer<T, T> applyFlowableCompute() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> flowable) {
return flowable.observeOn(AndroidSchedulers.mainThread());
}
};
}
项目:GitHub
文件:RxUtil.java
/**
* 统一线程处理
* @param <T>
* @return
*/
public static <T> FlowableTransformer<T, T> rxSchedulerHelper() { //compose简化线程
return new FlowableTransformer<T, T>() {
@Override
public Flowable<T> apply(Flowable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
项目:GitHub
文件:Utils.java
public static <U> FlowableTransformer<U, U> retry2(final String hint, final int retryCount) {
return new FlowableTransformer<U, U>() {
@Override
public Publisher<U> apply(Flowable<U> upstream) {
return upstream.retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(Integer integer, Throwable throwable) throws Exception {
return retry(hint, retryCount, integer, throwable);
}
});
}
};
}