@Test public void shouldContinueRunningAfterExceptionInTask(){ AtomicBoolean firstRun = new AtomicBoolean(true); CompletableSubject completedSecondRun = CompletableSubject.create(); KeyedTaskBuffer<String, Integer> taskBuffer = new KeyedTaskBuffer<>(BUFFER_SIZE_2, MAX_WAIT_MILLIS_100, (key, batch) -> { if(firstRun.getAndSet(false)){ throw new RuntimeException("Test exception"); }else{ completedSecondRun.onComplete(); } }); taskBuffer.addData(KEY_A, 1); taskBuffer.addData(KEY_B, 1); //unit test can only finish if this is completed completedSecondRun.timeout(1000, TimeUnit.MILLISECONDS).blockingAwait(); }
/**
 * Subscribes a test observer through {@code DisposableAttach.to(composite)} and
 * checks that the composite holds exactly one entry, then that disposing the
 * composite empties it and leaves the subscription and the observer disposed.
 */
@Test
public void test() {
    CompositeDisposable bag = new CompositeDisposable();
    TestObserver observer = new TestObserver();

    CompletableSubject upstream = CompletableSubject.create();
    Completable hidden = upstream.hide();

    Disposable subscription = hidden
            .compose(DisposableAttach.<String>to(bag))
            .subscribeWith(observer);

    // Subscribing must have registered one disposable with the composite.
    assertTrue(bag.size() == 1);

    bag.dispose();

    assertTrue(bag.size() == 0);
    assertTrue(bag.isDisposed());
    assertTrue(subscription.isDisposed());
    assertTrue(observer.isDisposed());
}
/**
 * Applies all the queued operations to the database by passing {@code mData}
 * to {@code mRoot.updateChildren}.
 *
 * <p>The in-flight counter is incremented before the update is issued and
 * decremented from the update's completion listener. On failure the exception's
 * stack trace is replaced with one captured inside this method, the failure is
 * logged, and the returned subject is signalled with that exception.
 *
 * @return when {@code ServerConnection.isOnline()} is true, a Completable that
 *         completes or errors from the update's success/failure listener;
 *         otherwise an already-completed Completable.
 */
public Completable write() {
    final CompletableSubject outcome = CompletableSubject.create();

    LOGGER.info("Writing with online={}, operations in queue: {}",
            ServerConnection.isOnline(), OPERATIONS_IN_FLIGHT);
    OPERATIONS_IN_FLIGHT.incrementAndGet();

    // Captured now so an asynchronous failure can carry this call's stack.
    final Throwable callSite = new Throwable();

    mRoot.updateChildren(mData)
            .addOnSuccessListener((final Void ignored) -> outcome.onComplete())
            .addOnFailureListener((final Exception failure) -> {
                failure.setStackTrace(callSite.getStackTrace());
                LOGGER.error("Failed to save {}", mData, failure);
                outcome.onError(failure);
            })
            .addOnCompleteListener(task -> OPERATIONS_IN_FLIGHT.decrementAndGet());

    // Offline callers get an immediately completed result instead of the subject.
    return ServerConnection.isOnline() ? outcome : Completable.complete();
}
@Test public void autoDispose_withMaybe_normal() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); CompletableSubject source = CompletableSubject.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(autoDisposable(lifecycle)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); // Got the event source.onComplete(); o.assertOnComplete(); // Nothing more, lifecycle disposed too o.assertNoMoreEvents(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withMaybe_interrupted() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); CompletableSubject source = CompletableSubject.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(autoDisposable(lifecycle)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); // Lifecycle ends lifecycle.onSuccess(2); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); // Event if upstream emits, no one is listening source.onComplete(); o.assertNoMoreEvents(); }
/**
 * With a ScopeProvider built from a MaybeSubject, completing the source
 * delivers onComplete and leaves neither the source nor the scope observed.
 */
