/**
 * Starts loading the contact list and reports the outcome to the given callback.
 *
 * <p>The list is requested from a new {@code CListExtractorAbstract} using the configured
 * filter type, ordering, limit and skip values. The work is subscribed on
 * {@code Schedulers.io()} and the result is observed on
 * {@code AndroidSchedulers.mainThread()}.
 *
 * @param iContact callback receiving either the list or the error; when {@code null}
 *                 the outcome is silently dropped
 * @return the {@code Disposable} of the subscription
 * @throws SecurityException if {@code PermissionWrapper.hasContactsPermissions(mContext)}
 *                           reports that the permission is missing
 */
public Disposable build(final IContact iContact) {
    final boolean permitted = PermissionWrapper.hasContactsPermissions(mContext);
    if (!permitted) {
        throw new SecurityException("Contact Permission Missing");
    }

    final BiConsumer<List<CList>, Throwable> resultHandler = new BiConsumer<List<CList>, Throwable>() {
        @Override
        public void accept(List<CList> contacts, Throwable error) throws Exception {
            if (iContact != null) {
                if (error != null) {
                    iContact.onContactError(error);
                } else {
                    iContact.onContactSuccess(contacts);
                }
            }
        }
    };

    return new CListExtractorAbstract(mContext)
            .getList(mListFilterType, orderBy, limit, skip)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(resultHandler);
}
/**
 * Builds a {@code Flowable} that executes the prepared statement and emits one mapped
 * item per row of the statement's generated-keys {@code ResultSet}.
 *
 * @param ps         wrapper holding the prepared statement and its parameter names
 * @param parameters values bound to the statement via {@code Util.convertAndSetParameters}
 * @param mapper     converts the current row of the result set into an emitted item
 * @param <T>        emitted item type
 * @return a flowable produced by {@code Flowable.generate}
 */
private static <T> Flowable<T> create(NamedPreparedStatement ps, List<Object> parameters,
        Function<? super ResultSet, T> mapper) {
    // Initial state: bind parameters, run the statement, hand back the generated keys.
    Callable<ResultSet> keysSupplier = () -> {
        Util.convertAndSetParameters(ps.ps, parameters, ps.names);
        ps.ps.execute();
        return ps.ps.getGeneratedKeys();
    };

    // Each generator call advances one row; completes once the rows are exhausted.
    BiConsumer<ResultSet, Emitter<T>> rowEmitter = (keys, out) -> {
        if (!keys.next()) {
            out.onComplete();
            return;
        }
        out.onNext(mapper.apply(keys));
    };

    Consumer<ResultSet> cleanup = Util::closeSilently;

    return Flowable.generate(keysSupplier, rowEmitter, cleanup);
}
/**
 * Builds a {@code Flowable} that emits one mapped item per row of the callable
 * statement's current {@code ResultSet}.
 *
 * <p>The result set is fetched from the statement immediately, when this method is
 * called, and is passed to {@code Util.closeSilently} as the dispose action.
 *
 * @param stmt wrapper holding the callable statement
 * @param f    converts the current row of the result set into an emitted item
 * @param <T>  emitted item type
 * @return a flowable produced by {@code Flowable.generate}
 * @throws SQLException if obtaining the result set from the statement fails
 */
private static <T> Flowable<T> createFlowable(NamedCallableStatement stmt,
        Function<? super ResultSet, ? extends T> f) throws SQLException {
    final ResultSet resultSet = stmt.stmt.getResultSet();

    Callable<ResultSet> stateFactory = () -> resultSet;

    BiConsumer<ResultSet, Emitter<T>> rowEmitter = (rows, out) -> {
        log.debug("getting row from ps={}, rs={}", stmt.stmt, rows);
        if (!rows.next()) {
            log.debug("completed");
            out.onComplete();
            return;
        }
        T value = f.apply(rows);
        log.debug("emitting {}", value);
        out.onNext(value);
    };

    Consumer<ResultSet> cleanup = Util::closeSilently;

    return Flowable.generate(stateFactory, rowEmitter, cleanup);
}
/**
 * A handler that signals an error through the emitter must fail the sequence with that
 * error, and the source processor must have no subscribers afterwards.
 */
@Test
public void consumerSignalsErrorCancel() {
    BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(1);

    BiConsumer<Integer, BasicEmitter<Integer>> failingHandler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doError(new IOException());
                }
            };

    processor
            .compose(FlowableTransformers.mapFilter(failingHandler))
            .test()
            .assertFailure(IOException.class);

    assertFalse(processor.hasSubscribers());
}
/**
 * A handler that throws must fail the sequence with the thrown exception, and the
 * source processor must have no subscribers afterwards.
 */
