/**
 * Checks a {@code MulticastProcessor} created with its boolean flag set to {@code true}
 * (presumably the ref-counted mode this test is named after): once the only subscriber
 * cancels, the upstream subscription must be cancelled and the processor must report
 * completion without an error.
 */
@Test
public void refCounted() {
    final MulticastProcessor<Integer> processor = MulticastProcessor.create(true);
    final BooleanSubscription upstream = new BooleanSubscription();

    processor.onSubscribe(upstream);
    assertFalse(upstream.isCancelled());

    // Subscribe once and cancel immediately.
    processor.test().cancel();

    assertTrue(upstream.isCancelled());
    assertFalse(processor.hasSubscribers());
    assertTrue(processor.hasComplete());
    assertFalse(processor.hasThrowable());
    assertNull(processor.getThrowable());

    // A subscriber arriving afterwards must observe an empty, completed sequence.
    processor.test().assertResult();
}
/**
 * Same scenario as a cancelling sole subscriber, but the processor is created via
 * {@code create(16, true)} and the subscriber is obtained through {@code test(1, true)}
 * (presumably "initial request 1, already cancelled" - confirm against the
 * {@code Flowable.test} overload).
 */
@Test
public void refCounted2() {
    final MulticastProcessor<Integer> processor = MulticastProcessor.create(16, true);
    final BooleanSubscription upstream = new BooleanSubscription();

    processor.onSubscribe(upstream);
    assertFalse(upstream.isCancelled());

    processor.test(1, true);

    assertTrue(upstream.isCancelled());
    assertFalse(processor.hasSubscribers());
    assertTrue(processor.hasComplete());
    assertFalse(processor.hasThrowable());
    assertNull(processor.getThrowable());

    // A subscriber arriving afterwards must observe an empty, completed sequence.
    processor.test().assertResult();
}
/**
 * Calls {@code start()} twice, {@code startUnbounded()} once and then {@code onSubscribe}
 * on the same processor. The late subscription must end up cancelled and three
 * {@code ProtocolViolationException}s must have been recorded by the plugin error tracker.
 */
