Java 类io.reactivex.internal.subscriptions.BooleanSubscription 实例源码

项目:RxJava2Extensions    文件:MulticastProcessorTest.java   
/**
 * Checks that a processor built with {@code MulticastProcessor.create(true)}
 * (presumably the ref-counted mode, going by the test name) cancels its
 * upstream once its only subscriber cancels, and that it afterwards reports
 * a completed, error-free state with no subscribers.
 */
@Test
public void refCounted() {
    BooleanSubscription upstream = new BooleanSubscription();
    MulticastProcessor<Integer> processor = MulticastProcessor.create(true);

    processor.onSubscribe(upstream);

    // Merely attaching the upstream must not cancel it.
    assertFalse(upstream.isCancelled());

    // Subscribe, then immediately cancel the sole subscriber.
    processor.test().cancel();

    assertTrue(upstream.isCancelled());

    // State accessors after the cancellation.
    assertFalse(processor.hasSubscribers());
    assertTrue(processor.hasComplete());
    assertFalse(processor.hasThrowable());
    assertNull(processor.getThrowable());

    // A late subscriber is expected to see an empty result.
    processor.test().assertResult();
}
项目:RxJava2Extensions    文件:MulticastProcessorTest.java   
/**
 * Same scenario as the single-argument variant, but the processor is built
 * with {@code MulticastProcessor.create(16, true)} and the subscriber is
 * obtained through {@code test(1, true)} (presumably a subscriber that starts
 * out cancelled -- confirm against the test-subscriber API).
 */
@Test
public void refCounted2() {
    BooleanSubscription upstream = new BooleanSubscription();
    MulticastProcessor<Integer> processor = MulticastProcessor.create(16, true);

    processor.onSubscribe(upstream);

    // Attaching the upstream alone leaves it active.
    assertFalse(upstream.isCancelled());

    processor.test(1, true);

    // After that subscription attempt the upstream must be cancelled.
    assertTrue(upstream.isCancelled());

    // State accessors after the cancellation.
    assertFalse(processor.hasSubscribers());
    assertTrue(processor.hasComplete());
    assertFalse(processor.hasThrowable());
    assertNull(processor.getThrowable());

    // A late subscriber is expected to see an empty result.
    processor.test().assertResult();
}
项目:RxJava2Extensions    文件:MulticastProcessorTest.java   
/**
 * Calls {@code start()} twice and {@code startUnbounded()} once on the same
 * processor, then hands it one more subscription. Expects that subscription
 * to be cancelled and three {@code ProtocolViolationException}s to have been
 * recorded by the plugin error tracker.
 */
@Test
public void multiStart() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        MulticastProcessor<Integer> processor = MulticastProcessor.create(4, false);

        processor.start();
        processor.start();
        processor.startUnbounded();

        BooleanSubscription late = new BooleanSubscription();
        processor.onSubscribe(late);

        assertTrue(late.isCancelled());

        // Slots 0..2 of the tracked errors must each hold a protocol violation.
        for (int index = 0; index < 3; index++) {
            TestHelper.assertError(pluginErrors, index, ProtocolViolationException.class);
        }
    } finally {
        // Always undo the error tracking installed above.
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions    文件:FlowableIndexOfTest.java   
/**
 * Runs {@code indexOf} against a hand-written source that emits 10 and then
 * calls onComplete without checking for cancellation; the expected result is
 * the single index {@code 0L}.
 */
@Test
public void foundWithUnconditionalOnCompleteAfter() {
    // Source that keeps signalling regardless of what downstream does.
    Flowable<Integer> source = new Flowable<Integer>() {
        @Override
        protected void subscribeActual(Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new BooleanSubscription());
            subscriber.onNext(10);
            subscriber.onComplete();
        }
    };

    Predicate<Integer> isTen = new Predicate<Integer>() {
        @Override
        public boolean test(Integer value) throws Exception {
            return value == 10;
        }
    };

    source
    .compose(FlowableTransformers.indexOf(isTen))
    .test()
    .assertResult(0L);
}
项目:RxJava2Extensions    文件:FlowableSwitchFlatMapTest.java   
/**
 * Feeds {@code switchFlatMap} an outer source that signals onError twice and
 * checks that the subscriber fails with the first error (IOException) while
 * the second one (IllegalArgumentException) shows up in the tracked plugin
 * errors.
 */
