/**
 * Subscribes the given callbacks to the flowable produced by {@code build(false)}.
 *
 * <p>Steps, in order: the flowable is built, the optional {@code transformer} is applied via
 * {@code compose}, {@code onNext} is wrapped by {@code RxBusUtil.wrapQueueConsumer} when a
 * queuer is set and the queue subscription safety check is enabled, {@code applySchedular}
 * is applied, and finally the subscription is made. If a bound object is present, the
 * resulting disposable is registered with {@code RxDisposableManager} for it.
 *
 * @param onNext      consumer for emitted items; {@code null} means "do nothing"
 * @param onError     consumer for errors; {@code null} installs a handler that throws
 *                    {@link OnErrorNotImplementedException} wrapping the error
 * @param onCompleted completion action; {@code null} means "do nothing"
 * @param transformer optional transformer composed onto the built flowable; may be {@code null}
 * @param <R>         item type delivered to {@code onNext}
 * @return the disposable returned by the underlying {@code subscribe} call
 */
public <R> Disposable subscribe(Consumer<R> onNext, Consumer<Throwable> onError, Action onCompleted, FlowableTransformer<T, R> transformer) {
    // Fall back to default callbacks for every handler the caller left out.
    final Consumer<R> nextHandler = (onNext != null) ? onNext : data -> {};
    final Consumer<Throwable> errorHandler = (onError != null)
            ? onError
            : error -> { throw new OnErrorNotImplementedException(error); };
    final Action completionHandler = (onCompleted != null) ? onCompleted : () -> {};

    // NOTE(review): the local is a raw Flowable because the element type changes from T to R
    // only when a transformer is supplied.
    Flowable stream = build(false);
    if (transformer != null) {
        stream = stream.compose(transformer);
    }

    // Optionally route item delivery through the queuer.
    Consumer<R> deliveredOnNext = nextHandler;
    if (mQueuer != null && mQueueSubscriptionSafetyCheckEnabled) {
        deliveredOnNext = RxBusUtil.wrapQueueConsumer(nextHandler, mQueuer);
    }

    stream = applySchedular(stream);
    final Disposable subscription = stream.subscribe(deliveredOnNext, errorHandler, completionHandler);

    // Tie the subscription to the bound object, if there is one.
    if (mBoundObject != null) {
        RxDisposableManager.addDisposable(mBoundObject, subscription);
    }
    return subscription;
}
@Test public void expect_exceptionThrownAndExpected_shouldMatchExpectedException() throws Throwable { // given Statement originalStatement = new Statement() { @Override public void evaluate() throws Throwable { Single.just("bar") .map(s -> { throw new Exception("foo"); }) .subscribe(); } }; ExpectedUncaughtException uncaughtThrown = ExpectedUncaughtException.none(); uncaughtThrown.expect(OnErrorNotImplementedException.class); uncaughtThrown.expectMessage("foo"); Statement statement = uncaughtThrown.apply(originalStatement, description); // when statement.evaluate(); // then verifyNoMoreInteractions(description); }
/**
 * Forwards the given throwable into an error {@code Promise} and subscribes to it with a
 * no-op action.
 *
 * <p>When {@code e} is an {@link OnErrorNotImplementedException} or an
 * {@link UndeliverableException}, its cause is forwarded instead of the wrapper itself;
 * any other throwable is forwarded as-is.
 *
 * @param e the throwable to forward
 */
@Override
public void accept(Throwable e) throws Exception {
    // Both RxJava wrapper types are handled identically: report what they wrap.
    final boolean isWrapper =
            e instanceof OnErrorNotImplementedException || e instanceof UndeliverableException;
    final Throwable reported = isWrapper ? e.getCause() : e;
    Promise.error(reported).then(Action.noop());
}
@Test public void expect_exceptionThrownMatchesExpectedButExpectingDifferentMessage_shouldFail() throws Throwable { // given Statement originalStatement = new Statement() { @Override public void evaluate() throws Throwable { Single.just("bar") .map(s -> { throw new Exception("foo"); }) .subscribe(); } }; ExpectedUncaughtException uncaughtThrown = ExpectedUncaughtException.none(); uncaughtThrown.expect(OnErrorNotImplementedException.class); uncaughtThrown.expectMessage("bar"); Statement statement = uncaughtThrown.apply(originalStatement, description); thrown.expect(AssertionError.class); thrown.expectMessage("Expected uncaught exception " + "io.reactivex.exceptions.OnErrorNotImplementedException " + "occurred but has unexpected message:\n" + "Expected: \"bar\"\n" + " but: was \"foo\""); // when statement.evaluate(); // then should fail }
/**
 * Rejects every error by throwing an {@link OnErrorNotImplementedException} that wraps it.
 *
 * @param throwable the error passed to this callback; becomes the argument of the thrown exception
 */
@Override
public void onError(Throwable throwable) {
    final OnErrorNotImplementedException unhandled = new OnErrorNotImplementedException(throwable);
    throw unhandled;
}
/**
 * Not implemented: always throws.
 *
 * <p>The method declares {@link IOException} but never raises it; every call throws an
 * {@link OnErrorNotImplementedException} wrapping a {@link Throwable} that carries a TODO
 * message.
 *
 * @return never returns normally
 * @throws IOException declared by the signature, not thrown by this implementation
 */
@Override
public PopularArticlesResource requestPopularArticles() throws IOException {
    final Throwable notImplemented = new Throwable("TODO: implement for not Rx clients");
    throw new OnErrorNotImplementedException(notImplemented);
}