/**
 * Subscribes to a hidden MaybeSubject through {@code DisposableAttach.to(composite)} and checks
 * that the composite holds one entry after success, and that disposing the composite leaves it
 * empty and disposes the subscription.
 */
@Test
public void test() {
  MaybeSubject<String> subject = MaybeSubject.create();
  Maybe<String> maybeSource = subject.hide();
  // Parameterized (not raw) so subscribeWith and assertValue are checked by the compiler.
  TestObserver<String> testObserver = new TestObserver<>();
  CompositeDisposable composite = new CompositeDisposable();

  Disposable disposable = maybeSource
      .compose(DisposableAttach.<String>to(composite))
      .subscribeWith(testObserver);

  subject.onSuccess("Foo");
  testObserver.assertValue("Foo");
  assertTrue(composite.size() == 1);

  composite.dispose();
  assertTrue(composite.size() == 0);
  assertTrue(composite.isDisposed());
  assertTrue(disposable.isDisposed());
  assertTrue(testObserver.isDisposed());
}
@Test public void autoDispose_withMaybe_normal() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); MaybeSubject<Integer> source = MaybeSubject.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); // Got the event source.onSuccess(1); assertThat(o.takeSuccess()).isEqualTo(1); // Nothing more, lifecycle disposed too o.assertNoMoreEvents(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withMaybe_interrupted() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); MaybeSubject<Integer> source = MaybeSubject.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(o); source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) { } }); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); // Lifecycle ends lifecycle.onSuccess(2); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); // Event if upstream emits, no one is listening source.onSuccess(2); o.assertNoMoreEvents(); }
/**
 * With a lifecycle provider whose subject has advanced from 0 to 1, a source success is still
 * delivered and both source and lifecycle end up without observers.
 */
@Test
public void autoDispose_withProvider_success() {
  RecordingObserver<Integer> recorder = new RecordingObserver<>(LOGGER);
  BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0);
  MaybeSubject<Integer> upstream = MaybeSubject.create();
  LifecycleScopeProvider<Integer> provider = makeLifecycleProvider(lifecycle);

  upstream.as(AutoDispose.<Integer>autoDisposable(provider))
      .subscribe(recorder);
  recorder.takeSubscribe();

  assertThat(upstream.hasObservers()).isTrue();
  assertThat(lifecycle.hasObservers()).isTrue();

  // Advancing the lifecycle to 1 leaves everything subscribed
  lifecycle.onNext(1);
  assertThat(upstream.hasObservers()).isTrue();
  assertThat(lifecycle.hasObservers()).isTrue();

  upstream.onSuccess(3);
  recorder.takeSuccess();

  recorder.assertNoMoreEvents();
  assertThat(upstream.hasObservers()).isFalse();
  assertThat(lifecycle.hasObservers()).isFalse();
}
/** An empty completion from the source is forwarded and leaves source and scope unobserved. */
@Test
public void autoDispose_withProvider_completion() {
  RecordingObserver<Integer> recorder = new RecordingObserver<>(LOGGER);
  MaybeSubject<Integer> scope = MaybeSubject.create();
  MaybeSubject<Integer> upstream = MaybeSubject.create();
  ScopeProvider provider = makeProvider(scope);

  upstream.as(AutoDispose.<Integer>autoDisposable(provider))
      .subscribe(recorder);
  recorder.takeSubscribe();

  assertThat(upstream.hasObservers()).isTrue();
  assertThat(scope.hasObservers()).isTrue();

  upstream.onComplete();
  recorder.assertOnComplete();

  recorder.assertNoMoreEvents();
  assertThat(upstream.hasObservers()).isFalse();
  assertThat(scope.hasObservers()).isFalse();
}
@Test public void autoDispose_withProvider_interrupted() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); MaybeSubject<Integer> source = MaybeSubject.create(); MaybeSubject<Integer> scope = MaybeSubject.create(); ScopeProvider provider = makeProvider(scope); source.as(AutoDispose.<Integer>autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(scope.hasObservers()).isTrue(); scope.onSuccess(1); // All disposed assertThat(source.hasObservers()).isFalse(); assertThat(scope.hasObservers()).isFalse(); // No one is listening source.onSuccess(3); o.assertNoMoreEvents(); }
/**
 * With a lifecycle provider whose subject has advanced from 0 to 1, an empty completion from the
 * source is forwarded and both source and lifecycle end up without observers.
 */
@Test
public void autoDispose_withLifecycleProvider_completion() {
  RecordingObserver<Integer> recorder = new RecordingObserver<>(LOGGER);
  BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0);
  MaybeSubject<Integer> upstream = MaybeSubject.create();
  LifecycleScopeProvider<Integer> provider = makeLifecycleProvider(lifecycle);

  upstream.as(AutoDispose.<Integer>autoDisposable(provider))
      .subscribe(recorder);
  recorder.takeSubscribe();

  assertThat(upstream.hasObservers()).isTrue();
  assertThat(lifecycle.hasObservers()).isTrue();

  // Advancing the lifecycle to 1 leaves everything subscribed
  lifecycle.onNext(1);
  assertThat(upstream.hasObservers()).isTrue();
  assertThat(lifecycle.hasObservers()).isTrue();

  upstream.onComplete();
  recorder.assertOnComplete();

  recorder.assertNoMoreEvents();
  assertThat(upstream.hasObservers()).isFalse();
  assertThat(lifecycle.hasObservers()).isFalse();
}
/**
 * With a no-op outside-lifecycle handler installed and a lifecycle subject that has emitted
 * nothing, subscribing yields no values and no errors and leaves nothing subscribed.
 */