@Test
public void consumerThrowsCancel() {
    BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(1);

    BiConsumer<Integer, BasicEmitter<Integer>> throwingHandler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    throw new IOException();
                }
            };

    processor
            .compose(FlowableTransformers.mapFilter(throwingHandler))
            .test()
            .assertFailure(IOException.class);

    assertFalse(processor.hasSubscribers());
}
/**
 * A handler that completes through the emitter must end the sequence with no items,
 * and the source processor must have no subscribers afterwards.
 */
@Test
public void consumerCompleteCancel() {
    BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(1);

    BiConsumer<Integer, BasicEmitter<Integer>> completingHandler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doComplete();
                }
            };

    processor
            .compose(FlowableTransformers.mapFilter(completingHandler))
            .test()
            .assertResult();

    assertFalse(processor.hasSubscribers());
}
/**
 * Doubles each value of {@code range(1, 5)} through {@code mapFilter}, subscribing with
 * a subscriber that requests {@code QueueSubscription.ANY}; asserts the
 * {@code QueueSubscription.SYNC} mode and the doubled values.
 */
@Test
public void mapFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    BiConsumer<Integer, BasicEmitter<Integer>> doubler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doNext(value * 2);
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(doubler))
            .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
            .assertResult(2, 4, 6, 8, 10);
}
/**
 * Doubles the values 1..5 pre-emitted into a {@code UnicastProcessor} through
 * {@code mapFilter}, subscribing with a subscriber that requests
 * {@code QueueSubscription.ANY}; asserts the {@code QueueSubscription.ASYNC} mode and
 * the doubled values.
 */
@Test
public void mapAsyncFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    UnicastProcessor<Integer> processor = UnicastProcessor.create();
    TestHelper.emit(processor, 1, 2, 3, 4, 5);

    BiConsumer<Integer, BasicEmitter<Integer>> doubler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doNext(value * 2);
                }
            };

    processor
            .compose(FlowableTransformers.mapFilter(doubler))
            .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.ASYNC))
            .assertResult(2, 4, 6, 8, 10);
}
/**
 * Emits the doubled value only for even inputs of {@code range(1, 5)}, subscribing with
 * a subscriber that requests {@code QueueSubscription.ANY}; asserts the
 * {@code QueueSubscription.SYNC} mode and the values 4 and 8.
 */
@Test
public void filterFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    BiConsumer<Integer, BasicEmitter<Integer>> evenDoubler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    if (value % 2 != 0) {
                        return;
                    }
                    emitter.doNext(value * 2);
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(evenDoubler))
            .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
            .assertResult(4, 8);
}
/**
 * A handler that throws {@code IOException} must fail the sequence with that exception
 * when the subscriber requests {@code QueueSubscription.ANY}; also asserts the
 * {@code QueueSubscription.SYNC} mode.
 */
@Test
public void consumerThrowsFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    BiConsumer<Integer, BasicEmitter<Integer>> throwingHandler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    throw new IOException();
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(throwingHandler))
            .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
            .assertFailure(IOException.class);
}
/**
 * A handler that calls {@code doError} with an {@code IOException} must fail the
 * sequence with that exception when the subscriber requests
 * {@code QueueSubscription.ANY}; also asserts the {@code QueueSubscription.SYNC} mode.
 */
@Test
public void consumerSignalsErrorFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    BiConsumer<Integer, BasicEmitter<Integer>> failingHandler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doError(new IOException());
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(failingHandler))
            .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
            .assertFailure(IOException.class);
}
/**
 * A handler that calls {@code doComplete} must end the sequence without items when the
 * subscriber requests {@code QueueSubscription.ANY}; also asserts the
 * {@code QueueSubscription.SYNC} mode.
 */
@Test
public void consumerCompleteFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    BiConsumer<Integer, BasicEmitter<Integer>> completingHandler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doComplete();
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(completingHandler))
            .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
            .assertResult();
}
/**
 * With an always-true {@code filter} applied after {@code mapFilter}, a handler that
 * signals an error through the emitter must fail the sequence with that error, and the
 * source processor must have no subscribers afterwards.
 */
