/**
 * In this experiment, we will use RxJava to pick a lock. Our lock has three
 * tumblers. We will need them all to be up to unlock the lock!
 *
 * <p>Each tumbler is an observable that repeats a random up/down state 1000
 * times. {@code combineLatest} merges the most recent state of every tumbler,
 * and {@code takeUntil} ends the stream with the first combination in which
 * all three tumblers are up. That terminating value is therefore the last
 * {@code onNext} event and must be {@code true}.
 */
@Test
public void combineLatestShouldTakeTheLastEventsOfASetOfObservablesAndCombinesThem() {
    // A tumbler is "up" when the random draw from [0, 20) is greater than 15.
    Observable<Boolean> tumbler1Observable = Observable.just(20)
            .map(integer -> new Random().nextInt(integer) > 15)
            .delay(new Random().nextInt(20), TimeUnit.MILLISECONDS)
            .repeat(1000);
    Observable<Boolean> tumbler2Observable = Observable.just(20)
            .map(integer -> new Random().nextInt(integer) > 15)
            .delay(new Random().nextInt(20), TimeUnit.MILLISECONDS)
            .repeat(1000);
    Observable<Boolean> tumbler3Observable = Observable.just(20)
            .map(integer -> new Random().nextInt(integer) > 15)
            .delay(new Random().nextInt(20), TimeUnit.MILLISECONDS)
            .repeat(1000);

    // The lock is picked only when every tumbler is up at the same time.
    Function3<Boolean, Boolean, Boolean, Boolean> combineTumblerStatesFunction =
            (tumblerOneUp, tumblerTwoUp, tumblerThreeUp) -> tumblerOneUp && tumblerTwoUp && tumblerThreeUp;

    // takeUntil(predicate) still emits the item that satisfied the predicate
    // and then completes, so the final value seen downstream is the "unlocked" one.
    Observable<Boolean> lockIsPickedObservable = Observable
            .combineLatest(tumbler1Observable, tumbler2Observable, tumbler3Observable,
                    combineTumblerStatesFunction)
            .takeUntil(unlocked -> unlocked);

    lockIsPickedObservable.subscribe(testObservable);
    testObservable.awaitTerminalEvent();

    List<Object> onNextEvents = testObservable.values();
    // The stream carries Boolean values only; the last one is the successful
    // combination, so it is true (the previous expectation of null could not hold).
    assertThat(onNextEvents.get(onNextEvents.size() - 1)).isEqualTo(true);
}
/**
 * Creates a state machine over the given source and stores its configuration.
 *
 * <p>{@code initialState}, {@code transition} and {@code backpressureStrategy}
 * are null-checked here; {@code source}, {@code completionAction} and
 * {@code errorAction} are stored as given without validation.
 *
 * @param source               the upstream flowable
 * @param initialState         factory for the starting state; must not be null
 * @param transition           function invoked with the state, an input item and an emitter,
 *                             returning the next state; must not be null
 * @param completionAction     action taking the state and an emitter
 * @param errorAction          action taking the state, a throwable and an emitter
 * @param backpressureStrategy backpressure strategy; must not be null
 * @param requestBatchSize     request batch size; must be greater than zero
 * @throws IllegalArgumentException presumably, when {@code requestBatchSize} is not positive
 *                                  (depends on the Preconditions implementation in use)
 */
public FlowableStateMachine(Flowable<In> source, //
        Callable<? extends State> initialState, //
        Function3<? super State, ? super In, ? super Emitter<Out>, ? extends State> transition, //
        BiConsumer<? super State, ? super Emitter<Out>> completionAction, //
        Consumer3<? super State, ? super Throwable, ? super Emitter<Out>> errorAction, //
        BackpressureStrategy backpressureStrategy, //
        int requestBatchSize) {
    Preconditions.checkNotNull(initialState);
    Preconditions.checkNotNull(transition);
    Preconditions.checkNotNull(backpressureStrategy);
    // The message names the parameter that is actually being validated.
    Preconditions.checkArgument(requestBatchSize > 0, "requestBatchSize must be greater than zero");
    this.source = source;
    this.initialState = initialState;
    this.transition = transition;
    this.completionAction = completionAction;
    this.errorAction = errorAction;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
}
/**
 * Captures the state machine configuration together with the downstream
 * subscriber. No validation is performed here; every argument is stored as
 * given.
 *
 * @param initialState         factory for the starting state
 * @param transition           function taking the state, an input item and an emitter,
 *                             returning the next state
 * @param completionAction     action taking the state and an emitter
 * @param errorAction          action taking the state, a throwable and an emitter
 * @param backpressureStrategy backpressure strategy
 * @param requestBatchSize     request batch size; also used as the initial value of {@code count}
 * @param child                the downstream subscriber
 */
