@Test public void testDoesNotTwoErrorsIfUpstreamDoesNotHonourCancellationImmediately() { try { List<Throwable> list = new CopyOnWriteArrayList<Throwable>(); RxJavaPlugins.setErrorHandler(Consumers.addTo(list)); Burst.items(1).error(new ThrowingException())// .compose(Transformers. // collectWhile( // Callables.<List<Integer>>constant(Lists.<Integer>newArrayList()), ADD, // BiPredicates.throwing())) // .test() // .assertNoValues() // .assertError(ThrowingException.class); assertEquals(1, list.size()); System.out.println(list.get(0)); assertTrue(list.get(0) instanceof UndeliverableException); assertTrue(list.get(0).getCause() instanceof ThrowingException); } finally { RxJavaPlugins.reset(); } }
@Override public void onCreate() { super.onCreate(); applicationComponent = buildApplicationComponent(); applicationComponent.inject(this); StrictMode.enableDefaults(); if (analytics.isPresent()) { analytics.get().init(); } RxJavaPlugins.setErrorHandler(throwable -> { if (throwable instanceof UndeliverableException && throwable.getCause() instanceof HttpException) { //If there are many requests sent at once, first error is handler normally, the rest lands here LibrusUtils.log("plugin handle"); LibrusUtils.log(throwable); } else { Thread currentThread = Thread.currentThread(); Thread.UncaughtExceptionHandler handler = currentThread.getUncaughtExceptionHandler(); handler.uncaughtException(currentThread, throwable); } }); }
/**
 * Forwards an error to {@code Promise.error(...)} with a no-op subscriber.
 * {@code OnErrorNotImplementedException} and {@code UndeliverableException} are
 * unwrapped to their cause first; any other throwable is forwarded unchanged.
 *
 * @param e the throwable received by this consumer
 */
@Override
public void accept(Throwable e) throws Exception {
    boolean isWrapper = e instanceof OnErrorNotImplementedException
            || e instanceof UndeliverableException;
    Throwable failure = isWrapper ? e.getCause() : e;
    Promise.error(failure).then(Action.noop());
}
@Test public void testPoolFactoryWhenFailsThenRecovers() { AtomicReference<Throwable> ex = new AtomicReference<>(); Consumer<? super Throwable> handler = RxJavaPlugins.getErrorHandler(); RxJavaPlugins.setErrorHandler(t -> ex.set(t)); try { TestScheduler s = new TestScheduler(); AtomicInteger c = new AtomicInteger(); NonBlockingPool<Integer> pool = NonBlockingPool.factory(() -> { if (c.getAndIncrement() == 0) { throw new TestException(); } else { return c.get(); } }) // .maxSize(1) // .scheduler(s) // .createRetryInterval(10, TimeUnit.SECONDS) // .build(); TestObserver<Integer> ts = pool.member() // .map(m -> m.value()) // .test() // .assertNotTerminated() // .assertNoValues(); s.triggerActions(); assertTrue(ex.get() instanceof UndeliverableException); assertTrue(((UndeliverableException) ex.get()).getCause() instanceof TestException); s.advanceTimeBy(10, TimeUnit.SECONDS); s.triggerActions(); ts.assertComplete(); ts.assertValue(2); } finally { RxJavaPlugins.setErrorHandler(handler); } }
static boolean isInterruptedIOException(Throwable e) { // unwrap to see if this is an InterruptedIOException bubbled up from rx/okio if (e instanceof UndeliverableException) { if (e.getCause() != null && e.getCause() instanceof UncheckedIOException) { if (e.getCause().getCause() != null && e.getCause().getCause() instanceof InterruptedIOException) { return true; } } } return false; }
private void setupRxErrorHandler() { RxJavaPlugins.setErrorHandler( t -> { Throwable t0 = t; if (t instanceof UndeliverableException) { t0 = t.getCause(); } if(this.failedProcessorException == null) { this.failedProcessorException = t0; } if (t0 instanceof java.util.concurrent.RejectedExecutionException) { // can happen with a processor stop and another start if the old one is interrupted logger.debug("op=unhandled_rejected_execution action=continue {}", t0.getMessage()); } else { if (t0 instanceof NonRetryableNakadiException) { logger.error(String.format( "op=unhandled_non_retryable_exception action=stopping type=NonRetryableNakadiException %s ", ((NonRetryableNakadiException) t0).problem()), t0); stopStreaming(); } else if (t0 instanceof Error) { logger.error(String.format( "op=unhandled_error action=stopping type=NonRetryableNakadiException %s ", t.getMessage()), t0); stopStreaming(); } else { logger.error( String.format("unhandled_unknown_exception action=stopping type=%s %s", t0.getClass().getSimpleName(), t0.getMessage()), t0); stopStreaming(); } } } ); }
/**
 * Takes the next recorded error, asserts that it is an
 * {@code UndeliverableException}, and returns its cause.
 *
 * @return the cause of the taken {@code UndeliverableException}
 */
public Throwable takeThrowableFromUndeliverableException() {
    final Throwable undeliverable = take();
    assertThat(undeliverable).isInstanceOf(UndeliverableException.class);
    return undeliverable.getCause();
}