@Test
public void consumerSignalsErrorCancel() {
    BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(1);

    BiConsumer<Integer, BasicEmitter<Integer>> failingHandler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doError(new IOException());
                }
            };

    processor
            .compose(FlowableTransformers.mapFilter(failingHandler))
            .filter(Functions.alwaysTrue())
            .test()
            .assertFailure(IOException.class);

    assertFalse(processor.hasSubscribers());
}
/**
 * With an always-true {@code filter} applied after {@code mapFilter}, a handler that
 * throws must fail the sequence with the thrown exception, and the source processor
 * must have no subscribers afterwards.
 */
@Test
public void consumerThrowsCancel() {
    BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(1);

    BiConsumer<Integer, BasicEmitter<Integer>> throwingHandler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    throw new IOException();
                }
            };

    processor
            .compose(FlowableTransformers.mapFilter(throwingHandler))
            .filter(Functions.alwaysTrue())
            .test()
            .assertFailure(IOException.class);

    assertFalse(processor.hasSubscribers());
}
/**
 * With an always-true {@code filter} applied after {@code mapFilter}, a handler that
 * completes through the emitter must end the sequence with no items, and the source
 * processor must have no subscribers afterwards.
 */
@Test
public void consumerCompleteCancel() {
    BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(1);

    BiConsumer<Integer, BasicEmitter<Integer>> completingHandler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doComplete();
                }
            };

    processor
            .compose(FlowableTransformers.mapFilter(completingHandler))
            .filter(Functions.alwaysTrue())
            .test()
            .assertResult();

    assertFalse(processor.hasSubscribers());
}
/**
 * Doubles each value of {@code range(1, 5)} through {@code mapFilter} followed by an
 * always-true {@code filter}, subscribing with a subscriber that requests
 * {@code QueueSubscription.ANY}; asserts the {@code QueueSubscription.SYNC} mode and
 * the doubled values.
 */
@Test
public void mapFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    BiConsumer<Integer, BasicEmitter<Integer>> doubler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doNext(value * 2);
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(doubler))
            .filter(Functions.alwaysTrue())
            .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
            .assertResult(2, 4, 6, 8, 10);
}
/**
 * Doubles the values 1..5 pre-emitted into a {@code UnicastProcessor} through
 * {@code mapFilter} followed by an always-true {@code filter}, subscribing with a
 * subscriber that requests {@code QueueSubscription.ANY}; asserts the
 * {@code QueueSubscription.ASYNC} mode and the doubled values.
 */
@Test
public void mapAsyncFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    UnicastProcessor<Integer> processor = UnicastProcessor.create();
    TestHelper.emit(processor, 1, 2, 3, 4, 5);

    BiConsumer<Integer, BasicEmitter<Integer>> doubler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doNext(value * 2);
                }
            };

    processor
            .compose(FlowableTransformers.mapFilter(doubler))
            .filter(Functions.alwaysTrue())
            .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.ASYNC))
            .assertResult(2, 4, 6, 8, 10);
}
/**
 * Emits the doubled value only for even inputs of {@code range(1, 5)}, followed by an
 * always-true {@code filter}, subscribing with a subscriber that requests
 * {@code QueueSubscription.ANY}; asserts the {@code QueueSubscription.SYNC} mode and
 * the values 4 and 8.
 */
@Test
public void filterFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    BiConsumer<Integer, BasicEmitter<Integer>> evenDoubler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    if (value % 2 != 0) {
                        return;
                    }
                    emitter.doNext(value * 2);
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(evenDoubler))
            .filter(Functions.alwaysTrue())
            .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
            .assertResult(4, 8);
}
/**
 * With an always-true {@code filter} applied after {@code mapFilter}, a handler that
 * throws {@code IOException} must fail the sequence with that exception when the
 * subscriber requests {@code QueueSubscription.ANY}; also asserts the
 * {@code QueueSubscription.SYNC} mode.
 */
@Test
public void consumerThrowsFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    BiConsumer<Integer, BasicEmitter<Integer>> throwingHandler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    throw new IOException();
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(throwingHandler))
            .filter(Functions.alwaysTrue())
            .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
            .assertFailure(IOException.class);
}
/**
 * With an always-true {@code filter} applied after {@code mapFilter}, a handler that
 * calls {@code doError} with an {@code IOException} must fail the sequence with that
 * exception when the subscriber requests {@code QueueSubscription.ANY}; also asserts
 * the {@code QueueSubscription.SYNC} mode.
 */