StateMachineSubscriber( //
        Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super Emitter<Out>, ? extends State> transition, //
        BiConsumer<? super State, ? super Emitter<Out>> completionAction, //
        Consumer3<? super State, ? super Throwable, ? super Emitter<Out>> errorAction, //
        BackpressureStrategy backpressureStrategy, //
        int requestBatchSize, //
        Subscriber<? super Out> child) {
    // Downstream and request configuration.
    this.child = child;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
    // count starts out equal to the batch size.
    this.count = requestBatchSize;

    // State machine callbacks.
    this.initialState = initialState;
    this.transition = transition;
    this.completionAction = completionAction;
    this.errorAction = errorAction;
}
/**
 * Zips a detail result with two comment lists into a single {@link DetailBean}.
 *
 * @param detailSingle       source of the detail value
 * @param bestCommentsSingle source of the first comment list passed to the bean
 * @param commentsSingle     source of the second comment list passed to the bean
 * @param <T>                type of the detail value
 * @return a Single emitting a {@code DetailBean} built from the three results
 */
public static <T> Single<DetailBean<T>> toCommentDetail(Single<T> detailSingle,
        Single<List<CommentBean>> bestCommentsSingle,
        Single<List<CommentBean>> commentsSingle) {
    // Combines the three zipped values, in source order, into the result bean.
    Function3<T, List<CommentBean>, List<CommentBean>, DetailBean<T>> assembler =
            new Function3<T, List<CommentBean>, List<CommentBean>, DetailBean<T>>() {
                @Override
                public DetailBean<T> apply(T detail, List<CommentBean> bestComments,
                        List<CommentBean> comments) throws Exception {
                    return new DetailBean<T>(detail, bestComments, comments);
                }
            };
    return Single.zip(detailSingle, bestCommentsSingle, commentsSingle, assembler);
}
/**
 * Validates and stores the state machine configuration.
 *
 * @param initialState         factory for the starting state; must not be null
 * @param transition           function taking the state, an input item and an emitter,
 *                             returning the next state; must not be null
 * @param completion           predicate taking the state and an emitter; must not be null
 * @param backpressureStrategy backpressure strategy; must not be null
 * @param requestBatchSize     request batch size; must be greater than zero
 * @throws IllegalArgumentException presumably, when {@code requestBatchSize} is not positive
 *                                  (depends on the Preconditions implementation in use)
 */
private TransformerStateMachine(Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
        BackpressureStrategy backpressureStrategy, int requestBatchSize) {
    Preconditions.checkNotNull(initialState);
    Preconditions.checkNotNull(transition);
    Preconditions.checkNotNull(completion);
    Preconditions.checkNotNull(backpressureStrategy);
    // The message names the parameter that is actually being validated.
    Preconditions.checkArgument(requestBatchSize > 0, "requestBatchSize must be greater than zero");
    this.initialState = initialState;
    this.transition = transition;
    this.completion = completion;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
}
/**
 * Static factory that builds a {@code TransformerStateMachine} from the given
 * configuration and exposes it as a {@link FlowableTransformer}.
 *
 * @param initialState         factory for the starting state
 * @param transition           function taking the state, an input item and an emitter,
 *                             returning the next state
 * @param completion           predicate taking the state and an emitter
 * @param backpressureStrategy backpressure strategy
 * @param requestBatchSize     request batch size
 * @param <State>              state type
 * @param <In>                 upstream item type
 * @param <Out>                downstream item type
 * @return the newly constructed transformer
 */
