/**
 * Verifies that a blocking {@link Stream} obtained via
 * {@code FlowableInterop.toStream()} keeps the source subscribed while it is
 * open and that the source no longer has subscribers once the stream is closed.
 */
@Test
public void toStreamCancel() {
    UnicastProcessor<Integer> source = UnicastProcessor.create();

    // Pre-fill the processor with 1..5 before anything subscribes.
    for (int value = 1; value <= 5; value++) {
        source.onNext(value);
    }

    try (Stream<Integer> firstThree = source.to(FlowableInterop.toStream()).limit(3)) {
        // The stream must already be attached to the processor at this point.
        Assert.assertTrue(source.hasSubscribers());

        List<Integer> collected = firstThree.collect(Collectors.toList());

        // Only the first three of the five queued items pass the limit.
        Assert.assertEquals(Arrays.asList(1, 2, 3), collected);
    }

    // Leaving the try-with-resources block closes the stream; the processor
    // is expected to have lost its subscriber as a result.
    Assert.assertFalse(source.hasSubscribers());
}
/**
 * Runs {@code FlowableInterop.mapOptional} on top of a pre-filled
 * {@link UnicastProcessor} with a subscriber that requests any fusion mode,
 * then asserts that ASYNC fusion was established and that only the negated
 * even values are delivered.
 */
@Test
public void mapOptionalAsyncFused() {
    UnicastProcessor<Integer> source = UnicastProcessor.create();
    TestHelper.emit(source, 1, 2, 3, 4, 5);

    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    source
    .compose(FlowableInterop.mapOptional(v -> {
        // Odd values map to an empty Optional and are therefore dropped.
        if (v % 2 != 0) {
            return Optional.empty();
        }
        return Optional.of(-v);
    }))
    .subscribeWith(ts);

    // subscribeWith hands back the very same subscriber, so asserting on the
    // local reference is equivalent to continuing the call chain.
    ts.assertOf(TestHelper.assertFusedSubscriber(QueueSubscription.ASYNC))
    .assertResult(-2, -4);
}
/**
 * Same scenario as the plain async-fused {@code mapOptional} test, but with an
 * always-true {@code filter} placed after the operator. Asserts that ASYNC
 * fusion is still established and that only the negated even values arrive.
 * NOTE(review): the extra filter presumably routes the operator through its
 * conditional-subscriber implementation (per the test name) - confirm.
 */
@Test
public void mapOptionalAsyncFusedConditional() {
    UnicastProcessor<Integer> source = UnicastProcessor.create();
    TestHelper.emit(source, 1, 2, 3, 4, 5);

    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    source
    .compose(FlowableInterop.mapOptional(v -> {
        // Odd values map to an empty Optional and are therefore dropped.
        if (v % 2 != 0) {
            return Optional.empty();
        }
        return Optional.of(-v);
    }))
    .filter(Functions.alwaysTrue())
    .subscribeWith(ts);

    // subscribeWith hands back the very same subscriber, so asserting on the
    // local reference is equivalent to continuing the call chain.
    ts.assertOf(TestHelper.assertFusedSubscriber(QueueSubscription.ASYNC))
    .assertResult(-2, -4);
}
WindowPredicateSubscriber(Subscriber<? super Flowable<T>> actual, Predicate<? super T> predicate, Mode mode, int bufferSize) { super(1); this.actual = actual; this.predicate = predicate; this.mode = mode; this.bufferSize = bufferSize; // In Mode.BEFORE windows are opened earlier and added to the 1-element drain "queue" if (mode == Mode.BEFORE) { requestedWindows = new AtomicLong(); pending = new AtomicReference<UnicastProcessor<T>>(); } else { requestedWindows = null; pending = null; } }
/**
 * Drives {@code AsyncFlowable.runAsync} with a {@link UnicastProcessor} and a
 * callback that emits 1..3, sleeps for 200 ms, emits 4..5 and completes; the
 * resulting flow must deliver all five values in order within five seconds.
 */
@Test
public void runAsyncProcessor() {
    AsyncFlowable.runAsync(
            Schedulers.single(),
            UnicastProcessor.<Object>create(),
            new BiConsumer<Subscriber<Object>, Disposable>() {
                @Override
                public void accept(Subscriber<? super Object> emitter, Disposable d) throws Exception {
                    // First burst, sent immediately.
                    for (int value = 1; value <= 3; value++) {
                        emitter.onNext(value);
                    }

                    // Pause between the bursts before emitting the rest.
                    Thread.sleep(200);

                    for (int value = 4; value <= 5; value++) {
                        emitter.onNext(value);
                    }
                    emitter.onComplete();
                }
            })
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1, 2, 3, 4, 5);
}
/**
 * Hands the pending window, if any, to the downstream subscriber, provided
 * there is outstanding demand for windows. Does nothing when either the
 * demand counter is not positive or no window is waiting.
 */
private void drain() {
    if (requestedWindows.get() > 0) {
        // Atomically take ownership of the pending window so that it is
        // emitted at most once.
        UnicastProcessor<T> next = pending.getAndSet(null);
        if (next != null) {
            // Bump this object's counter for the new window, consume one
            // unit of window demand, then emit.
            getAndIncrement();
            requestedWindows.getAndDecrement();
            actual.onNext(next);
        }
    }
}
@Override public boolean tryOnNext(T t) { UnicastProcessor<T> w = window; if (w == null) { // ignore additional items after last window is completed if (cancelled.get()) { return true; } // emit next window w = UnicastProcessor.<T>create(bufferSize, this); window = w; getAndIncrement(); if (mode == Mode.BEFORE) { requestedWindows.getAndDecrement(); } actual.onNext(w); } boolean b; try { // negate predicate for windowWhile b = predicate.test(t) ^ mode == Mode.BEFORE; } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.cancel(); actual.onError(ex); w.onError(ex); window = null; return true; } if (b) { // element goes into current window if (mode == Mode.AFTER) { w.onNext(t); } // finish current window w.onComplete(); // element goes into the next requested window if (mode == Mode.BEFORE) { w = UnicastProcessor.<T>create(bufferSize, this); window = w; w.onNext(t); // add window to drain queue pending.set(w); // try emitting right away drain(); } else { // new window emitted on next upstream item window = null; } } else { w.onNext(t); } return b; }
/**
 * Subscribes a {@link MulticastProcessor} (created with argument 4) to a
 * {@link UnicastProcessor}, pushes 0..9 through the upstream and asserts that
 * a direct {@code offer} on the multicast processor is rejected while all ten
 * upstream values reach the test subscriber after completion.
 */
@Test
public void asyncFused() {
    UnicastProcessor<Integer> source = UnicastProcessor.create();
    MulticastProcessor<Integer> multicast = MulticastProcessor.create(4);
    source.subscribe(multicast);

    TestSubscriber<Integer> ts = multicast.test();

    int next = 0;
    while (next < 10) {
        source.onNext(next++);
    }

    // The test expects offer() to report failure for this processor state.
    assertFalse(multicast.offer(10));

    source.onComplete();

    ts.assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
}