@Test
public void consumerSignalsErrorFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    BiConsumer<Integer, BasicEmitter<Integer>> failingHandler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doError(new IOException());
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(failingHandler))
            .filter(Functions.alwaysTrue())
            .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
            .assertFailure(IOException.class);
}
/**
 * With an always-true {@code filter} applied after {@code mapFilter}, a handler that
 * calls {@code doComplete} must end the sequence without items when the subscriber
 * requests {@code QueueSubscription.ANY}; also asserts the
 * {@code QueueSubscription.SYNC} mode.
 */
@Test
public void consumerCompleteFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    BiConsumer<Integer, BasicEmitter<Integer>> completingHandler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doComplete();
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(completingHandler))
            .filter(Functions.alwaysTrue())
            .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
            .assertResult();
}
/**
 * Creates the state-machine flowable and stores its configuration.
 *
 * <p>{@code initialState}, {@code transition} and {@code backpressureStrategy} are
 * validated with {@code Preconditions.checkNotNull}; {@code requestBatchSize} must be
 * positive. {@code source}, {@code completionAction} and {@code errorAction} are stored
 * without validation here.
 * NOTE(review): presumably the two actions may legitimately be {@code null} - confirm
 * against where they are invoked.
 *
 * @param source               upstream flowable
 * @param initialState         supplies the initial state value
 * @param transition           maps (state, upstream item, emitter) to the next state
 * @param completionAction     called with (state, emitter); not validated here
 * @param errorAction          called with (state, error, emitter); not validated here
 * @param backpressureStrategy backpressure strategy to apply
 * @param requestBatchSize     request batch size, must be greater than zero
 */
public FlowableStateMachine(Flowable<In> source,
        Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super Emitter<Out>, ? extends State> transition,
        BiConsumer<? super State, ? super Emitter<Out>> completionAction,
        Consumer3<? super State, ? super Throwable, ? super Emitter<Out>> errorAction,
        BackpressureStrategy backpressureStrategy,
        int requestBatchSize) {
    Preconditions.checkNotNull(initialState);
    Preconditions.checkNotNull(transition);
    Preconditions.checkNotNull(backpressureStrategy);
    // The message names the actual parameter so a failure points at the right argument.
    Preconditions.checkArgument(requestBatchSize > 0, "requestBatchSize must be greater than zero");
    this.source = source;
    this.initialState = initialState;
    this.transition = transition;
    this.completionAction = completionAction;
    this.errorAction = errorAction;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
}
/**
 * Stores the state-machine configuration and the downstream subscriber.
 *
 * <p>{@code count} is initialised to {@code requestBatchSize}.
 *
 * @param initialState         supplies the initial state value
 * @param transition           maps (state, upstream item, emitter) to the next state
 * @param completionAction     called with (state, emitter)
 * @param errorAction          called with (state, error, emitter)
 * @param backpressureStrategy backpressure strategy to apply
 * @param requestBatchSize     request batch size; also the starting value of {@code count}
 * @param child                downstream subscriber
 */
StateMachineSubscriber(
        Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super Emitter<Out>, ? extends State> transition,
        BiConsumer<? super State, ? super Emitter<Out>> completionAction,
        Consumer3<? super State, ? super Throwable, ? super Emitter<Out>> errorAction,
        BackpressureStrategy backpressureStrategy,
        int requestBatchSize,
        Subscriber<? super Out> child) {
    // Downstream target.
    this.child = child;

    // State-machine callbacks.
    this.initialState = initialState;
    this.transition = transition;
    this.completionAction = completionAction;
    this.errorAction = errorAction;

    // Flow-control settings.
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
    this.count = requestBatchSize;
}
/**
 * Registers the {@code REQUEST_READ_TYPE} restartable: the factory requests the type
 * list through {@code HttpRetrofit} and the result is passed to
 * {@code ReadTadPageFragment.onData}.
 *
 * @param savedState state bundle forwarded to the superclass
 */