@Test
public void outerDoubleError() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final PublishProcessor<Integer> inner = PublishProcessor.create();

        // Misbehaving outer source: two error signals in a row.
        Flowable<Integer> outer = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> subscriber) {
                subscriber.onSubscribe(new BooleanSubscription());
                subscriber.onError(new IOException());
                subscriber.onError(new IllegalArgumentException());
            }
        };

        outer
        .compose(FlowableTransformers.switchFlatMap(Functions.justFunction(inner), 2))
        .test()
        .assertFailure(IOException.class);

        TestHelper.assertError(pluginErrors, 0, IllegalArgumentException.class);
    } finally {
        // Always undo the error tracking installed above.
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions    文件:FlowableSwitchFlatMapTest.java   
/**
 * Maps a single outer item to an inner source that signals onError twice and
 * checks that {@code switchFlatMap} fails with the first error (IOException)
 * while the second one (IllegalArgumentException) shows up in the tracked
 * plugin errors.
 */
@Test
public void innerDoubleError() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        // Misbehaving inner source: two error signals in a row.
        Flowable<Integer> inner = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> subscriber) {
                subscriber.onSubscribe(new BooleanSubscription());
                subscriber.onError(new IOException());
                subscriber.onError(new IllegalArgumentException());
            }
        };

        Flowable.just(1)
        .compose(FlowableTransformers.switchFlatMap(Functions.justFunction(inner), 2))
        .test()
        .assertFailure(IOException.class);

        TestHelper.assertError(pluginErrors, 0, IllegalArgumentException.class);
    } finally {
        // Always undo the error tracking installed above.
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions    文件:FlowableFilterAsyncTest.java   
/**
 * Uses {@code filterAsync} with an inner publisher that emits {@code true}
 * and then an IOException. The outer item 1 is expected to pass through and
 * the sequence to complete, with the IOException reported as undeliverable.
 */
@Test
public void oneAndErrorInner() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        // Builds a fresh inner publisher per item: one value, then an error.
        Function<Object, Publisher<Boolean>> asyncPredicate = new Function<Object, Publisher<Boolean>>() {
            @Override
            public Publisher<Boolean> apply(Object item) throws Exception {
                return new Flowable<Boolean>() {
                    @Override
                    public void subscribeActual(Subscriber<? super Boolean> subscriber) {
                        subscriber.onSubscribe(new BooleanSubscription());
                        subscriber.onNext(true);
                        subscriber.onError(new IOException());
                    }
                };
            }
        };

        Flowable.just(1)
        .compose(FlowableTransformers.filterAsync(asyncPredicate, 16))
        .test()
        .assertResult(1);

        TestHelper.assertUndeliverable(pluginErrors, 0, IOException.class);
    } finally {
        // Always undo the error tracking installed above.
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions    文件:FlowableZipLatestTest.java   
/**
 * Passes {@code zipLatest} a source that calls onSubscribe twice. The first
 * subscription must stay active, the second must be cancelled, and a
 * {@code ProtocolViolationException} must be recorded as a plugin error.
 */
@Test
public void badSource() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final PublishProcessor<Integer> wellBehaved = PublishProcessor.create();

        // Source that hands out two subscriptions to the same subscriber.
        final Flowable<Integer> doubleSubscribing = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> subscriber) {
                BooleanSubscription accepted = new BooleanSubscription();
                subscriber.onSubscribe(accepted);

                BooleanSubscription rejected = new BooleanSubscription();
                subscriber.onSubscribe(rejected);

                Assert.assertFalse(accepted.isCancelled());
                Assert.assertTrue(rejected.isCancelled());
            }
        };

        Flowables.zipLatest(wellBehaved, doubleSubscribing, toString2).test();

        TestHelper.assertError(pluginErrors, 0, ProtocolViolationException.class);
    } finally {
        // Always undo the error tracking installed above.
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions    文件:FlowableMapAsyncTest.java   
/**
 * Uses {@code mapAsync} with an inner publisher that emits 1 and then an
 * IOException. The result is expected to be the single value 1 followed by
 * completion, with the IOException reported as undeliverable.
 */
@Test
public void oneAndErrorInner() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        // Builds a fresh inner publisher per item: one value, then an error.
        Function<Object, Publisher<Integer>> asyncMapper = new Function<Object, Publisher<Integer>>() {
            @Override
            public Publisher<Integer> apply(Object item) throws Exception {
                return new Flowable<Integer>() {
                    @Override
                    public void subscribeActual(Subscriber<? super Integer> subscriber) {
                        subscriber.onSubscribe(new BooleanSubscription());
                        subscriber.onNext(1);
                        subscriber.onError(new IOException());
                    }
                };
            }
        };

        Flowable.just(1)
        .compose(FlowableTransformers.mapAsync(asyncMapper, 16))
        .test()
        .assertResult(1);

        TestHelper.assertUndeliverable(pluginErrors, 0, IOException.class);
    } finally {
        // Always undo the error tracking installed above.
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions    文件:NonoTest.java   
/**
 * Applies {@code mapper} to a {@code Nono} that (incorrectly) calls onNext
 * before completing, and asserts that the mapped sequence still finishes
 * with an empty result within five seconds.
 *
 * @param mapper the operator under test, expressed as a Nono-to-Nono function
 */
void checkNoNext(Function<Nono, Nono> mapper) {
    try {
        // Source that emits an item even though a Nono carries no values.
        Nono emitsItem = new Nono() {
            @Override
            protected void subscribeActual(Subscriber<? super Void> subscriber) {
                subscriber.onSubscribe(new BooleanSubscription());
                subscriber.onNext(null);
                subscriber.onComplete();
            }
        };

        mapper.apply(emitsItem)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult();
    } catch (Throwable failure) {
        // apply() may throw checked exceptions; rethrow via the helper.
        throw ExceptionHelper.wrapOrThrow(failure);
    }
}
项目:RxJava2Extensions    文件:BlockingSchedulerTest.java   
/**
 * Runs an action on a {@code BlockingScheduler} that schedules, without
 * delay, a task emitting 1..5 plus completion to the {@code ts} fixture and
 * then shutting the scheduler down.
 *
 * <p>Inside the action nothing may have been delivered yet; by the time
 * {@code execute} returns, the full sequence must have arrived and no plugin
 * errors may have been recorded.
 */
@Test(timeout = 10000)
public void directUntimed() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        final BlockingScheduler scheduler = new BlockingScheduler();
        scheduler.execute(new Action() {
            @Override
            public void run() throws Exception {
                ts.onSubscribe(new BooleanSubscription());

                scheduler.scheduleDirect(new Runnable() {
                    @Override
                    public void run() {
                        ts.onNext(1);
                        ts.onNext(2);
                        ts.onNext(3);
                        ts.onNext(4);
                        ts.onNext(5);
                        ts.onComplete();

                        scheduler.shutdown();
                    }
                });

                // The scheduled task must not have run synchronously.
                ts.assertEmpty();
            }
        });

        ts.assertResult(1, 2, 3, 4, 5);
        // The assertion message already lists any recorded errors, so no
        // separate stack-trace dump is needed (matches the timed variant).
        assertTrue(errors.toString(), errors.isEmpty());
    } finally {
        // Always undo the error tracking installed above.
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions    文件:BlockingSchedulerTest.java   
/**
 * Runs an action on a {@code BlockingScheduler} that schedules, with a
 * 100 ms delay, a task emitting 1..5 plus completion to the {@code ts}
 * fixture and then shutting the scheduler down.
 *
 * <p>Inside the action nothing may have been delivered yet; by the time
 * {@code execute} returns, the full sequence must have arrived and no plugin
 * errors may have been recorded.
 */
@Test(timeout = 10000)
public void directTimed() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final BlockingScheduler scheduler = new BlockingScheduler();

        Action body = new Action() {
            @Override
            public void run() throws Exception {
                ts.onSubscribe(new BooleanSubscription());

                // Emits the whole sequence, then stops the scheduler.
                Runnable emitter = new Runnable() {
                    @Override
                    public void run() {
                        ts.onNext(1);
                        ts.onNext(2);
                        ts.onNext(3);
                        ts.onNext(4);
                        ts.onNext(5);
                        ts.onComplete();

                        scheduler.shutdown();
                    }
                };
                scheduler.scheduleDirect(emitter, 100, TimeUnit.MILLISECONDS);

                // The delayed task must not have run yet.
                ts.assertEmpty();
            }
        };
        scheduler.execute(body);

        ts.assertResult(1, 2, 3, 4, 5);
        assertTrue(pluginErrors.toString(), pluginErrors.isEmpty());
    } finally {
        // Always undo the error tracking installed above.
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions    文件:FlowableProcessorsTest.java   
/**
 * Wraps the {@code sp} fixture with {@code FlowableProcessors.wrap} and walks
 * it through subscription, two items and completion, checking the
 * {@code hasComplete}/{@code hasThrowable}/{@code getThrowable} accessors
 * after every step.
 */
@Test
public void normal() {
    FlowableProcessor<Integer> wrapped = FlowableProcessors.wrap(sp);

    wrapped.onSubscribe(new BooleanSubscription());

    // Step 1: upstream attached, nothing terminal yet.
    assertFalse(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    TestSubscriber<Integer> consumer = wrapped.test();

    // Step 2: a subscriber joined, still nothing terminal.
    assertFalse(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    wrapped.onNext(1);
    wrapped.onNext(2);

    consumer.assertValues(1, 2);

    // Step 3: items delivered, still nothing terminal.
    assertFalse(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    wrapped.onComplete();

    // Step 4: completed without an error.
    assertTrue(wrapped.hasComplete());
    assertFalse(wrapped.hasThrowable());
    assertNull(wrapped.getThrowable());

    consumer.assertResult(1, 2);
}
项目:RxJava2Extensions    文件:FlowableConsumersTest.java   
/**
 * Subscribes via {@code FlowableConsumers.subscribeAutoDispose} to a source
 * that keeps signalling after its first completion (second onSubscribe,
 * another item, another completion, then an error).
 *
 * <p>Only the first item and the first completion are expected in the
 * {@code events} fixture; the trailing IOException must be reported as
 * undeliverable.
 */
@Test
public void badSource() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        // Source that ignores the protocol after its first terminal signal.
        Flowable<Integer> misbehaving = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(
                    Subscriber<? super Integer> subscriber) {
                subscriber.onSubscribe(new BooleanSubscription());
                subscriber.onNext(1);
                subscriber.onComplete();

                subscriber.onSubscribe(new BooleanSubscription());
                subscriber.onNext(2);
                subscriber.onComplete();
                subscriber.onError(new IOException());
            }
        };

        FlowableConsumers.subscribeAutoDispose(
                misbehaving, composite, this, this, this
            );

        List<Object> expected = Arrays.<Object>asList(1, "OnComplete");
        assertEquals(expected, events);

        TestHelper.assertUndeliverable(pluginErrors, 0, IOException.class);
    } finally {
        // Always undo the error tracking installed above.
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions    文件:FlowableWindowPredicateTest.java   
/**
 * Feeds {@code windowWhile} a source that signals onError twice. The
 * downstream must fail with the first error (IllegalArgumentException) and
 * the second one (IOException) must be reported as undeliverable.
 */
@SuppressWarnings("unchecked")
@Test
public void doubleError() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        // Misbehaving source: two error signals in a row.
        Flowable<Integer> source = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> subscriber) {
                subscriber.onSubscribe(new BooleanSubscription());
                subscriber.onError(new IllegalArgumentException());
                subscriber.onError(new IOException());
            }
        };

        // Keeps items in the current window as long as they are not -1.
        Predicate<Integer> notMinusOne = new Predicate<Integer>() {
            @Override
            public boolean test(Integer value) throws Exception {
                return value != -1;
            }
        };

        source
        .compose(FlowableTransformers.windowWhile(notMinusOne))
        .flatMapSingle(toList)
        .test()
        .assertFailure(IllegalArgumentException.class);

        TestHelper.assertUndeliverable(pluginErrors, 0, IOException.class);
    } finally {
        // Always undo the error tracking installed above.
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions    文件:FlowableBufferPredicateTest.java   
/**
 * Feeds {@code bufferWhile} a source that signals onError twice. The
 * downstream must fail with the first error (IllegalArgumentException) and
 * the second one (IOException) must be reported as undeliverable.
 */
@SuppressWarnings("unchecked")
@Test
public void doubleError() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        // Misbehaving source: two error signals in a row.
        Flowable<Integer> source = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> subscriber) {
                subscriber.onSubscribe(new BooleanSubscription());
                subscriber.onError(new IllegalArgumentException());
                subscriber.onError(new IOException());
            }
        };

        // Keeps items in the current buffer as long as they are not -1.
        Predicate<Integer> notMinusOne = new Predicate<Integer>() {
            @Override
            public boolean test(Integer value) throws Exception {
                return value != -1;
            }
        };

        source
        .compose(FlowableTransformers.bufferWhile(notMinusOne))
        .test()
        .assertFailure(IllegalArgumentException.class);

        TestHelper.assertUndeliverable(pluginErrors, 0, IOException.class);
    } finally {
        // Always undo the error tracking installed above.
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Interop    文件:RxJavaInteropTest.java   
/**
 * Converts an RxJava 1 {@code PublishSubject} into a v2
 * {@code FlowableProcessor} with {@code toV2Processor} and drives it through
 * a full lifecycle: subscriber attached, upstream subscription, two items,
 * completion, then further (ignored) terminal and item signals.
 *
 * <p>The subscription offered before termination stays active, the one
 * offered afterwards is cancelled, and the subscriber sees exactly 1, 2 and
 * completion.
 */
@Test
public void sj1ToFp2Lifecycle2() {
    rx.subjects.Subject<Integer, Integer> v1Subject = rx.subjects.PublishSubject.create();
    io.reactivex.processors.FlowableProcessor<Integer> v2Processor = toV2Processor(v1Subject);

    io.reactivex.subscribers.TestSubscriber<Integer> consumer = v2Processor.test();

    // Both sides must see the freshly attached subscriber.
    assertTrue(v2Processor.hasSubscribers());
    assertTrue(v1Subject.hasObservers());
    assertFalse(v2Processor.hasComplete());
    assertFalse(v2Processor.hasThrowable());
    assertNull(v2Processor.getThrowable());

    BooleanSubscription beforeTermination = new BooleanSubscription();
    v2Processor.onSubscribe(beforeTermination);

    assertFalse(beforeTermination.isCancelled());

    v2Processor.onNext(1);
    v2Processor.onNext(2);
    v2Processor.onComplete();
    // Everything from here on arrives after the terminal signal.
    v2Processor.onComplete();
    v2Processor.onError(new IOException());
    v2Processor.onNext(3);

    BooleanSubscription afterTermination = new BooleanSubscription();
    v2Processor.onSubscribe(afterTermination);

    assertFalse(beforeTermination.isCancelled());
    assertTrue(afterTermination.isCancelled());

    assertFalse(v2Processor.hasSubscribers());
    assertFalse(v1Subject.hasObservers());

    // The late onError must not have replaced the completion.
    assertTrue(v2Processor.hasComplete());
    assertFalse(v2Processor.hasThrowable());
    assertNull(v2Processor.getThrowable());

    consumer.assertResult(1, 2);
}
项目:RxJava2Extensions    文件:RxJavaProtocolValidatorTest.java   
/**
 * Exercises {@code RxJavaProtocolValidator} on a {@code Flowable} that emits
 * its signals in a deliberately wrong order (terminal events before
 * onSubscribe, null arguments, repeated onSubscribe, signals after
 * termination).
 *
 * <p>With this test instance installed as the violation handler, exactly 15
 * violations are expected in the {@code errors} fixture, in the order
 * asserted below. Well-behaved sources are run first as a sanity check.
 */
@Test
public void flowable() {
    // The order of these calls defines the expected violation sequence.
    Flowable<Integer> violator = new Flowable<Integer>() {

        @Override
        protected void subscribeActual(Subscriber<? super Integer> subscriber) {
            subscriber.onComplete();
            subscriber.onError(null);
            subscriber.onError(new IOException());
            subscriber.onNext(null);
            subscriber.onNext(1);
            subscriber.onSubscribe(null);
            subscriber.onSubscribe(new BooleanSubscription());
            subscriber.onSubscribe(new BooleanSubscription());
            subscriber.onComplete();
            subscriber.onNext(2);
        }
    };

    RxJavaProtocolValidator.setOnViolationHandler(this);
    Assert.assertSame(this, RxJavaProtocolValidator.getOnViolationHandler());

    SavedHooks previousHooks = RxJavaProtocolValidator.enableAndChain();
    Assert.assertTrue(RxJavaProtocolValidator.isEnabled());

    try {
        // Sanity check: standard sources still behave normally.
        Flowable.just(1).test().assertResult(1);
        Flowable.empty().test().assertResult();
        Flowable.error(new IOException()).test().assertFailure(IOException.class);

        Flowable<Integer> assembled = RxJavaPlugins.onAssembly(violator);

        assembled.test(0);

        Assert.assertEquals(15, errors.size());
        TestHelper.assertError(errors, 0, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 1, NullOnErrorParameterException.class);
        TestHelper.assertError(errors, 2, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 3, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 4, OnSubscribeNotCalledException.class);
        // Entry 4 must carry the IOException passed to onError as its cause.
        Throwable fifthCause = errors.get(4).getCause();
        Assert.assertTrue("" + fifthCause, fifthCause instanceof IOException);
        TestHelper.assertError(errors, 5, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 6, NullOnNextParameterException.class);
        TestHelper.assertError(errors, 7, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 8, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 9, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 10, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 11, NullOnSubscribeParameterException.class);
        TestHelper.assertError(errors, 12, MultipleOnSubscribeCallsException.class);
        TestHelper.assertError(errors, 13, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 14, OnNextAfterTerminationException.class);
    } finally {
        // Put back the hooks and handler that were active before the test.
        previousHooks.restore();
        RxJavaProtocolValidator.setOnViolationHandler(null);
    }
}
项目:RxJava2Extensions    文件:RxJavaProtocolValidatorTest.java   
/**
 * Exercises {@code RxJavaProtocolValidator} on a {@code ConnectableFlowable}
 * that emits its signals in a deliberately wrong order and whose
 * {@code connect} does nothing.
 *
 * <p>With this test instance installed as the violation handler, exactly 15
 * violations are expected in the {@code errors} fixture, in the order
 * asserted below. Well-behaved sources are run first as a sanity check.
 */
@Test
public void connectableFlowable() {
    // The order of these calls defines the expected violation sequence.
    ConnectableFlowable<Integer> violator = new ConnectableFlowable<Integer>() {

        @Override
        protected void subscribeActual(Subscriber<? super Integer> subscriber) {
            subscriber.onComplete();
            subscriber.onError(null);
            subscriber.onError(new IOException());
            subscriber.onNext(null);
            subscriber.onNext(1);
            subscriber.onSubscribe(null);
            subscriber.onSubscribe(new BooleanSubscription());
            subscriber.onSubscribe(new BooleanSubscription());
            subscriber.onComplete();
            subscriber.onNext(2);
        }

        @Override
        public void connect(Consumer<? super Disposable> connection) {
            // Intentionally empty: all signals are sent from subscribeActual.
        }
    };

    RxJavaProtocolValidator.setOnViolationHandler(this);
    Assert.assertSame(this, RxJavaProtocolValidator.getOnViolationHandler());

    SavedHooks previousHooks = RxJavaProtocolValidator.enableAndChain();
    Assert.assertTrue(RxJavaProtocolValidator.isEnabled());

    try {
        // Sanity check: standard sources still behave normally.
        Flowable.just(1).publish().autoConnect().test().assertResult(1);
        Flowable.empty().publish().autoConnect().test().assertResult();
        Flowable.error(new IOException()).test().assertFailure(IOException.class);

        ConnectableFlowable<Integer> assembled = RxJavaPlugins.onAssembly(violator);

        assembled.test(0);

        assembled.connect();

        Assert.assertEquals(15, errors.size());
        TestHelper.assertError(errors, 0, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 1, NullOnErrorParameterException.class);
        TestHelper.assertError(errors, 2, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 3, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 4, OnSubscribeNotCalledException.class);
        // Entry 4 must carry the IOException passed to onError as its cause.
        Throwable fifthCause = errors.get(4).getCause();
        Assert.assertTrue("" + fifthCause, fifthCause instanceof IOException);
        TestHelper.assertError(errors, 5, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 6, NullOnNextParameterException.class);
        TestHelper.assertError(errors, 7, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 8, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 9, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 10, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 11, NullOnSubscribeParameterException.class);
        TestHelper.assertError(errors, 12, MultipleOnSubscribeCallsException.class);
        TestHelper.assertError(errors, 13, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 14, OnNextAfterTerminationException.class);
    } finally {
        // Put back the hooks and handler that were active before the test.
        previousHooks.restore();
        RxJavaProtocolValidator.setOnViolationHandler(null);
    }
}
项目:RxJava2Extensions    文件:RxJavaProtocolValidatorTest.java   
/**
 * Exercises {@code RxJavaProtocolValidator} on a single-rail
 * {@code ParallelFlowable} that emits its signals to the first subscriber in
 * a deliberately wrong order.
 *
 * <p>With this test instance installed as the violation handler, exactly 15
 * violations are expected in the {@code errors} fixture, in the order
 * asserted below. Well-behaved sources are run first as a sanity check.
 */
@SuppressWarnings("unchecked")
@Test
public void parallelFlowable() {
    // The order of these calls defines the expected violation sequence.
    ParallelFlowable<Integer> violator = new ParallelFlowable<Integer>() {

        @Override
        public void subscribe(Subscriber<? super Integer>[] subscribers) {
            validate(subscribers);
            Subscriber<? super Integer> rail = subscribers[0];
            rail.onComplete();
            rail.onError(null);
            rail.onError(new IOException());
            rail.onNext(null);
            rail.onNext(1);
            rail.onSubscribe(null);
            rail.onSubscribe(new BooleanSubscription());
            rail.onSubscribe(new BooleanSubscription());
            rail.onComplete();
            rail.onNext(2);
        }

        @Override
        public int parallelism() {
            return 1;
        }
    };

    RxJavaProtocolValidator.setOnViolationHandler(this);
    Assert.assertSame(this, RxJavaProtocolValidator.getOnViolationHandler());

    SavedHooks previousHooks = RxJavaProtocolValidator.enableAndChain();
    Assert.assertTrue(RxJavaProtocolValidator.isEnabled());

    try {
        // Sanity check: standard sources still behave normally.
        Flowable.just(1).publish().autoConnect().test().assertResult(1);
        Flowable.empty().publish().autoConnect().test().assertResult();
        Flowable.error(new IOException()).test().assertFailure(IOException.class);

        ParallelFlowable<Integer> assembled = RxJavaPlugins.onAssembly(violator);

        assembled.subscribe(new Subscriber[] { new TestSubscriber<Integer>(0) });

        Assert.assertEquals(15, errors.size());
        TestHelper.assertError(errors, 0, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 1, NullOnErrorParameterException.class);
        TestHelper.assertError(errors, 2, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 3, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 4, OnSubscribeNotCalledException.class);
        // Entry 4 must carry the IOException passed to onError as its cause.
        Throwable fifthCause = errors.get(4).getCause();
        Assert.assertTrue("" + fifthCause, fifthCause instanceof IOException);
        TestHelper.assertError(errors, 5, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 6, NullOnNextParameterException.class);
        TestHelper.assertError(errors, 7, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 8, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 9, OnSubscribeNotCalledException.class);
        TestHelper.assertError(errors, 10, OnNextAfterTerminationException.class);
        TestHelper.assertError(errors, 11, NullOnSubscribeParameterException.class);
        TestHelper.assertError(errors, 12, MultipleOnSubscribeCallsException.class);
        TestHelper.assertError(errors, 13, MultipleTerminationsException.class);
        TestHelper.assertError(errors, 14, OnNextAfterTerminationException.class);
    } finally {
        // Put back the hooks and handler that were active before the test.
        previousHooks.restore();
        RxJavaProtocolValidator.setOnViolationHandler(null);
    }
}
项目:RxJava2Extensions    文件:BlockingSchedulerTest.java   
/**
 * Schedules a 100 ms delayed direct task that would emit 1..5 to the
 * {@code ts} fixture, disposes it right away, then schedules a shutdown.
 * The fixture must stay empty both inside the action and after
 * {@code execute} returns, and no plugin errors may be recorded.
 */
@Test(timeout = 10000)
public void cancelDirect() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final BlockingScheduler scheduler = new BlockingScheduler();

        Action body = new Action() {
            @Override
            public void run() throws Exception {
                ts.onSubscribe(new BooleanSubscription());

                // Would emit the whole sequence if it were ever run.
                Runnable emitter = new Runnable() {
                    @Override
                    public void run() {
                        ts.onNext(1);
                        ts.onNext(2);
                        ts.onNext(3);
                        ts.onNext(4);
                        ts.onNext(5);
                        ts.onComplete();
                    }
                };
                Disposable pending = scheduler.scheduleDirect(emitter, 100, TimeUnit.MILLISECONDS);

                assertFalse(pending.isDisposed());
                pending.dispose();
                assertTrue(pending.isDisposed());

                // Stops the scheduler so that execute(...) can return.
                Runnable stopper = new Runnable() {
                    @Override
                    public void run() {
                        scheduler.shutdown();
                    }
                };
                scheduler.scheduleDirect(stopper);

                ts.assertEmpty();
            }
        };
        scheduler.execute(body);

        ts.assertEmpty();
        assertTrue(pluginErrors.toString(), pluginErrors.isEmpty());
    } finally {
        // Always undo the error tracking installed above.
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions    文件:BlockingSchedulerTest.java   
/**
 * Schedules an undelayed direct task that would emit 1..5 to the {@code ts}
 * fixture, disposes it right away, then schedules a shutdown. The fixture
 * must stay empty both inside the action and after {@code execute} returns,
 * and no plugin errors may be recorded.
 */
@Test(timeout = 10000)
public void cancelDirectUntimed() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final BlockingScheduler scheduler = new BlockingScheduler();

        Action body = new Action() {
            @Override
            public void run() throws Exception {
                ts.onSubscribe(new BooleanSubscription());

                // Would emit the whole sequence if it were ever run.
                Runnable emitter = new Runnable() {
                    @Override
                    public void run() {
                        ts.onNext(1);
                        ts.onNext(2);
                        ts.onNext(3);
                        ts.onNext(4);
                        ts.onNext(5);
                        ts.onComplete();
                    }
                };
                Disposable pending = scheduler.scheduleDirect(emitter);

                assertFalse(pending.isDisposed());
                pending.dispose();
                assertTrue(pending.isDisposed());

                // Stops the scheduler so that execute(...) can return.
                Runnable stopper = new Runnable() {
                    @Override
                    public void run() {
                        scheduler.shutdown();
                    }
                };
                scheduler.scheduleDirect(stopper);

                ts.assertEmpty();
            }
        };
        scheduler.execute(body);

        ts.assertEmpty();
        assertTrue(pluginErrors.toString(), pluginErrors.isEmpty());
    } finally {
        // Always undo the error tracking installed above.
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions    文件:BlockingSchedulerTest.java   
/**
 * Uses a worker of the {@code BlockingScheduler} to schedule a 100 ms
 * delayed task that would emit 1..5 to the {@code ts} fixture, disposes that
 * task right away, then schedules a task that disposes the worker and shuts
 * the scheduler down. The fixture must stay empty throughout and no plugin
 * errors may be recorded.
 */
@Test(timeout = 10000)
public void cancelWorker() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final BlockingScheduler scheduler = new BlockingScheduler();

        Action body = new Action() {
            @Override
            public void run() throws Exception {

                ts.onSubscribe(new BooleanSubscription());

                final Worker worker = scheduler.createWorker();

                // Would emit the whole sequence if it were ever run.
                Runnable emitter = new Runnable() {
                    @Override
                    public void run() {
                        ts.onNext(1);
                        ts.onNext(2);
                        ts.onNext(3);
                        ts.onNext(4);
                        ts.onNext(5);
                        ts.onComplete();
                    }
                };
                Disposable pending = worker.schedule(emitter, 100, TimeUnit.MILLISECONDS);

                assertFalse(pending.isDisposed());
                pending.dispose();
                assertTrue(pending.isDisposed());

                // Releases the worker and stops the scheduler.
                Runnable stopper = new Runnable() {
                    @Override
                    public void run() {
                        worker.dispose();
                        scheduler.shutdown();
                    }
                };
                worker.schedule(stopper);

                ts.assertEmpty();
            }
        };
        scheduler.execute(body);

        ts.assertEmpty();
        assertTrue(pluginErrors.toString(), pluginErrors.isEmpty());
    } finally {
        // Always undo the error tracking installed above.
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions    文件:BlockingSchedulerTest.java   
/**
 * Uses a worker of the {@code BlockingScheduler} to schedule an undelayed
 * task that would emit 1..5 to the {@code ts} fixture, disposes that task
 * right away, then schedules a task that disposes the worker, shuts the
 * scheduler down and checks that the worker reports itself disposed. The
 * fixture must stay empty throughout and no plugin errors may be recorded.
 */
@Test(timeout = 10000)
public void cancelWorkerUntimed() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final BlockingScheduler scheduler = new BlockingScheduler();

        Action body = new Action() {
            @Override
            public void run() throws Exception {

                ts.onSubscribe(new BooleanSubscription());

                final Worker worker = scheduler.createWorker();

                // Would emit the whole sequence if it were ever run.
                Runnable emitter = new Runnable() {
                    @Override
                    public void run() {
                        ts.onNext(1);
                        ts.onNext(2);
                        ts.onNext(3);
                        ts.onNext(4);
                        ts.onNext(5);
                        ts.onComplete();
                    }
                };
                Disposable pending = worker.schedule(emitter);

                assertFalse(pending.isDisposed());
                pending.dispose();
                assertTrue(pending.isDisposed());

                // Releases the worker, stops the scheduler, verifies disposal.
                Runnable stopper = new Runnable() {
                    @Override
                    public void run() {
                        worker.dispose();
                        scheduler.shutdown();

                        assertTrue(worker.isDisposed());
                    }
                };
                worker.schedule(stopper);

                ts.assertEmpty();
            }
        };
        scheduler.execute(body);

        ts.assertEmpty();
        assertTrue(pluginErrors.toString(), pluginErrors.isEmpty());
    } finally {
        // Always undo the error tracking installed above.
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions    文件:RefCountProcessorTest.java   
/**
 * Offers two subscriptions in a row to a processor obtained from
 * {@code FlowableProcessors.refCount} and checks that the first one stays
 * active while the second one is cancelled.
 */
@Test
public void doubleOnSubscribe() {
    final FlowableProcessor<Integer> refCounted = FlowableProcessors.refCount(PublishProcessor.<Integer>create());

    BooleanSubscription accepted = new BooleanSubscription();
    refCounted.onSubscribe(accepted);

    BooleanSubscription rejected = new BooleanSubscription();
    refCounted.onSubscribe(rejected);

    // Only the surplus subscription may be cancelled.
    assertFalse(accepted.isCancelled());
    assertTrue(rejected.isCancelled());
}