@Test
public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() {
  AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() {
    @Override public void accept(OutsideLifecycleException ignored) { }
  });

  MaybeSubject<Integer> upstream = MaybeSubject.create();
  BehaviorSubject<Integer> lifecycle = BehaviorSubject.create();
  LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle);

  TestObserver<Integer> observer = upstream
      .as(AutoDispose.<Integer>autoDisposable(provider))
      .test();

  assertThat(upstream.hasObservers()).isFalse();
  assertThat(lifecycle.hasObservers()).isFalse();
  observer.assertNoValues();
  observer.assertNoErrors();
}
@Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Noop } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); lifecycle.onNext(1); lifecycle.onNext(2); lifecycle.onNext(3); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); MaybeSubject<Integer> source = MaybeSubject.create(); TestObserver<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); o.assertNoValues(); o.assertNoErrors(); }
@Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithWrappedExp() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Wrap in an IllegalStateException so we can verify this is the exception we see on the // other side throw new IllegalStateException(e); } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.create(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); MaybeSubject<Integer> source = MaybeSubject.create(); TestObserver<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); o.assertNoValues(); o.assertError(new Predicate<Throwable>() { @Override public boolean test(Throwable throwable) { return throwable instanceof IllegalStateException && throwable.getCause() instanceof OutsideLifecycleException; } }); }
@Test public void verifyCancellation() { final AtomicInteger i = new AtomicInteger(); //noinspection unchecked because Java Maybe<Integer> source = Maybe.create(new MaybeOnSubscribe<Integer>() { @Override public void subscribe(MaybeEmitter<Integer> e) { e.setCancellable(new Cancellable() { @Override public void cancel() { i.incrementAndGet(); } }); } }); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(); assertThat(i.get()).isEqualTo(0); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onSuccess(0); // Verify cancellation was called assertThat(i.get()).isEqualTo(1); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withMaybe_normal() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); CompletableSubject source = CompletableSubject.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(autoDisposable(lifecycle)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); // Got the event source.onComplete(); o.assertOnComplete(); // Nothing more, lifecycle disposed too o.assertNoMoreEvents(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withMaybe_interrupted() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); CompletableSubject source = CompletableSubject.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(autoDisposable(lifecycle)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); // Lifecycle ends lifecycle.onSuccess(2); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); // Event if upstream emits, no one is listening source.onComplete(); o.assertNoMoreEvents(); }
/** With a ScopeProvider, completion of the Completable is forwarded and everything is released. */
@Test
public void autoDispose_withProvider_completion() {
  RecordingObserver<Integer> recorder = new RecordingObserver<>(LOGGER);
  MaybeSubject<Integer> scope = MaybeSubject.create();
  CompletableSubject upstream = CompletableSubject.create();
  ScopeProvider provider = makeProvider(scope);

  upstream.as(autoDisposable(provider))
      .subscribe(recorder);
  recorder.takeSubscribe();

  assertThat(upstream.hasObservers()).isTrue();
  assertThat(scope.hasObservers()).isTrue();

  upstream.onComplete();
  recorder.assertOnComplete();

  recorder.assertNoMoreEvents();
  assertThat(upstream.hasObservers()).isFalse();
  assertThat(scope.hasObservers()).isFalse();
}
@Test public void autoDispose_withProvider_interrupted() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); CompletableSubject source = CompletableSubject.create(); MaybeSubject<Integer> scope = MaybeSubject.create(); ScopeProvider provider = makeProvider(scope); source.as(autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(scope.hasObservers()).isTrue(); scope.onSuccess(1); // All disposed assertThat(source.hasObservers()).isFalse(); assertThat(scope.hasObservers()).isFalse(); // No one is listening source.onComplete(); o.assertNoMoreEvents(); }
@Test public void verifyCancellation() { final AtomicInteger i = new AtomicInteger(); //noinspection unchecked because Java Completable source = Completable.create(new CompletableOnSubscribe() { @Override public void subscribe(CompletableEmitter e) { e.setCancellable(new Cancellable() { @Override public void cancel() { i.incrementAndGet(); } }); } }); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(autoDisposable(lifecycle)) .subscribe(); assertThat(i.get()).isEqualTo(0); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onSuccess(0); // Verify cancellation was called assertThat(i.get()).isEqualTo(1); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withMaybe_normal() { TestObserver<Integer> o = new TestObserver<>(); PublishSubject<Integer> source = PublishSubject.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); Disposable d = source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribeWith(o); o.assertSubscribed(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); source.onNext(1); o.assertValue(1); source.onNext(2); source.onComplete(); o.assertValues(1, 2); o.assertComplete(); assertThat(d.isDisposed()).isFalse(); // Because it completed normally, was not disposed. assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
/** Items sent after the lifecycle Maybe succeeds are not recorded by the observer. */
@Test
public void autoDispose_withMaybe_interrupted() {
  MaybeSubject<Integer> lifecycle = MaybeSubject.create();
  PublishSubject<Integer> upstream = PublishSubject.create();
  RecordingObserver<Integer> recorder = new RecordingObserver<>(LOGGER);

  upstream.as(AutoDispose.<Integer>autoDisposable(lifecycle))
      .subscribe(recorder);
  recorder.takeSubscribe();

  assertThat(upstream.hasObservers()).isTrue();
  assertThat(lifecycle.hasObservers()).isTrue();

  upstream.onNext(1);
  assertThat(recorder.takeNext()).isEqualTo(1);

  // End the lifecycle, then try to push one more item
  lifecycle.onSuccess(2);
  upstream.onNext(2);

  recorder.assertNoMoreEvents();
  assertThat(upstream.hasObservers()).isFalse();
  assertThat(lifecycle.hasObservers()).isFalse();
}
@Test public void autoDispose_withMaybe_normal() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); SingleSubject<Integer> source = SingleSubject.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); // Got the event source.onSuccess(1); assertThat(o.takeSuccess()).isEqualTo(1); // Nothing more, lifecycle disposed too o.assertNoMoreEvents(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withMaybe_interrupted() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); SingleSubject<Integer> source = SingleSubject.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); // Lifecycle ends lifecycle.onSuccess(2); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); // Event if upstream emits, no one is listening source.onSuccess(2); o.assertNoMoreEvents(); }
@Test public void autoDispose_withProvider() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); SingleSubject<Integer> source = SingleSubject.create(); MaybeSubject<Integer> scope = MaybeSubject.create(); ScopeProvider provider = makeProvider(scope); source.as(AutoDispose.<Integer>autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(scope.hasObservers()).isTrue(); source.onSuccess(3); o.takeSuccess(); // All cleaned up o.assertNoMoreEvents(); assertThat(source.hasObservers()).isFalse(); assertThat(scope.hasObservers()).isFalse(); }
@Test public void autoDispose_withProvider_interrupted() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); SingleSubject<Integer> source = SingleSubject.create(); MaybeSubject<Integer> scope = MaybeSubject.create(); ScopeProvider provider = makeProvider(scope); source.as(AutoDispose.<Integer>autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(scope.hasObservers()).isTrue(); // Lifecycle ends scope.onSuccess(3); assertThat(source.hasObservers()).isFalse(); assertThat(scope.hasObservers()).isFalse(); // No one is listening even if upstream finally does emit source.onSuccess(3); o.assertNoMoreEvents(); }
@Test public void verifyCancellation() { final AtomicInteger i = new AtomicInteger(); //noinspection unchecked because Java Single<Integer> source = Single.create(new SingleOnSubscribe<Integer>() { @Override public void subscribe(SingleEmitter<Integer> e) { e.setCancellable(new Cancellable() { @Override public void cancel() { i.incrementAndGet(); } }); } }); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(); assertThat(i.get()).isEqualTo(0); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onSuccess(0); // Verify cancellation was called assertThat(i.get()).isEqualTo(1); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withMaybe_normal() { TestSubscriber<Integer> o = new TestSubscriber<>(); PublishProcessor<Integer> source = PublishProcessor.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); Disposable d = source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribeWith(o); o.assertSubscribed(); assertThat(source.hasSubscribers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); source.onNext(1); o.assertValue(1); source.onNext(2); source.onComplete(); o.assertValues(1, 2); o.assertComplete(); assertThat(d.isDisposed()).isFalse(); // Because it completes normally assertThat(source.hasSubscribers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withMaybe_interrupted() { PublishProcessor<Integer> source = PublishProcessor.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); TestSubscriber<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(lifecycle)) .test(); o.assertSubscribed(); assertThat(source.hasSubscribers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); source.onNext(1); o.assertValue(1); lifecycle.onSuccess(2); source.onNext(2); // No more events o.assertValue(1); // Unsubscribed assertThat(source.hasSubscribers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
/**
 * Blocks until the first of the requested location sources, or the cancel subject, signals.
 *
 * <p>One source is requested from {@code mProvider} per entry in {@code mPriorities}, or a single
 * one for {@code mPriority} when {@code mPriorities} is null. A fresh {@code MaybeSubject} is
 * stored in {@code mCancel} and raced against them via {@code Maybe.amb}; presumably other code
 * in this class signals it to abort the wait (verify against the cancel path).
 *
 * @return the value produced by {@code blockingGet()} on the winning source; per the RxJava
 *     contract this is {@code null} when the winner completes without a value
 */
@Override
public Location loadInBackground() {
  mCancel = MaybeSubject.create();
  try {
    ArrayList<MaybeSource<Location>> sources;
    if (mPriorities != null) {
      // One slot per priority plus one for the cancel subject
      sources = new ArrayList<>(mPriorities.length + 1);
      for (int priority : mPriorities) {
        sources.add(mProvider.getCurrentLocation(priority));
      }
    } else {
      sources = new ArrayList<>(2);
      sources.add(mProvider.getCurrentLocation(mPriority));
    }
    sources.add(mCancel);
    return Maybe.amb(sources).blockingGet();
  } finally {
    // Always drop the subject, including when a source fails and blockingGet() rethrows,
    // so a stale cancel handle is never left behind for the next load.
    mCancel = null;
  }
}
/**
 * Bridges a CompletionStage into a Maybe: a non-null result becomes the success value,
 * a null result becomes an empty completion, and a failure becomes the error signal.
 * @param <T> the value type
 * @param cs the source CompletionStage instance
 * @return the new Maybe instance
 */
public static <T> Maybe<T> fromFuture(CompletionStage<T> cs) {
    MaybeSubject<T> subject = MaybeSubject.create();

    cs.whenComplete((value, error) -> {
        if (error != null) {
            subject.onError(error);
            return;
        }
        if (value == null) {
            // null stands for "no value"
            subject.onComplete();
            return;
        }
        subject.onSuccess(value);
    });

    return subject;
}
@Test public void autoDispose_withLifecycleProvider_interrupted() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); MaybeSubject<Integer> source = MaybeSubject.create(); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); LifecycleScopeProvider<Integer> provider = makeLifecycleProvider(lifecycle); source.as(AutoDispose.<Integer>autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onNext(1); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onNext(3); // All disposed assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); // No one is listening source.onSuccess(3); o.assertNoMoreEvents(); }
/**
 * Subscribing through {@code ScopeProvider.UNBOUND} produces no values and no errors on the
 * observer, and the test's {@code rule} has no errors either.
 */
@Test
public void autoDispose_withScopeProviderCompleted_shouldNotReportDoubleSubscriptions() {
  TestObserver<Object> observer = MaybeSubject.create()
      .as(AutoDispose.autoDisposable(ScopeProvider.UNBOUND))
      .test();

  observer.assertNoValues();
  observer.assertNoErrors();

  rule.assertNoErrors();
}
/** A value emitted by the source is still delivered when the scope is {@code UNBOUND}. */
@Test
public void unbound_shouldStillPassValues() {
  MaybeSubject<Integer> upstream = MaybeSubject.create();

  TestObserver<Integer> observer = upstream
      .as(AutoDispose.<Integer>autoDisposable(ScopeProvider.UNBOUND))
      .test();

  upstream.onSuccess(1);
  observer.assertValue(1);
}
@Test public void ifParallelism_and_subscribersCount_dontMatch_shouldFail() { TestSubscriber<Integer> subscriber = new TestSubscriber<>(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); //noinspection unchecked Subscriber<Integer>[] subscribers = new Subscriber[] {subscriber}; Flowable.just(1, 2) .parallel(DEFAULT_PARALLELISM) .as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(subscribers); List<Throwable> errors = subscriber.errors(); assertThat(errors).hasSize(1); assertThat(errors.get(0)).isInstanceOf(IllegalArgumentException.class); }
@Test public void autoDispose_withMaybe_normal() { TestSubscriber<Integer> firstSubscriber = new TestSubscriber<>(); TestSubscriber<Integer> secondSubscriber = new TestSubscriber<>(); PublishProcessor<Integer> source = PublishProcessor.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); //noinspection unchecked Subscriber<Integer>[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; source .parallel(DEFAULT_PARALLELISM) .as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(subscribers); firstSubscriber.assertSubscribed(); secondSubscriber.assertSubscribed(); assertThat(source.hasSubscribers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); source.onNext(1); source.onNext(2); firstSubscriber.assertValue(1); secondSubscriber.assertValue(2); source.onNext(3); source.onNext(4); source.onComplete(); firstSubscriber.assertValues(1, 3); firstSubscriber.assertComplete(); secondSubscriber.assertValues(2, 4); secondSubscriber.assertComplete(); assertThat(source.hasSubscribers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withMaybe_interrupted() { TestSubscriber<Integer> firstSubscriber = new TestSubscriber<>(); TestSubscriber<Integer> secondSubscriber = new TestSubscriber<>(); PublishProcessor<Integer> source = PublishProcessor.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); //noinspection unchecked Subscriber<Integer>[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; source .parallel(DEFAULT_PARALLELISM) .as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(subscribers); firstSubscriber.assertSubscribed(); secondSubscriber.assertSubscribed(); source.onNext(1); source.onNext(2); firstSubscriber.assertValue(1); secondSubscriber.assertValue(2); lifecycle.onSuccess(2); source.onNext(3); firstSubscriber.assertValue(1); secondSubscriber.assertValue(2); assertThat(source.hasSubscribers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
/**
 * With a ScopeProvider, Observable items flow until the scope Maybe succeeds; an item sent
 * afterwards is not recorded and both sides are released.
 */
@Test
public void autoDispose_withProvider() {
  RecordingObserver<Integer> recorder = new RecordingObserver<>(LOGGER);
  MaybeSubject<Integer> scope = MaybeSubject.create();
  PublishSubject<Integer> upstream = PublishSubject.create();
  ScopeProvider provider = TestUtil.makeProvider(scope);

  upstream.as(AutoDispose.<Integer>autoDisposable(provider))
      .subscribe(recorder);
  recorder.takeSubscribe();

  assertThat(upstream.hasObservers()).isTrue();
  assertThat(scope.hasObservers()).isTrue();

  upstream.onNext(1);
  assertThat(recorder.takeNext()).isEqualTo(1);

  // A second item arrives while everything is still subscribed
  upstream.onNext(2);
  assertThat(upstream.hasObservers()).isTrue();
  assertThat(scope.hasObservers()).isTrue();
  assertThat(recorder.takeNext()).isEqualTo(2);

  // End the scope, then try to push one more item
  scope.onSuccess(3);
  upstream.onNext(3);

  recorder.assertNoMoreEvents();
  assertThat(upstream.hasObservers()).isFalse();
  assertThat(scope.hasObservers()).isFalse();
}
@Test public void verifyCancellation() { final AtomicInteger i = new AtomicInteger(); //noinspection unchecked because Java final ObservableEmitter<Integer>[] emitter = new ObservableEmitter[1]; Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) { e.setCancellable(new Cancellable() { @Override public void cancel() { i.incrementAndGet(); } }); emitter[0] = e; } }); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(); assertThat(i.get()).isEqualTo(0); assertThat(lifecycle.hasObservers()).isTrue(); emitter[0].onNext(1); lifecycle.onSuccess(0); emitter[0].onNext(2); // Verify cancellation was called assertThat(i.get()).isEqualTo(1); assertThat(lifecycle.hasObservers()).isFalse(); }
/** Calling {@code emit()} on a provider built around a delegate yields one value on {@code o}. */
@Test
public void delegateArg() {
  MaybeSubject<Integer> delegate = MaybeSubject.create();
  TestScopeProvider scopeProvider = TestScopeProvider.create(delegate);

  scopeProvider.requestScope()
      .subscribe(o);

  scopeProvider.emit();
  o.assertValueCount(1);
}
/** A success on the delegate subject shows up on {@code o} as the single value 1. */
@Test
public void delegateArgEmits() {
  MaybeSubject<Integer> delegate = MaybeSubject.create();
  TestScopeProvider scopeProvider = TestScopeProvider.create(delegate);

  scopeProvider.requestScope()
      .subscribe(o);

  delegate.onSuccess(1);
  o.assertValueCount(1);
  o.assertValue(1);
}
/** An error on the delegate subject shows up on {@code o} as an IllegalArgumentException. */
@Test
public void delegateArg_error() {
  MaybeSubject<Integer> delegate = MaybeSubject.create();
  TestScopeProvider scopeProvider = TestScopeProvider.create(delegate);

  scopeProvider.requestScope()
      .subscribe(o);

  delegate.onError(new IllegalArgumentException());
  o.assertError(IllegalArgumentException.class);
}
/** An empty completion of the delegate subject shows up on {@code o} as completion. */
@Test
public void delegateArg_complete() {
  MaybeSubject<Integer> delegate = MaybeSubject.create();
  TestScopeProvider scopeProvider = TestScopeProvider.create(delegate);

  scopeProvider.requestScope()
      .subscribe(o);

  delegate.onComplete();
  o.assertComplete();
}
@Test public void autoDispose_withProvider() { PublishProcessor<Integer> source = PublishProcessor.create(); MaybeSubject<Integer> scope = MaybeSubject.create(); ScopeProvider provider = TestUtil.makeProvider(scope); TestSubscriber<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); o.assertSubscribed(); assertThat(source.hasSubscribers()).isTrue(); assertThat(scope.hasObservers()).isTrue(); source.onNext(1); o.assertValue(1); source.onNext(2); assertThat(source.hasSubscribers()).isTrue(); assertThat(scope.hasObservers()).isTrue(); o.assertValues(1, 2); scope.onSuccess(3); source.onNext(3); // Nothing new o.assertValues(1, 2); // Unsubscribed assertThat(source.hasSubscribers()).isFalse(); assertThat(scope.hasObservers()).isFalse(); }