@Override
protected void onCreate(Bundle savedState) {
    super.onCreate(savedState);

    final Function0<Observable<List<ReadTypeBean>>> typeListRequest =
            new Function0<Observable<List<ReadTypeBean>>>() {
                @Override
                public Observable<List<ReadTypeBean>> apply() {
                    return HttpRetrofit.getInstance().providers
                            .getTypeList(observable, new DynamicKey("闲读分类"), new EvictDynamicKey(false))
                            .map(new HttpRetrofit.HttpResultFuncCcche<List<ReadTypeBean>>())
                            .compose(HttpRetrofit.toSubscribe());
                }
            };

    final BiConsumer<ReadTadPageFragment, List<ReadTypeBean>> deliverTypes =
            new BiConsumer<ReadTadPageFragment, List<ReadTypeBean>>() {
                @Override
                public void accept(@NonNull ReadTadPageFragment view,
                        @NonNull List<ReadTypeBean> types) throws Exception {
                    view.onData(types);
                }
            };

    restartableFirst(RequestCommand.REQUEST_READ_TYPE, typeListRequest, deliverTypes);
}
/**
 * Registers the {@code REQUEST_READ_CHILD_LIST} restartable: the factory requests the
 * list for the current request type through {@code HttpRetrofit}; the result's list
 * (when non-null) and page value (when non-empty) are passed to
 * {@code ReadMoreActivity}.
 *
 * @param savedState state bundle forwarded to the superclass
 */
@Override
protected void onCreate(Bundle savedState) {
    super.onCreate(savedState);

    final Function0<Observable<ReadTypeBean>> childListRequest =
            new Function0<Observable<ReadTypeBean>>() {
                @Override
                public Observable<ReadTypeBean> apply() {
                    return HttpRetrofit.getInstance().providers
                            .getStackTypeList(observable, new DynamicKey(requestContext.getType()), new EvictDynamicKey(true))
                            .map(new HttpRetrofit.HttpResultFuncCcche<ReadTypeBean>())
                            .compose(HttpRetrofit.toSubscribe());
                }
            };

    final BiConsumer<ReadMoreActivity, ReadTypeBean> deliverResult =
            new BiConsumer<ReadMoreActivity, ReadTypeBean>() {
                @Override
                public void accept(@NonNull ReadMoreActivity view,
                        @NonNull ReadTypeBean result) throws Exception {
                    // Forward the list only when one was returned.
                    if (result.getReadListBeanList() != null) {
                        view.onDataList(result.getReadListBeanList());
                    }
                    // Forward the page value only when it is non-empty.
                    if (!TextUtils.isEmpty(result.getPage())) {
                        view.setUrl_page(result.getPage());
                    }
                }
            };

    restartableFirst(RequestCommand.REQUEST_READ_CHILD_LIST, childListRequest, deliverResult);
}
/**
 * Subscribes to {@code Single.just(1)} with a {@code BiConsumer} whose {@code accept}
 * does nothing; the returned value of {@code subscribe} is not used.
 */
public void rx2WithSingleBiConsumer() {
    BiConsumer<Integer, Throwable> noOp = new BiConsumer<Integer, Throwable>() {
        @Override
        public void accept(Integer value, Throwable error) throws Exception {
        }
    };
    io.reactivex.Single.just(1).subscribe(noOp);
}
/**
 * Creates a consumer that stores its second argument, cast to {@code boolean}, into
 * slot 0 of the given array. The first argument is ignored.
 *
 * @param result array whose first element receives the value
 * @return the recording consumer
 */
private BiConsumer<Object, Object> booleanBiConsumer(final boolean[] result) {
    BiConsumer<Object, Object> recorder = new BiConsumer<Object, Object>() {
        @Override
        public void accept(Object ignoredFirst, Object flag) throws Exception {
            result[0] = (boolean) flag;
        }
    };
    return recorder;
}
/**
 * Looks up the retry action via {@code doOnRetry} and, when one is returned, invokes it
 * with the same throwable and retry count.
 *
 * @param throwable the error passed to {@code doOnRetry} and to the action
 * @param retry     the retry count passed to {@code doOnRetry} and to the action
 * @throws Exception if the action throws
 */
private void callAction(Throwable throwable, Integer retry) throws Exception {
    final BiConsumer<Throwable, Integer> retryHook = doOnRetry(throwable, retry);
    if (retryHook != null) {
        retryHook.accept(throwable, retry);
    }
}
/**
 * Stores the downstream subscriber and the coalescing configuration, and creates a
 * fresh {@code AtomicLong} for {@code requested}.
 *
 * @param actual            downstream subscriber
 * @param containerSupplier supplies container instances of type {@code R}
 * @param coalescer         called with a container and an item of type {@code T}
 * @param bufferSize        buffer size setting
 */
CoalesceSubscriber(Subscriber<? super R> actual,
        Callable<R> containerSupplier,
        BiConsumer<R, T> coalescer,
        int bufferSize) {
    // Per-instance request counter.
    this.requested = new AtomicLong();

    // Values supplied by the caller.
    this.bufferSize = bufferSize;
    this.coalescer = coalescer;
    this.containerSupplier = containerSupplier;
    this.actual = actual;
}
/**
 * Doubling every value of {@code range(1, 5)} through {@code mapFilter} must produce
 * 2, 4, 6, 8, 10.
 */