public static <State, In, Out> FlowableTransformer<In, Out> create(
        Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
        BackpressureStrategy backpressureStrategy,
        int requestBatchSize) {
    final TransformerStateMachine<State, In, Out> machine =
            new TransformerStateMachine<State, In, Out>(
                    initialState, transition, completion, backpressureStrategy, requestBatchSize);
    return machine;
}
private static <State, Out, In> Function<Notification<In>, Flowable<Notification<Out>>> execute( final Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition, final BiPredicate<? super State, ? super FlowableEmitter<Out>> completion, final Mutable<State> state, final BackpressureStrategy backpressureStrategy) { return new Function<Notification<In>, Flowable<Notification<Out>>>() { @Override public Flowable<Notification<Out>> apply(final Notification<In> in) { return Flowable.create(new FlowableOnSubscribe<Notification<Out>>() { @Override public void subscribe(FlowableEmitter<Notification<Out>> emitter) throws Exception { FlowableEmitter<Out> w = wrap(emitter); if (in.isOnNext()) { state.value = transition.apply(state.value, in.getValue(), w); if (!emitter.isCancelled()) emitter.onComplete(); else { // this is a special emission to indicate that // the transition called unsubscribe. It will be // filtered later. emitter.onNext(UnsubscribedNotificationHolder.<Out>unsubscribedNotification()); } } else if (in.isOnComplete()) { if (completion.test(state.value, w) && !emitter.isCancelled()) { w.onComplete(); } } else if (!emitter.isCancelled()) { w.onError(in.getError()); } } }, backpressureStrategy); } }; }
/**
 * Returns a transformer that runs upstream items through a state machine.
 * This is a convenience entry point that delegates to
 * {@code TransformerStateMachine.create} with the same arguments.
 *
 * @param initialState         factory for the starting state
 * @param transition           function taking the state, an input item and an emitter,
 *                             returning the next state
 * @param completion           predicate taking the state and an emitter
 * @param backpressureStrategy backpressure strategy
 * @param requestBatchSize     request batch size
 * @param <State>              state type
 * @param <In>                 upstream item type
 * @param <Out>                downstream item type
 * @return the transformer produced by {@code TransformerStateMachine.create}
 */
public static <State, In, Out> FlowableTransformer<In, Out> stateMachine(
        Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
        BackpressureStrategy backpressureStrategy,
        int requestBatchSize) {
    final FlowableTransformer<In, Out> transformer = TransformerStateMachine.create(
            initialState, transition, completion, backpressureStrategy, requestBatchSize);
    return transformer;
}
/**
 * Creates a responder around the given subscriber.
 *
 * @param actual        the channel subscriber this responder is built around
 * @param initialState  the initial value of {@code state}
 * @param queryMapper   function taking the state, a request and the channel terminal
 *                      events, returning a publisher of responses
 * @param stateConsumer consumer of the state
 */
public LambdaChannelResponder(
        ChannelSubscriber<Request, Response> actual,
        State initialState,
        Function3<State, Request, ChannelTerminalEvents, ? extends Publisher<Response>> queryMapper,
        Consumer<? super State> stateConsumer) {
    // Collaborators supplied by the caller.
    this.actual = actual;
    this.queryMapper = queryMapper;
    this.stateConsumer = stateConsumer;
    this.state = initialState;

    // A fresh, serialized subject is created for the "cancelled" field.
    this.cancelled = PublishSubject.create().toSerialized();
}
public void onZipRequest(View view) { //使用zip操作符合并等待多个网络请求完成后,再刷新界面 //例如下面:数据来自3个不同的接口 Observable<ResultBean> mobileObservable = EasyHttp.get("http://apis.juhe.cn/mobile/get") .params("phone", "18688994275") .params("dtype", "json") .params("key", "5682c1f44a7f486e40f9720d6c97ffe4") .execute(new CallClazzProxy<TestApiResult1<ResultBean>, ResultBean>(ResultBean.class) { }); Observable<Content> searchObservable = EasyHttp.get("/ajax.php") .baseUrl("http://fy.iciba.com") .params("a", "fy") .params("f", "auto") .params("t", "auto") .params("w", "hello world") //采用代理 .execute(new CallClazzProxy<TestApiResult6<Content>, Content>(Content.class) { }); Observable<List<SectionItem>> listObservable = EasyHttp.get("http://news-at.zhihu.com/api/3/sections") .execute(new CallClazzProxy<TestApiResult5<List<SectionItem>>, List<SectionItem>>(new TypeToken<List<SectionItem>>() { }.getType()) { }); //new Function3最后一个参数这里用的是List<Object>,表示将3个返回的结果,放在同一个集合最终一次性返回,你也可以指定返回其它你需要的数据类型并不一定是List<Object> //假如这三个接口返回的都是TestBean,那么就可以直接用具体的List<TestBean>,不需要用List<Object> Observable.zip(mobileObservable, searchObservable, listObservable, new Function3<ResultBean, Content, List<SectionItem>, List<Object>>() { @Override public List<Object> apply(@NonNull ResultBean resultbean, @NonNull Content content, @NonNull List<SectionItem> sectionItems) throws Exception { //将接收到的3个数据先暂存起来,一次性发给订阅者 List list = new ArrayList(); list.add(resultbean); list.add(content); list.add(sectionItems); return list; } }).subscribe(new BaseSubscriber<List<Object>>() { @Override public void onError(ApiException e) { showToast(e.getMessage()); } @Override public void onNext(@NonNull List<Object> objects) { showToast(objects.toString()); } }); }
/**
 * Creates a plan that matches when every observable sequence of this pattern
 * has an element available, projecting those elements through the given
 * selector.
 *
 * @param <R> the result type
 * @param selector
 *            the function invoked with one element from each source sequence
 * @return the plan for the matching
 * @throws NullPointerException
 *             if selector is null
 */
public <R> Plan<R> then(Function3<T1, T2, T3, R> selector) {
    if (selector != null) {
        return new Plan3<T1, T2, T3, R>(this, selector);
    }
    // A null selector is rejected up front.
    throw new NullPointerException();
}
/**
 * Maps this list to a new list using the supplied transform. Besides the item
 * itself, the transform receives two Flowables bound to the previous and the
 * next item in the list. Those Flowables emit when the item moves within the
 * list or when the items surrounding it are moved.
 *
 * @param transform a function converting a source item (plus its previous/next
 *                  neighbour Flowables) to the target type
 * @param <R> the type of the mapped value
 * @return a new FlowableList whose values are produced by the supplied transform
 */
public <R> FlowableList<R> indexedMap(
        final Function3<T, Flowable<Optional<T>>, Flowable<Optional<T>>, R> transform) {
    final FlowableList<R> mapped = new IndexedFlowableList<>(this, transform);
    return mapped;
}