@Test public void testBasicBackpressure() throws InterruptedException, ExecutionException { final int iterations = 100; final SimpleFlowableList<Integer> list = new SimpleFlowableList<>(); TestSubscriber testSubscriber = new TestSubscriber(); list.updates() .observeOn(Schedulers.computation()) .doOnNext(new Consumer<Update<Integer>>() { @Override public void accept(Update<Integer> integerUpdate) { if (integerUpdate.list.size() % 10 == 0) { Thread.yield(); } } }) .subscribe(testSubscriber); ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<?> result = executorService.submit(new Runnable() { @Override public void run() { for (int i = 0; i < iterations; ++i) { if (iterations % 9 == 0) { Thread.yield(); } list.add(i); } } }); result.get(); testSubscriber.awaitCount(iterations, BaseTestConsumer.TestWaitStrategy.SLEEP_100MS, 10000); }
@Parameterized.Parameters public static BaseTestConsumer[] testData() { return new BaseTestConsumer[] { Single.fromCallable(failingCallable()).test(), Observable.fromCallable(failingCallable()).test(), Maybe.fromCallable(failingCallable()).test(), Flowable.fromCallable(failingCallable()).test() }; }
public LazyAdaptersRxJavaTest(BaseTestConsumer testConsumer) { this.testConsumer = testConsumer; }