@Test
public void map() {
    BiConsumer<Integer, BasicEmitter<Integer>> doubler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doNext(value * 2);
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(doubler))
            .test()
            .assertResult(2, 4, 6, 8, 10);
}
/**
 * Doubling every value of {@code range(1, 5)} through {@code mapFilter} and then
 * applying {@code take(3)} must produce 2, 4, 6.
 */
@Test
public void take() {
    BiConsumer<Integer, BasicEmitter<Integer>> doubler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doNext(value * 2);
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(doubler))
            .take(3)
            .test()
            .assertResult(2, 4, 6);
}
/**
 * Emitting the doubled value only for even inputs of {@code range(1, 5)} must produce
 * 4 and 8.
 */
@Test
public void filter() {
    BiConsumer<Integer, BasicEmitter<Integer>> evenDoubler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    if (value % 2 != 0) {
                        return;
                    }
                    emitter.doNext(value * 2);
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(evenDoubler))
            .test()
            .assertResult(4, 8);
}
/**
 * A handler that emits the doubled value and then completes must end the sequence
 * after the single item 2.
 */
@Test
public void mapAndComplete() {
    BiConsumer<Integer, BasicEmitter<Integer>> emitThenComplete =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doNext(value * 2);
                    emitter.doComplete();
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(emitThenComplete))
            .test()
            .assertResult(2);
}
/**
 * A handler that calls {@code doNext} twice for one input must fail the sequence with
 * {@code IllegalStateException} after the single item 2.
 */
@Test
public void mapTwice() {
    BiConsumer<Integer, BasicEmitter<Integer>> doubleEmit =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doNext(value * 2);
                    emitter.doNext(value * 2);
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(doubleEmit))
            .test()
            .assertFailure(IllegalStateException.class, 2);
}
/**
 * Doubling every value of {@code range(1, 5).hide()} through {@code mapFilter} must
 * produce 2, 4, 6, 8, 10.
 */
@Test
public void mapHidden() {
    BiConsumer<Integer, BasicEmitter<Integer>> doubler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doNext(value * 2);
                }
            };

    Flowable<Integer> hiddenSource = Flowable.range(1, 5).hide();
    hiddenSource
            .compose(FlowableTransformers.mapFilter(doubler))
            .test()
            .assertResult(2, 4, 6, 8, 10);
}
/**
 * Emitting the doubled value only for even inputs of {@code range(1, 5).hide()} must
 * produce 4 and 8.
 */
@Test
public void filterHidden() {
    BiConsumer<Integer, BasicEmitter<Integer>> evenDoubler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    if (value % 2 != 0) {
                        return;
                    }
                    emitter.doNext(value * 2);
                }
            };

    Flowable<Integer> hiddenSource = Flowable.range(1, 5).hide();
    hiddenSource
            .compose(FlowableTransformers.mapFilter(evenDoubler))
            .test()
            .assertResult(4, 8);
}
/**
 * A handler that throws {@code IOException} must fail the sequence with that exception
 * and no items.
 */
@Test
public void consumerThrows() {
    BiConsumer<Integer, BasicEmitter<Integer>> throwingHandler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    throw new IOException();
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(throwingHandler))
            .test()
            .assertFailure(IOException.class);
}
/**
 * A handler that calls {@code doError} with an {@code IOException} must fail the
 * sequence with that exception and no items.
 */
@Test
public void consumerSignalsError() {
    BiConsumer<Integer, BasicEmitter<Integer>> failingHandler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doError(new IOException());
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(failingHandler))
            .test()
            .assertFailure(IOException.class);
}
/**
 * A handler that calls {@code doComplete} must end the sequence with no items.
 */
@Test
public void consumerCompletes() {
    BiConsumer<Integer, BasicEmitter<Integer>> completingHandler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
                @Override
                public void accept(Integer value, BasicEmitter<Integer> emitter) throws Exception {
                    emitter.doComplete();
                }
            };

    Flowable<Integer> source = Flowable.range(1, 5);
    source
            .compose(FlowableTransformers.mapFilter(completingHandler))
            .test()
            .assertResult();
}
@Test public void error() { Flowable.<Integer>error(new IOException()) .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() { @Override public void accept(Integer t, BasicEmitter<Integer> e) throws Exception { e.doComplete(); } })) .test() .assertFailure(IOException.class); }