@Test
public void multiStart() {
    final List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final MulticastProcessor<Integer> processor = MulticastProcessor.create(4, false);

        processor.start();
        processor.start();
        processor.startUnbounded();

        final BooleanSubscription late = new BooleanSubscription();
        processor.onSubscribe(late);
        assertTrue(late.isCancelled());

        // Errors at indices 0, 1 and 2 are all expected to be protocol violations.
        for (int index = 0; index < 3; index++) {
            TestHelper.assertError(pluginErrors, index, ProtocolViolationException.class);
        }
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * Runs {@code FlowableTransformers.indexOf} against a hand-written source that emits
 * {@code 10} and then calls {@code onComplete()} unconditionally. The expected result is
 * the single value {@code 0L}.
 */
@Test
public void foundWithUnconditionalOnCompleteAfter() {
    Flowable<Integer> source = new Flowable<Integer>() {
        @Override
        protected void subscribeActual(Subscriber<? super Integer> s) {
            s.onSubscribe(new BooleanSubscription());
            s.onNext(10);
            // Completes regardless of what the downstream did with the item.
            s.onComplete();
        }
    };

    Predicate<Integer> isTen = new Predicate<Integer>() {
        @Override
        public boolean test(Integer v) throws Exception {
            return v == 10;
        }
    };

    source
    .compose(FlowableTransformers.indexOf(isTen))
    .test()
    .assertResult(0L);
}
/**
 * The outer source of {@code switchFlatMap} signals two errors in a row. The first
 * ({@code IOException}) must reach the subscriber; the second
 * ({@code IllegalArgumentException}) must show up in the tracked plugin errors.
 */
@Test
public void outerDoubleError() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final PublishProcessor<Integer> inner = PublishProcessor.create();

        Flowable<Integer> outer = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> s) {
                s.onSubscribe(new BooleanSubscription());
                s.onError(new IOException());
                s.onError(new IllegalArgumentException());
            }
        };

        outer
        .compose(FlowableTransformers.switchFlatMap(Functions.justFunction(inner), 2))
        .test()
        .assertFailure(IOException.class);

        TestHelper.assertError(pluginErrors, 0, IllegalArgumentException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * The inner source produced for {@code switchFlatMap} signals two errors in a row. The
 * first ({@code IOException}) must reach the subscriber; the second
 * ({@code IllegalArgumentException}) must show up in the tracked plugin errors.
 */
@Test
public void innerDoubleError() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        Flowable<Integer> failingTwice = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> s) {
                s.onSubscribe(new BooleanSubscription());
                s.onError(new IOException());
                s.onError(new IllegalArgumentException());
            }
        };

        Flowable.just(1)
        .compose(FlowableTransformers.switchFlatMap(Functions.justFunction(failingTwice), 2))
        .test()
        .assertFailure(IOException.class);

        TestHelper.assertError(pluginErrors, 0, IllegalArgumentException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * {@code filterAsync} with an inner publisher that emits {@code true} and then fails with
 * an {@code IOException}. The outer item {@code 1} must still be delivered as the complete
 * result, and the {@code IOException} must be recorded as undeliverable.
 */
@Test
public void oneAndErrorInner() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        Function<Object, Publisher<Boolean>> trueThenError = new Function<Object, Publisher<Boolean>>() {
            @Override
            public Publisher<Boolean> apply(Object v) throws Exception {
                return new Flowable<Boolean>() {
                    @Override
                    public void subscribeActual(Subscriber<? super Boolean> s) {
                        s.onSubscribe(new BooleanSubscription());
                        s.onNext(true);
                        s.onError(new IOException());
                    }
                };
            }
        };

        Flowable.just(1)
        .compose(FlowableTransformers.filterAsync(trueThenError, 16))
        .test()
        .assertResult(1);

        TestHelper.assertUndeliverable(pluginErrors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * Feeds {@code Flowables.zipLatest} a second source that calls {@code onSubscribe} twice.
 * Inside the source, the first subscription must remain active and the second must be
 * cancelled; afterwards a {@code ProtocolViolationException} must be among the tracked
 * plugin errors.
 */
@Test
public void badSource() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final PublishProcessor<Integer> first = PublishProcessor.create();

        final Flowable<Integer> doubleSubscribing = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> s) {
                BooleanSubscription accepted = new BooleanSubscription();
                s.onSubscribe(accepted);

                BooleanSubscription rejected = new BooleanSubscription();
                s.onSubscribe(rejected);

                Assert.assertFalse(accepted.isCancelled());
                Assert.assertTrue(rejected.isCancelled());
            }
        };

        // toString2 is defined outside this block.
        Flowables.zipLatest(first, doubleSubscribing, toString2).test();

        TestHelper.assertError(pluginErrors, 0, ProtocolViolationException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * {@code mapAsync} with an inner publisher that emits {@code 1} and then fails with an
 * {@code IOException}. The subscriber must see the complete result {@code 1}, and the
 * {@code IOException} must be recorded as undeliverable.
 */
@Test
public void oneAndErrorInner() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        Function<Object, Publisher<Integer>> oneThenError = new Function<Object, Publisher<Integer>>() {
            @Override
            public Publisher<Integer> apply(Object v) throws Exception {
                return new Flowable<Integer>() {
                    @Override
                    public void subscribeActual(Subscriber<? super Integer> s) {
                        s.onSubscribe(new BooleanSubscription());
                        s.onNext(1);
                        s.onError(new IOException());
                    }
                };
            }
        };

        Flowable.just(1)
        .compose(FlowableTransformers.mapAsync(oneThenError, 16))
        .test()
        .assertResult(1);

        TestHelper.assertUndeliverable(pluginErrors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * Applies {@code mapper} to a {@code Nono} that calls {@code onNext(null)} before
 * completing, then waits up to five seconds and asserts that the mapped {@code Nono}
 * finished with no items and no error.
 *
 * @param mapper the transformation under test
 * @throws RuntimeException whatever {@code ExceptionHelper.wrapOrThrow} produces for a
 *         {@code Throwable} raised while applying or asserting
 */
void checkNoNext(Function<Nono, Nono> mapper) {
    try {
        Nono emitsNext = new Nono() {
            @Override
            protected void subscribeActual(Subscriber<? super Void> s) {
                s.onSubscribe(new BooleanSubscription());
                s.onNext(null);
                s.onComplete();
            }
        };

        mapper.apply(emitsNext)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult();
    } catch (Throwable failure) {
        throw ExceptionHelper.wrapOrThrow(failure);
    }
}
/**
 * Schedules an undelayed direct task on a {@code BlockingScheduler} from within
 * {@code execute}. The task emits 1..5, completes and shuts the scheduler down. Nothing
 * may have been received while the outer action is still running; after {@code execute}
 * returns, the full result must be present and no plugin errors may have been tracked.
 * {@code ts} is a field defined outside this block.
 */
@Test(timeout = 10000)
public void directUntimed() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final BlockingScheduler scheduler = new BlockingScheduler();

        scheduler.execute(new Action() {
            @Override
            public void run() throws Exception {
                ts.onSubscribe(new BooleanSubscription());

                final Runnable emitAndShutdown = new Runnable() {
                    @Override
                    public void run() {
                        ts.onNext(1);
                        ts.onNext(2);
                        ts.onNext(3);
                        ts.onNext(4);
                        ts.onNext(5);
                        ts.onComplete();

                        scheduler.shutdown();
                    }
                };
                scheduler.scheduleDirect(emitAndShutdown);

                // The task has only been queued so far.
                ts.assertEmpty();
            }
        });

        ts.assertResult(1, 2, 3, 4, 5);

        // Print every tracked error before asserting that there are none.
        for (Throwable tracked : pluginErrors) {
            tracked.printStackTrace();
        }
        assertTrue(pluginErrors.toString(), pluginErrors.isEmpty());
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * Schedules a direct task with a 100 ms delay on a {@code BlockingScheduler} from within
 * {@code execute}. The task emits 1..5, completes and shuts the scheduler down. Nothing
 * may have been received while the outer action is still running; after {@code execute}
 * returns, the full result must be present and no plugin errors may have been tracked.
 * {@code ts} is a field defined outside this block.
 */
@Test(timeout = 10000)
public void directTimed() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final BlockingScheduler scheduler = new BlockingScheduler();

        scheduler.execute(new Action() {
            @Override
            public void run() throws Exception {
                ts.onSubscribe(new BooleanSubscription());

                final Runnable emitAndShutdown = new Runnable() {
                    @Override
                    public void run() {
                        ts.onNext(1);
                        ts.onNext(2);
                        ts.onNext(3);
                        ts.onNext(4);
                        ts.onNext(5);
                        ts.onComplete();

                        scheduler.shutdown();
                    }
                };
                scheduler.scheduleDirect(emitAndShutdown, 100, TimeUnit.MILLISECONDS);

                // The delayed task cannot have run yet.
                ts.assertEmpty();
            }
        });

        ts.assertResult(1, 2, 3, 4, 5);

        assertTrue(pluginErrors.toString(), pluginErrors.isEmpty());
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * Walks a processor obtained from {@code FlowableProcessors.wrap(sp)} through a normal
 * lifecycle ({@code onSubscribe}, two items, {@code onComplete}) and checks the
 * {@code hasComplete}/{@code hasThrowable}/{@code getThrowable} state at each stage.
 * {@code sp} is a field defined outside this block.
 */
@Test
public void normal() {
    FlowableProcessor<Integer> wrapped = FlowableProcessors.wrap(sp);

    wrapped.onSubscribe(new BooleanSubscription());

    // Stage 1: subscribed upstream, nobody listening yet.
    assertFalse(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    TestSubscriber<Integer> consumer = wrapped.test();

    // Stage 2: a subscriber is attached, nothing emitted.
    assertFalse(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    wrapped.onNext(1);
    wrapped.onNext(2);

    consumer.assertValues(1, 2);

    // Stage 3: items delivered, still not terminated.
    assertFalse(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    wrapped.onComplete();

    // Stage 4: completed without an error.
    assertTrue(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    consumer.assertResult(1, 2);
}
/**
 * Subscribes via {@code FlowableConsumers.subscribeAutoDispose} to a source that keeps
 * signalling after its first {@code onComplete()}. Only {@code 1} and
 * {@code "OnComplete"} may be recorded in {@code events}, and the trailing
 * {@code IOException} must be reported as undeliverable. {@code composite} and
 * {@code events} are defined outside this block; the test instance itself is passed as
 * all three callbacks.
 */
@Test
public void badSource() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        Flowable<Integer> misbehaving = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> s) {
                s.onSubscribe(new BooleanSubscription());
                s.onNext(1);
                s.onComplete();

                // Everything below is sent after the terminal signal.
                s.onSubscribe(new BooleanSubscription());
                s.onNext(2);
                s.onComplete();
                s.onError(new IOException());
            }
        };

        FlowableConsumers.subscribeAutoDispose(misbehaving, composite, this, this, this);

        assertEquals(Arrays.<Object>asList(1, "OnComplete"), events);

        TestHelper.assertUndeliverable(pluginErrors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * A source signals two errors in a row into {@code windowWhile} followed by
 * {@code flatMapSingle(toList)}. The first ({@code IllegalArgumentException}) must reach
 * the subscriber; the second ({@code IOException}) must be reported as undeliverable.
 * {@code toList} is defined outside this block.
 */
@SuppressWarnings("unchecked")
@Test
public void doubleError() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        Flowable<Integer> failingTwice = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> s) {
                s.onSubscribe(new BooleanSubscription());
                s.onError(new IllegalArgumentException());
                s.onError(new IOException());
            }
        };

        Predicate<Integer> notMinusOne = new Predicate<Integer>() {
            @Override
            public boolean test(Integer v) throws Exception {
                return v != -1;
            }
        };

        failingTwice
        .compose(FlowableTransformers.windowWhile(notMinusOne))
        .flatMapSingle(toList)
        .test()
        .assertFailure(IllegalArgumentException.class);

        TestHelper.assertUndeliverable(pluginErrors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * A source signals two errors in a row into {@code bufferWhile}. The first
 * ({@code IllegalArgumentException}) must reach the subscriber; the second
 * ({@code IOException}) must be reported as undeliverable.
 */
@SuppressWarnings("unchecked")
@Test
public void doubleError() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        Flowable<Integer> failingTwice = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> s) {
                s.onSubscribe(new BooleanSubscription());
                s.onError(new IllegalArgumentException());
                s.onError(new IOException());
            }
        };

        Predicate<Integer> notMinusOne = new Predicate<Integer>() {
            @Override
            public boolean test(Integer v) throws Exception {
                return v != -1;
            }
        };

        failingTwice
        .compose(FlowableTransformers.bufferWhile(notMinusOne))
        .test()
        .assertFailure(IllegalArgumentException.class);

        TestHelper.assertUndeliverable(pluginErrors, 0, IOException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * Drives a v2 {@code FlowableProcessor} obtained from a v1 {@code PublishSubject} via
 * {@code toV2Processor} through a completing lifecycle and then sends further signals
 * after the terminal event. The subscriber must see exactly {@code 1, 2} plus
 * completion, the first subscription must never be cancelled, and a subscription handed
 * over after termination must be cancelled.
 */
@Test
public void sj1ToFp2Lifecycle2() {
    rx.subjects.Subject<Integer, Integer> v1Subject = rx.subjects.PublishSubject.create();
    io.reactivex.processors.FlowableProcessor<Integer> v2Processor = toV2Processor(v1Subject);

    io.reactivex.subscribers.TestSubscriber<Integer> consumer = v2Processor.test();

    // Both sides must see the subscriber; no terminal state yet.
    assertTrue(v2Processor.hasSubscribers());
    assertTrue(v1Subject.hasObservers());
    assertFalse(v2Processor.hasComplete());
    assertFalse(v2Processor.hasThrowable());
    assertNull(v2Processor.getThrowable());

    BooleanSubscription firstUpstream = new BooleanSubscription();
    v2Processor.onSubscribe(firstUpstream);

    assertFalse(firstUpstream.isCancelled());

    v2Processor.onNext(1);
    v2Processor.onNext(2);
    v2Processor.onComplete();

    // Signals sent after the terminal event.
    v2Processor.onComplete();
    v2Processor.onError(new IOException());
    v2Processor.onNext(3);

    BooleanSubscription lateUpstream = new BooleanSubscription();
    v2Processor.onSubscribe(lateUpstream);

    assertFalse(firstUpstream.isCancelled());
    assertTrue(lateUpstream.isCancelled());

    assertFalse(v2Processor.hasSubscribers());
    assertFalse(v1Subject.hasObservers());
    assertTrue(v2Processor.hasComplete());
    assertFalse(v2Processor.hasThrowable());
    assertNull(v2Processor.getThrowable());

    consumer.assertResult(1, 2);
}
/**
 * Enables {@code RxJavaProtocolValidator} with this test instance as the violation
 * handler and subscribes to a {@code Flowable} whose {@code subscribeActual} makes ten
 * calls in a deliberately invalid order. Three well-behaved flows are run first; then
 * exactly 15 violations, in the order asserted below, must have been collected in
 * {@code errors} (a field defined outside this block). Hooks and handler are restored in
 * the {@code finally} clause.
 */
@Test
public void flowable() {
    Flowable<Integer> violator = new Flowable<Integer>() {
        @Override
        protected void subscribeActual(Subscriber<? super Integer> s) {
            s.onComplete();
            s.onError(null);
            s.onError(new IOException());
            s.onNext(null);
            s.onNext(1);
            s.onSubscribe(null);
            s.onSubscribe(new BooleanSubscription());
            s.onSubscribe(new BooleanSubscription());
            s.onComplete();
            s.onNext(2);
        }
    };

    RxJavaProtocolValidator.setOnViolationHandler(this);
    Assert.assertSame(this, RxJavaProtocolValidator.getOnViolationHandler());

    SavedHooks previousHooks = RxJavaProtocolValidator.enableAndChain();
    Assert.assertTrue(RxJavaProtocolValidator.isEnabled());

    try {
        // Well-behaved sources first.
        Flowable.just(1).test().assertResult(1);
        Flowable.empty().test().assertResult();
        Flowable.error(new IOException()).test().assertFailure(IOException.class);

        Flowable<Integer> assembled = RxJavaPlugins.onAssembly(violator);
        assembled.test(0);

        Assert.assertEquals(15, errors.size());

        TestHelper.assertError(errors, 0, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 1, NullOnErrorParameterException.class);
        TestHelper.assertError(errors, 2, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 3, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 4, OnSubscribeNotCalledException.class);

        // The violation at index 4 must carry the IOException as its cause.
        Throwable causeOfFifth = errors.get(4).getCause();
        Assert.assertTrue("" + causeOfFifth, causeOfFifth instanceof IOException);

        TestHelper.assertError(errors, 5, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 6, NullOnNextParameterException.class);
        TestHelper.assertError(errors, 7, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 8, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 9, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 10, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 11, NullOnSubscribeParameterException.class);
        TestHelper.assertError(errors, 12, MultipleOnSubscribeCallsException.class);
        TestHelper.assertError(errors, 13, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 14, OnNextAfterTerminationException.class);
    } finally {
        previousHooks.restore();
        RxJavaProtocolValidator.setOnViolationHandler(null);
    }
}
/**
 * Enables {@code RxJavaProtocolValidator} with this test instance as the violation
 * handler and subscribes to a {@code ConnectableFlowable} whose {@code subscribeActual}
 * makes ten calls in a deliberately invalid order (its {@code connect} does nothing).
 * Three well-behaved flows are run first; then exactly 15 violations, in the order
 * asserted below, must have been collected in {@code errors} (a field defined outside
 * this block). Hooks and handler are restored in the {@code finally} clause.
 */
@Test
public void connectableFlowable() {
    ConnectableFlowable<Integer> violator = new ConnectableFlowable<Integer>() {
        @Override
        protected void subscribeActual(Subscriber<? super Integer> s) {
            s.onComplete();
            s.onError(null);
            s.onError(new IOException());
            s.onNext(null);
            s.onNext(1);
            s.onSubscribe(null);
            s.onSubscribe(new BooleanSubscription());
            s.onSubscribe(new BooleanSubscription());
            s.onComplete();
            s.onNext(2);
        }

        @Override
        public void connect(Consumer<? super Disposable> connection) {
        }
    };

    RxJavaProtocolValidator.setOnViolationHandler(this);
    Assert.assertSame(this, RxJavaProtocolValidator.getOnViolationHandler());

    SavedHooks previousHooks = RxJavaProtocolValidator.enableAndChain();
    Assert.assertTrue(RxJavaProtocolValidator.isEnabled());

    try {
        // Well-behaved sources first.
        Flowable.just(1).publish().autoConnect().test().assertResult(1);
        Flowable.empty().publish().autoConnect().test().assertResult();
        Flowable.error(new IOException()).test().assertFailure(IOException.class);

        ConnectableFlowable<Integer> assembled = RxJavaPlugins.onAssembly(violator);
        assembled.test(0);
        assembled.connect();

        Assert.assertEquals(15, errors.size());

        TestHelper.assertError(errors, 0, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 1, NullOnErrorParameterException.class);
        TestHelper.assertError(errors, 2, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 3, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 4, OnSubscribeNotCalledException.class);

        // The violation at index 4 must carry the IOException as its cause.
        Throwable causeOfFifth = errors.get(4).getCause();
        Assert.assertTrue("" + causeOfFifth, causeOfFifth instanceof IOException);

        TestHelper.assertError(errors, 5, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 6, NullOnNextParameterException.class);
        TestHelper.assertError(errors, 7, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 8, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 9, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 10, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 11, NullOnSubscribeParameterException.class);
        TestHelper.assertError(errors, 12, MultipleOnSubscribeCallsException.class);
        TestHelper.assertError(errors, 13, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 14, OnNextAfterTerminationException.class);
    } finally {
        previousHooks.restore();
        RxJavaProtocolValidator.setOnViolationHandler(null);
    }
}
/**
 * Enables {@code RxJavaProtocolValidator} with this test instance as the violation
 * handler and subscribes to a single-rail {@code ParallelFlowable} whose
 * {@code subscribe} makes ten calls on rail 0 in a deliberately invalid order. Three
 * well-behaved flows are run first; then exactly 15 violations, in the order asserted
 * below, must have been collected in {@code errors} (a field defined outside this
 * block). Hooks and handler are restored in the {@code finally} clause.
 */
@SuppressWarnings("unchecked")
@Test
public void parallelFlowable() {
    ParallelFlowable<Integer> violator = new ParallelFlowable<Integer>() {
        @Override
        public void subscribe(Subscriber<? super Integer>[] s) {
            validate(s);
            s[0].onComplete();
            s[0].onError(null);
            s[0].onError(new IOException());
            s[0].onNext(null);
            s[0].onNext(1);
            s[0].onSubscribe(null);
            s[0].onSubscribe(new BooleanSubscription());
            s[0].onSubscribe(new BooleanSubscription());
            s[0].onComplete();
            s[0].onNext(2);
        }

        @Override
        public int parallelism() {
            return 1;
        }
    };

    RxJavaProtocolValidator.setOnViolationHandler(this);
    Assert.assertSame(this, RxJavaProtocolValidator.getOnViolationHandler());

    SavedHooks previousHooks = RxJavaProtocolValidator.enableAndChain();
    Assert.assertTrue(RxJavaProtocolValidator.isEnabled());

    try {
        // Well-behaved sources first.
        Flowable.just(1).publish().autoConnect().test().assertResult(1);
        Flowable.empty().publish().autoConnect().test().assertResult();
        Flowable.error(new IOException()).test().assertFailure(IOException.class);

        ParallelFlowable<Integer> assembled = RxJavaPlugins.onAssembly(violator);
        assembled.subscribe(new Subscriber[] { new TestSubscriber<Integer>(0) });

        Assert.assertEquals(15, errors.size());

        TestHelper.assertError(errors, 0, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 1, NullOnErrorParameterException.class);
        TestHelper.assertError(errors, 2, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 3, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 4, OnSubscribeNotCalledException.class);

        // The violation at index 4 must carry the IOException as its cause.
        Throwable causeOfFifth = errors.get(4).getCause();
        Assert.assertTrue("" + causeOfFifth, causeOfFifth instanceof IOException);

        TestHelper.assertError(errors, 5, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 6, NullOnNextParameterException.class);
        TestHelper.assertError(errors, 7, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 8, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 9, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 10, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 11, NullOnSubscribeParameterException.class);
        TestHelper.assertError(errors, 12, MultipleOnSubscribeCallsException.class);
        TestHelper.assertError(errors, 13, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 14, OnNextAfterTerminationException.class);
    } finally {
        previousHooks.restore();
        RxJavaProtocolValidator.setOnViolationHandler(null);
    }
}
/**
 * Schedules a direct task with a 100 ms delay on a {@code BlockingScheduler}, disposes
 * it immediately and then schedules an undelayed task that shuts the scheduler down.
 * The disposed task's emissions must never reach {@code ts} (a field defined outside
 * this block), and no plugin errors may have been tracked.
 */
@Test(timeout = 10000)
public void cancelDirect() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final BlockingScheduler scheduler = new BlockingScheduler();

        scheduler.execute(new Action() {
            @Override
            public void run() throws Exception {
                ts.onSubscribe(new BooleanSubscription());

                final Runnable emitAll = new Runnable() {
                    @Override
                    public void run() {
                        ts.onNext(1);
                        ts.onNext(2);
                        ts.onNext(3);
                        ts.onNext(4);
                        ts.onNext(5);
                        ts.onComplete();
                    }
                };
                Disposable pending = scheduler.scheduleDirect(emitAll, 100, TimeUnit.MILLISECONDS);

                assertFalse(pending.isDisposed());
                pending.dispose();
                assertTrue(pending.isDisposed());

                // Let execute() return by shutting down from a queued task.
                scheduler.scheduleDirect(new Runnable() {
                    @Override
                    public void run() {
                        scheduler.shutdown();
                    }
                });

                ts.assertEmpty();
            }
        });

        ts.assertEmpty();
        assertTrue(pluginErrors.toString(), pluginErrors.isEmpty());
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * Schedules an undelayed direct task on a {@code BlockingScheduler}, disposes it
 * immediately and then schedules another task that shuts the scheduler down. The
 * disposed task's emissions must never reach {@code ts} (a field defined outside this
 * block), and no plugin errors may have been tracked.
 */
@Test(timeout = 10000)
public void cancelDirectUntimed() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final BlockingScheduler scheduler = new BlockingScheduler();

        scheduler.execute(new Action() {
            @Override
            public void run() throws Exception {
                ts.onSubscribe(new BooleanSubscription());

                final Runnable emitAll = new Runnable() {
                    @Override
                    public void run() {
                        ts.onNext(1);
                        ts.onNext(2);
                        ts.onNext(3);
                        ts.onNext(4);
                        ts.onNext(5);
                        ts.onComplete();
                    }
                };
                Disposable pending = scheduler.scheduleDirect(emitAll);

                assertFalse(pending.isDisposed());
                pending.dispose();
                assertTrue(pending.isDisposed());

                // Let execute() return by shutting down from a queued task.
                scheduler.scheduleDirect(new Runnable() {
                    @Override
                    public void run() {
                        scheduler.shutdown();
                    }
                });

                ts.assertEmpty();
            }
        });

        ts.assertEmpty();
        assertTrue(pluginErrors.toString(), pluginErrors.isEmpty());
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * Schedules a task with a 100 ms delay on a worker of a {@code BlockingScheduler},
 * disposes it immediately and then schedules an undelayed worker task that disposes the
 * worker and shuts the scheduler down. The disposed task's emissions must never reach
 * {@code ts} (a field defined outside this block), and no plugin errors may have been
 * tracked.
 */
@Test(timeout = 10000)
public void cancelWorker() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final BlockingScheduler scheduler = new BlockingScheduler();

        scheduler.execute(new Action() {
            @Override
            public void run() throws Exception {
                ts.onSubscribe(new BooleanSubscription());

                final Worker worker = scheduler.createWorker();

                final Runnable emitAll = new Runnable() {
                    @Override
                    public void run() {
                        ts.onNext(1);
                        ts.onNext(2);
                        ts.onNext(3);
                        ts.onNext(4);
                        ts.onNext(5);
                        ts.onComplete();
                    }
                };
                Disposable pending = worker.schedule(emitAll, 100, TimeUnit.MILLISECONDS);

                assertFalse(pending.isDisposed());
                pending.dispose();
                assertTrue(pending.isDisposed());

                // Let execute() return: dispose the worker and shut down from a queued task.
                worker.schedule(new Runnable() {
                    @Override
                    public void run() {
                        worker.dispose();
                        scheduler.shutdown();
                    }
                });

                ts.assertEmpty();
            }
        });

        ts.assertEmpty();
        assertTrue(pluginErrors.toString(), pluginErrors.isEmpty());
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * Schedules an undelayed task on a worker of a {@code BlockingScheduler}, disposes it
 * immediately and then schedules another worker task that disposes the worker, shuts
 * the scheduler down and checks that the worker reports itself as disposed. The
 * disposed task's emissions must never reach {@code ts} (a field defined outside this
 * block), and no plugin errors may have been tracked.
 */
@Test(timeout = 10000)
public void cancelWorkerUntimed() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final BlockingScheduler scheduler = new BlockingScheduler();

        scheduler.execute(new Action() {
            @Override
            public void run() throws Exception {
                ts.onSubscribe(new BooleanSubscription());

                final Worker worker = scheduler.createWorker();

                final Runnable emitAll = new Runnable() {
                    @Override
                    public void run() {
                        ts.onNext(1);
                        ts.onNext(2);
                        ts.onNext(3);
                        ts.onNext(4);
                        ts.onNext(5);
                        ts.onComplete();
                    }
                };
                Disposable pending = worker.schedule(emitAll);

                assertFalse(pending.isDisposed());
                pending.dispose();
                assertTrue(pending.isDisposed());

                // Let execute() return: dispose the worker and shut down from a queued task.
                worker.schedule(new Runnable() {
                    @Override
                    public void run() {
                        worker.dispose();
                        scheduler.shutdown();

                        assertTrue(worker.isDisposed());
                    }
                });

                ts.assertEmpty();
            }
        });

        ts.assertEmpty();
        assertTrue(pluginErrors.toString(), pluginErrors.isEmpty());
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * Hands two subscriptions in a row to a processor obtained from
 * {@code FlowableProcessors.refCount}. The first must stay active and the second must
 * be cancelled.
 */
@Test
public void doubleOnSubscribe() {
    final FlowableProcessor<Integer> refCounted =
            FlowableProcessors.refCount(PublishProcessor.<Integer>create());

    BooleanSubscription accepted = new BooleanSubscription();
    refCounted.onSubscribe(accepted);

    BooleanSubscription rejected = new BooleanSubscription();
    refCounted.onSubscribe(rejected);

    assertFalse(accepted.isCancelled());
    assertTrue(rejected.isCancelled());
}