/**
 * Checks how {@code Flowables.zipLatest} reacts to a source that calls
 * {@code onSubscribe} twice on the same subscriber.
 *
 * <p>The misbehaving source hands over two {@code BooleanSubscription}s in a row and
 * asserts that the first one is left untouched while the second one has been cancelled.
 * Afterwards the test expects the error recorded at index 0 of the tracked plugin
 * errors to be a {@code ProtocolViolationException}.
 */
@Test
public void badSource() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        PublishProcessor<Integer> wellBehaved = PublishProcessor.create();

        // A source that violates the protocol by subscribing the consumer twice.
        Flowable<Integer> doubleSubscribing = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> s) {
                BooleanSubscription first = new BooleanSubscription();
                s.onSubscribe(first);

                BooleanSubscription second = new BooleanSubscription();
                s.onSubscribe(second);

                // Only the surplus (second) subscription is expected to be cancelled.
                Assert.assertFalse(first.isCancelled());
                Assert.assertTrue(second.isCancelled());
            }
        };

        Flowables.zipLatest(wellBehaved, doubleSubscribing, toString2).test();

        TestHelper.assertError(errors, 0, ProtocolViolationException.class);
    } finally {
        // Always undo the plugin error tracking installed at the top of the test.
        RxJavaPlugins.reset();
    }
}
/**
 * Forwards a value to the wrapped {@code observer}.
 *
 * <p>Nothing happens when {@code isDisposed()} reports {@code true}. Otherwise the
 * {@code terminated} flag is checked before the value is delivered.
 *
 * @param value the item to hand to {@code observer.onNext}
 * @throws ProtocolViolationException if this instance is not disposed but
 *         {@code terminated} is already set
 */
public void onNext(T value) {
    if (!isDisposed()) {
        if (terminated) {
            throw new ProtocolViolationException("OnNext called after Observable terminal event");
        }
        observer.onNext(value);
    }
}
/**
 * Reports a double subscription by creating a {@code ProtocolViolationException}
 * and passing it to {@code RxJavaPlugins.onError}.
 *
 * <p>The exception message is produced by {@code composeMessage} from the value of
 * {@link Class#getName()} of the consumer class (i.e. not {@code getSimpleName()}).
 *
 * @param consumer the class of the consumer
 */
public static void reportDoubleSubscription(Class<?> consumer) {
    final String consumerName = consumer.getName();
    final ProtocolViolationException violation =
            new ProtocolViolationException(composeMessage(consumerName));
    RxJavaPlugins.onError(violation);
}