@Test public void testClose_waitForCommands() { BehaviorSubject<Boolean> idler = BehaviorSubject.createDefault(false); when(cmdProcessor.isIdle()).thenReturn(idler); RxCmdShell shell = new RxCmdShell(builder); shell.open().test().awaitDone(1, TimeUnit.SECONDS).assertNoTimeout().values().get(0); shell.isAlive().test().awaitDone(1, TimeUnit.SECONDS).assertNoTimeout().assertValue(true); shell.close().test().awaitDone(1, TimeUnit.SECONDS).assertTimeout(); idler.onNext(true); shell.close().test().awaitDone(1, TimeUnit.SECONDS).assertNoTimeout().assertValue(0); verify(cmdProcessor).isIdle(); verify(rxShellSession).close(); }
@Test public void testBindUntilLifecycleEvent() { BehaviorSubject<Lifecycle.Event> lifecycle = BehaviorSubject.create(); TestObserver<Object> testObserver = observable.compose(RxLifecycle.bindUntilEvent(lifecycle, Lifecycle.Event.ON_STOP)).test(); lifecycle.onNext(Lifecycle.Event.ON_CREATE); testObserver.assertNotComplete(); lifecycle.onNext(Lifecycle.Event.ON_START); testObserver.assertNotComplete(); lifecycle.onNext(Lifecycle.Event.ON_RESUME); testObserver.assertNotComplete(); lifecycle.onNext(Lifecycle.Event.ON_PAUSE); testObserver.assertNotComplete(); lifecycle.onNext(Lifecycle.Event.ON_STOP); testObserver.assertComplete(); }
@Test public void testBindUntilFragmentEvent() { BehaviorSubject<FragmentEvent> lifecycle = BehaviorSubject.create(); TestObserver<Object> testObserver = observable.compose(RxLifecycle.bindUntilEvent(lifecycle, FragmentEvent.STOP)).test(); lifecycle.onNext(FragmentEvent.ATTACH); testObserver.assertNotComplete(); lifecycle.onNext(FragmentEvent.CREATE); testObserver.assertNotComplete(); lifecycle.onNext(FragmentEvent.CREATE_VIEW); testObserver.assertNotComplete(); lifecycle.onNext(FragmentEvent.START); testObserver.assertNotComplete(); lifecycle.onNext(FragmentEvent.RESUME); testObserver.assertNotComplete(); lifecycle.onNext(FragmentEvent.PAUSE); testObserver.assertNotComplete(); testObserver.assertNotComplete(); lifecycle.onNext(FragmentEvent.STOP); testObserver.assertComplete(); }
@Test public void testBindUntilActivityEvent() { BehaviorSubject<ActivityEvent> lifecycle = BehaviorSubject.create(); TestObserver<Object> testObserver = observable.compose(RxLifecycle.bindUntilEvent(lifecycle, ActivityEvent.STOP)).test(); lifecycle.onNext(ActivityEvent.CREATE); testObserver.assertNotComplete(); lifecycle.onNext(ActivityEvent.START); testObserver.assertNotComplete(); lifecycle.onNext(ActivityEvent.RESUME); testObserver.assertNotComplete(); lifecycle.onNext(ActivityEvent.PAUSE); testObserver.assertNotComplete(); lifecycle.onNext(ActivityEvent.STOP); testObserver.assertComplete(); }
/** * 当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据 * (如果此时还 没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。 * 然而,如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,只是简单的向前传递这个错误通知。 */ private void doSomeWork() { BehaviorSubject<Integer> source = BehaviorSubject.create(); source.subscribe(getFirstObserver()); // it will get 1, 2, 3, 4 and onComplete source.onNext(1); source.onNext(2); source.onNext(3); /* * it will emit 3(last emitted), 4 and onComplete for second observer also. */ source.subscribe(getSecondObserver()); source.onNext(4); source.onComplete(); }
@Test public void autoDispose_withProvider_success() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); MaybeSubject<Integer> source = MaybeSubject.create(); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); LifecycleScopeProvider<Integer> provider = makeLifecycleProvider(lifecycle); source.as(AutoDispose.<Integer>autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onNext(1); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); source.onSuccess(3); o.takeSuccess(); o.assertNoMoreEvents(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withLifecycleProvider_completion() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); MaybeSubject<Integer> source = MaybeSubject.create(); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); LifecycleScopeProvider<Integer> provider = makeLifecycleProvider(lifecycle); source.as(AutoDispose.<Integer>autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onNext(1); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); source.onComplete(); o.assertOnComplete(); o.assertNoMoreEvents(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.create(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); MaybeSubject<Integer> source = MaybeSubject.create(); TestObserver<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); o.assertNoValues(); o.assertNoErrors(); }
@Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Noop } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); lifecycle.onNext(1); lifecycle.onNext(2); lifecycle.onNext(3); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); MaybeSubject<Integer> source = MaybeSubject.create(); TestObserver<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); o.assertNoValues(); o.assertNoErrors(); }
@Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithWrappedExp() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Wrap in an IllegalStateException so we can verify this is the exception we see on the // other side throw new IllegalStateException(e); } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.create(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); MaybeSubject<Integer> source = MaybeSubject.create(); TestObserver<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); o.assertNoValues(); o.assertError(new Predicate<Throwable>() { @Override public boolean test(Throwable throwable) { return throwable instanceof IllegalStateException && throwable.getCause() instanceof OutsideLifecycleException; } }); }
@Test public void autoDispose_withLifecycleProvider_completion() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); CompletableSubject source = CompletableSubject.create(); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); LifecycleScopeProvider<Integer> provider = makeLifecycleProvider(lifecycle); source.as(autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onNext(1); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); source.onComplete(); o.assertOnComplete(); o.assertNoMoreEvents(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.create(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); CompletableSubject source = CompletableSubject.create(); TestObserver<Void> o = source .as(autoDisposable(provider)) .test(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); o.assertNoValues(); o.assertNoErrors(); }
@Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Noop } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); lifecycle.onNext(1); lifecycle.onNext(2); lifecycle.onNext(3); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); CompletableSubject source = CompletableSubject.create(); TestObserver<Void> o = source .as(autoDisposable(provider)) .test(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); o.assertNoValues(); o.assertNoErrors(); }
@Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithWrappedExp() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Wrap in an IllegalStateException so we can verify this is the exception we see on the // other side throw new IllegalStateException(e); } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.create(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); TestObserver<Void> o = CompletableSubject.create() .as(autoDisposable(provider)) .test(); o.assertNoValues(); o.assertError(new Predicate<Throwable>() { @Override public boolean test(Throwable throwable) { return throwable instanceof IllegalStateException && throwable.getCause() instanceof OutsideLifecycleException; } }); }
@Test public void autoDispose_withProvider_withoutStartingLifecycle_shouldFail() { BehaviorSubject<Integer> lifecycle = BehaviorSubject.create(); TestSubscriber<Integer> firstSubscriber = new TestSubscriber<>(); TestSubscriber<Integer> secondSubscriber = new TestSubscriber<>(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); //noinspection unchecked Subscriber<Integer>[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; Flowable.just(1, 2) .parallel(DEFAULT_PARALLELISM) .as(AutoDispose.<Integer>autoDisposable(provider)) .subscribe(subscribers); List<Throwable> errors1 = firstSubscriber.errors(); assertThat(errors1).hasSize(1); assertThat(errors1.get(0)).isInstanceOf(LifecycleNotStartedException.class); List<Throwable> errors2 = secondSubscriber.errors(); assertThat(errors2).hasSize(1); assertThat(errors2.get(0)).isInstanceOf(LifecycleNotStartedException.class); }
@Test public void autoDispose_withProvider_afterLifecycle_shouldFail() { BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); lifecycle.onNext(1); lifecycle.onNext(2); lifecycle.onNext(3); TestSubscriber<Integer> firstSubscriber = new TestSubscriber<>(); TestSubscriber<Integer> secondSubscriber = new TestSubscriber<>(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); //noinspection unchecked Subscriber<Integer>[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; Flowable.just(1, 2) .parallel(DEFAULT_PARALLELISM) .as(AutoDispose.<Integer>autoDisposable(provider)) .subscribe(subscribers); List<Throwable> errors1 = firstSubscriber.errors(); assertThat(errors1).hasSize(1); assertThat(errors1.get(0)).isInstanceOf(LifecycleEndedException.class); List<Throwable> errors2 = secondSubscriber.errors(); assertThat(errors2).hasSize(1); assertThat(errors2.get(0)).isInstanceOf(LifecycleEndedException.class); }
@Test public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.create(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); PublishSubject<Integer> source = PublishSubject.create(); TestObserver<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); o.assertNoValues(); o.assertNoErrors(); }
@Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Noop } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); lifecycle.onNext(1); lifecycle.onNext(2); lifecycle.onNext(3); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); PublishSubject<Integer> source = PublishSubject.create(); TestObserver<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); o.assertNoValues(); o.assertNoErrors(); }
@Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithExp() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Wrap in an IllegalStateException so we can verify this is the exception we see on the // other side throw new IllegalStateException(e); } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.create(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); PublishSubject<Integer> source = PublishSubject.create(); TestObserver<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); o.assertNoValues(); o.assertError(new Predicate<Throwable>() { @Override public boolean test(Throwable throwable) { return throwable instanceof IllegalStateException && throwable.getCause() instanceof OutsideLifecycleException; } }); }
@Test public void autoDispose_withLifecycleProvider() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); SingleSubject<Integer> source = SingleSubject.create(); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); LifecycleScopeProvider<Integer> provider = makeLifecycleProvider(lifecycle); source.as(AutoDispose.<Integer>autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onNext(1); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); source.onSuccess(3); o.takeSuccess(); // All cleaned up o.assertNoMoreEvents(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withLifecycleProvider_interrupted() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); SingleSubject<Integer> source = SingleSubject.create(); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); LifecycleScopeProvider<Integer> provider = makeLifecycleProvider(lifecycle); source.as(AutoDispose.<Integer>autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onNext(1); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); // Lifecycle ends lifecycle.onNext(3); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); // No one is listening even if upstream finally does emit source.onSuccess(3); o.assertNoMoreEvents(); }
@Test public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.create(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); SingleSubject<Integer> source = SingleSubject.create(); TestObserver<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); o.assertNoValues(); o.assertNoErrors(); }
@Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Noop } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); lifecycle.onNext(1); lifecycle.onNext(2); lifecycle.onNext(3); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); SingleSubject<Integer> source = SingleSubject.create(); TestObserver<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); o.assertNoValues(); o.assertNoErrors(); }
@Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithExp() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Wrap in an IllegalStateException so we can verify this is the exception we see on the // other side throw new IllegalStateException(e); } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.create(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); SingleSubject<Integer> source = SingleSubject.create(); TestObserver<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); o.assertNoValues(); o.assertError(new Predicate<Throwable>() { @Override public boolean test(Throwable throwable) { return throwable instanceof IllegalStateException && throwable.getCause() instanceof OutsideLifecycleException; } }); }
@Test public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.create(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); PublishProcessor<Integer> source = PublishProcessor.create(); TestSubscriber<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); assertThat(source.hasSubscribers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); o.assertNoValues(); o.assertNoErrors(); }
@Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Noop } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); lifecycle.onNext(1); lifecycle.onNext(2); lifecycle.onNext(3); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); PublishProcessor<Integer> source = PublishProcessor.create(); TestSubscriber<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); assertThat(source.hasSubscribers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); o.assertNoValues(); o.assertNoErrors(); }
@Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithExp() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Wrap in an IllegalStateException so we can verify this is the exception we see on the // other side throw new IllegalStateException(e); } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.create(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); PublishProcessor<Integer> source = PublishProcessor.create(); TestSubscriber<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); o.assertNoValues(); o.assertError(new Predicate<Throwable>() { @Override public boolean test(Throwable throwable) { return throwable instanceof IllegalStateException && throwable.getCause() instanceof OutsideLifecycleException; } }); }
static LifecycleScopeProvider<Integer> makeLifecycleProvider( final BehaviorSubject<Integer> lifecycle) { return new LifecycleScopeProvider<Integer>() { @Override public Observable<Integer> lifecycle() { return lifecycle; } @Override public Function<Integer, Integer> correspondingEvents() { return CORRESPONDING_EVENTS; } @Override public Integer peekLifecycle() { return lifecycle.getValue(); } }; }
@Test public void shouldEmitValueAfterViewIsAttached() { TestScheduler testScheduler = new TestScheduler(); BehaviorSubject<Boolean> view = BehaviorSubject.create(); view.onNext(true); WaitViewReplayTransformer<Integer> transformer = new WaitViewReplayTransformer<>( view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler)); TestObserver<Integer> testObserver = Observable.just(0) .compose(transformer) .test(); testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testObserver.awaitTerminalEvent(); testObserver.assertValue(0); testObserver.assertComplete(); }
@Test public void shouldReplayAllValuesAfterViewIsAttached() { TestScheduler testScheduler = new TestScheduler(); BehaviorSubject<Boolean> view = BehaviorSubject.create(); view.onNext(true); WaitViewReplayTransformer<Integer> transformer = new WaitViewReplayTransformer<>( view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler)); TestObserver<Integer> testObserver = Observable.just(0, 1, 2) .compose(transformer) .test(); testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testObserver.awaitTerminalEvent(); testObserver.assertValues(0, 1, 2); testObserver.assertComplete(); }
@Test public void shouldEmitErrorAfterViewIsAttached() { TestScheduler testScheduler = new TestScheduler(); BehaviorSubject<Boolean> view = BehaviorSubject.create(); view.onNext(true); WaitViewReplayTransformer<Integer> transformer = new WaitViewReplayTransformer<>( view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler)); Exception exception = new RuntimeException(); TestObserver<Integer> testObserver = Observable.<Integer>error(exception) .compose(transformer) .test(); testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testObserver.awaitTerminalEvent(); testObserver.assertError(exception); testObserver.assertNotComplete(); }
@Test public void shouldEmitValueAfterViewIsAttached() { TestScheduler testScheduler = new TestScheduler(); BehaviorSubject<Boolean> view = BehaviorSubject.create(); view.onNext(true); WaitViewLatestTransformer<Integer> transformer = new WaitViewLatestTransformer<>(view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler)); TestObserver<Integer> testObserver = Observable.just(0) .compose(transformer) .test(); testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testObserver.awaitTerminalEvent(); testObserver.assertValue(0); testObserver.assertComplete(); }
@Test public void shouldNotEmitValuesAfterViewIsDetached() { TestScheduler testScheduler = new TestScheduler(); BehaviorSubject<Boolean> view = BehaviorSubject.create(); view.onNext(true); WaitViewLatestTransformer<Long> transformer = new WaitViewLatestTransformer<>(view); TestObserver<Long> testObserver = Observable.interval(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler) .compose(transformer) .test(); testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testObserver.assertValueCount(1); view.onNext(false); testScheduler.advanceTimeBy(2 * EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testObserver.assertValueCount(1); }
@Test public void shouldEmitLatestValueAfterViewIsAttached() { TestScheduler testScheduler = new TestScheduler(); BehaviorSubject<Boolean> view = BehaviorSubject.create(); view.onNext(true); WaitViewLatestTransformer<Integer> transformer = new WaitViewLatestTransformer<>(view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler)); TestObserver<Integer> testObserver = Observable.fromArray(0, 1, 2) .compose(transformer) .test(); testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testObserver.awaitTerminalEvent(); testObserver.assertValue(2); testObserver.assertComplete(); }
@Test public void shouldEmitErrorAfterViewIsAttached() { TestScheduler testScheduler = new TestScheduler(); BehaviorSubject<Boolean> view = BehaviorSubject.create(); view.onNext(true); WaitViewLatestTransformer<Integer> transformer = new WaitViewLatestTransformer<>(view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler)); Exception exception = new RuntimeException(); TestObserver<Integer> testObserver = Observable.<Integer>error(exception) .compose(transformer) .test(); testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testObserver.awaitTerminalEvent(); testObserver.assertError(exception); }
@Test public void shouldEmitLatestValueFromEndlessWithSingleEmissionAfterViewIsAttached() { TestScheduler testScheduler = new TestScheduler(); BehaviorSubject<Boolean> view = BehaviorSubject.create(); view.onNext(true); WaitViewLatestTransformer<Integer> transformer = new WaitViewLatestTransformer<>(view.delay(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS, testScheduler)); BehaviorSubject<Integer> subject = BehaviorSubject.create(); subject.onNext(1); TestObserver<Integer> testObserver = subject .compose(transformer) .test(); testScheduler.advanceTimeBy(EMIT_DELAY_IN_SECONDS, TimeUnit.SECONDS); testObserver.assertValue(1); testObserver.assertNotComplete(); }
public static void main(String[] args) { Subject<String> subject = BehaviorSubject.create(); subject.subscribe(s -> System.out.println("Observer 1: " + s)); subject.onNext("Alpha"); subject.onNext("Beta"); subject.onNext("Gamma"); subject.subscribe(s -> System.out.println("Observer 2: " + s)); }
/** * Creaes a new Presenter with the initial view state * * @param initialViewState initial view state (must be not null) */ public MviBasePresenter(@NonNull VS initialViewState) { if (initialViewState == null) { throw new NullPointerException("Initial ViewState == null"); } viewStateBehaviorSubject = BehaviorSubject.createDefault(initialViewState); reset(); }
public <T> T progressWait(final Single<T> task) { final BehaviorSubject<T> subject = BehaviorSubject.create(); task .subscribeOn(Schedulers.io()) .subscribe(subject::onNext); heartBeat .takeWhile(heartBeat -> !subject.hasValue()) .blockingSubscribe(heartBeat -> callProgress(heartBeatIndication)); return subject.getValue(); }