@Test
public void autoDispose_withProvider_completion() {
    MaybeSubject<Integer> scopeSubject = MaybeSubject.create();
    ScopeProvider scopeProvider = makeProvider(scopeSubject);
    CompletableSubject upstream = CompletableSubject.create();
    RecordingObserver<Integer> recorder = new RecordingObserver<>(LOGGER);

    upstream.as(autoDisposable(scopeProvider)).subscribe(recorder);
    recorder.takeSubscribe();

    assertThat(upstream.hasObservers()).isTrue();
    assertThat(scopeSubject.hasObservers()).isTrue();

    upstream.onComplete();
    recorder.assertOnComplete();
    recorder.assertNoMoreEvents();

    // Completion released both subscriptions.
    assertThat(upstream.hasObservers()).isFalse();
    assertThat(scopeSubject.hasObservers()).isFalse();
}
@Test public void autoDispose_withProvider_interrupted() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); CompletableSubject source = CompletableSubject.create(); MaybeSubject<Integer> scope = MaybeSubject.create(); ScopeProvider provider = makeProvider(scope); source.as(autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(scope.hasObservers()).isTrue(); scope.onSuccess(1); // All disposed assertThat(source.hasObservers()).isFalse(); assertThat(scope.hasObservers()).isFalse(); // No one is listening source.onComplete(); o.assertNoMoreEvents(); }
/**
 * With a LifecycleScopeProvider backed by a BehaviorSubject starting at 0, the
 * subscription stays active after the lifecycle emits 1; completing the source
 * then delivers onComplete and leaves neither side observed.
 */
@Test
public void autoDispose_withLifecycleProvider_completion() {
    BehaviorSubject<Integer> lifecycleEvents = BehaviorSubject.createDefault(0);
    LifecycleScopeProvider<Integer> lifecycleProvider = makeLifecycleProvider(lifecycleEvents);
    CompletableSubject upstream = CompletableSubject.create();
    RecordingObserver<Integer> recorder = new RecordingObserver<>(LOGGER);

    upstream.as(autoDisposable(lifecycleProvider)).subscribe(recorder);
    recorder.takeSubscribe();

    assertThat(upstream.hasObservers()).isTrue();
    assertThat(lifecycleEvents.hasObservers()).isTrue();

    // This lifecycle event must not end the subscription.
    lifecycleEvents.onNext(1);
    assertThat(upstream.hasObservers()).isTrue();
    assertThat(lifecycleEvents.hasObservers()).isTrue();

    upstream.onComplete();
    recorder.assertOnComplete();
    recorder.assertNoMoreEvents();

    assertThat(upstream.hasObservers()).isFalse();
    assertThat(lifecycleEvents.hasObservers()).isFalse();
}
/**
 * Installs an outside-lifecycle handler that does nothing, then subscribes
 * against a lifecycle that has emitted no event. Expects no subscription on
 * either the source or the lifecycle, and no values or errors at the observer.
 */
@Test
public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() {
    Consumer<OutsideLifecycleException> swallowingHandler =
            new Consumer<OutsideLifecycleException>() {
                @Override
                public void accept(OutsideLifecycleException e) {
                    // Intentionally empty.
                }
            };
    AutoDisposePlugins.setOutsideLifecycleHandler(swallowingHandler);

    BehaviorSubject<Integer> lifecycleEvents = BehaviorSubject.create();
    LifecycleScopeProvider<Integer> lifecycleProvider =
            TestUtil.makeLifecycleProvider(lifecycleEvents);
    CompletableSubject upstream = CompletableSubject.create();

    TestObserver<Void> observer = upstream
            .as(autoDisposable(lifecycleProvider))
            .test();

    assertThat(upstream.hasObservers()).isFalse();
    assertThat(lifecycleEvents.hasObservers()).isFalse();
    observer.assertNoValues();
    observer.assertNoErrors();
}
@Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Noop } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); lifecycle.onNext(1); lifecycle.onNext(2); lifecycle.onNext(3); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); CompletableSubject source = CompletableSubject.create(); TestObserver<Void> o = source .as(autoDisposable(provider)) .test(); assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); o.assertNoValues(); o.assertNoErrors(); }
@Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithWrappedExp() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Wrap in an IllegalStateException so we can verify this is the exception we see on the // other side throw new IllegalStateException(e); } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.create(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); TestObserver<Void> o = CompletableSubject.create() .as(autoDisposable(provider)) .test(); o.assertNoValues(); o.assertError(new Predicate<Throwable>() { @Override public boolean test(Throwable throwable) { return throwable instanceof IllegalStateException && throwable.getCause() instanceof OutsideLifecycleException; } }); }
/**
 * Stores the queue, receipt id and expiration time, creates the two subjects
 * used for acknowledgement state, and starts a timer for the time remaining
 * until {@code expirationTime} whose subscription is given {@code this::ignore}.
 *
 * @param sqsQueue       queue this acknowledger is associated with
 * @param receiptId      receipt identifier stored on this instance
 * @param expirationTime instant used to compute the timer delay
 */
