Java 类io.reactivex.observers.BaseTestConsumer 实例源码
项目:rxtools
文件:BackpressureFlowableListTest.java
@Test
public void testBasicBackpressure() throws InterruptedException, ExecutionException
{
final int iterations = 100;
final SimpleFlowableList<Integer> list = new SimpleFlowableList<>();
TestSubscriber testSubscriber = new TestSubscriber();
list.updates()
.observeOn(Schedulers.computation())
.doOnNext(new Consumer<Update<Integer>>() {
@Override
public void accept(Update<Integer> integerUpdate)
{
if (integerUpdate.list.size() % 10 == 0) {
Thread.yield();
}
}
})
.subscribe(testSubscriber);
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<?> result = executorService.submit(new Runnable() {
@Override
public void run()
{
for (int i = 0; i < iterations; ++i) {
if (iterations % 9 == 0) {
Thread.yield();
}
list.add(i);
}
}
});
result.get();
testSubscriber.awaitCount(iterations, BaseTestConsumer.TestWaitStrategy.SLEEP_100MS, 10000);
}
项目:moshi-lazy-adapters
文件:LazyAdaptersRxJavaTest.java
@Parameterized.Parameters
public static BaseTestConsumer[] testData() {
return new BaseTestConsumer[] {
Single.fromCallable(failingCallable()).test(),
Observable.fromCallable(failingCallable()).test(),
Maybe.fromCallable(failingCallable()).test(),
Flowable.fromCallable(failingCallable()).test()
};
}
项目:moshi-lazy-adapters
文件:LazyAdaptersRxJavaTest.java
public LazyAdaptersRxJavaTest(BaseTestConsumer testConsumer) {
this.testConsumer = testConsumer;
}