public MessageAcknowledger(SqsQueue<T> sqsQueue, String receiptId, Instant expirationTime) {
    this.sqsQueue = sqsQueue;
    this.receiptId = receiptId;
    this.expirationTime = expirationTime;

    this.ackModeSingle = SingleSubject.create();
    this.ackingComplete = CompletableSubject.create();

    // Delay is measured from "now" at construction time.
    long millisUntilExpiry = Duration.between(Instant.now(), expirationTime).toMillis();
    Completable
            .timer(millisUntilExpiry, TimeUnit.MILLISECONDS)
            .subscribe(this::ignore);
}
/**
 * Returns a Completable that terminates when the given CompletionStage terminates.
 *
 * <p>The stage's outcome is forwarded into a {@code CompletableSubject}: a
 * non-null throwable becomes {@code onError}, anything else becomes
 * {@code onComplete}. The stage's result value is not used.
 *
 * @param future the source CompletionStage instance
 * @return the new Completable instance
 */
public static Completable fromFuture(CompletionStage<?> future) {
    final CompletableSubject bridge = CompletableSubject.create();

    future.whenComplete((ignoredValue, failure) -> {
        if (failure == null) {
            bridge.onComplete();
            return;
        }
        bridge.onError(failure);
    });

    return bridge;
}
@Test public void autoDispose_withLifecycleProvider_interrupted() { RecordingObserver<Integer> o = new RecordingObserver<>(LOGGER); CompletableSubject source = CompletableSubject.create(); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); LifecycleScopeProvider<Integer> provider = makeLifecycleProvider(lifecycle); source.as(autoDisposable(provider)) .subscribe(o); o.takeSubscribe(); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onNext(1); assertThat(source.hasObservers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onNext(3); // All disposed assertThat(source.hasObservers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); // No one is listening source.onComplete(); o.assertNoMoreEvents(); }
/**
 * Subscribes a fresh CompletableSubject with {@code ScopeProvider.UNBOUND} and
 * expects no values or errors at the observer and no errors recorded by
 * {@code rule}.
 */
@Test
public void autoDispose_withScopeProviderCompleted_shouldNotReportDoubleSubscriptions() {
    CompletableSubject upstream = CompletableSubject.create();

    TestObserver<Void> observer = upstream
            .as(autoDisposable(ScopeProvider.UNBOUND))
            .test();

    observer.assertNoValues();
    observer.assertNoErrors();

    // Nothing may have been reported to the rule either.
    rule.assertNoErrors();
}
/**
 * With {@code ScopeProvider.UNBOUND} as the scope, a completion signalled by
 * the source must still reach the downstream observer.
 *
 * <p>The source subject is kept in a local and completed directly. Completing
 * the TestObserver itself would make the final assertion pass without any
 * event travelling through the auto-disposing chain.
 */
@Test
public void unbound_shouldStillPassValues() {
    CompletableSubject source = CompletableSubject.create();

    TestObserver<Void> o = source
            .as(autoDisposable(ScopeProvider.UNBOUND))
            .test();

    // Nothing has been emitted yet, so the observer must not be terminated.
    o.assertNotComplete();

    source.onComplete();
    o.assertComplete();
}
/**
 * Creates and returns a new {@code CompletableSubject}.
 *
 * @return a subject obtained from {@code CompletableSubject.create()}
 */
@Default
public CompletableSubject getResultSubject() {
    final CompletableSubject freshSubject = CompletableSubject.create();
    return freshSubject;
}
@Override public Completable deleteMessage(String receiptHandle) { return Completable.defer(() -> delegate.deleteMessage(receiptHandle)) .retry(this::shouldRetry) .subscribeWith(CompletableSubject.create());//convert to Hot completable }
@Override public Completable changeMessageVisibility(String receiptHandle, Duration newVisibility) { return Completable.defer(() -> delegate.changeMessageVisibility(receiptHandle, newVisibility)) .retry(this::shouldRetry) .subscribeWith(CompletableSubject.create());//convert to Hot completable }