Java 类io.reactivex.internal.functions.Functions 实例源码
项目:RxJava2Swing
文件:RxSwingPluginsTest.java
/**
 * Verifies that an exception thrown by the installed onSchedule hook propagates
 * to the caller of {@code RxSwingPlugins.onSchedule}, and that after a reset the
 * runnable is handed back unchanged.
 */
@Test
public void onScheduleCrashes() {
    try {
        RxSwingPlugins.setOnSchedule(new Function<Runnable, Runnable>() {
            @Override
            public Runnable apply(Runnable r) throws Exception {
                throw new IllegalStateException("Failure");
            }
        });

        try {
            RxSwingPlugins.onSchedule(Functions.EMPTY_RUNNABLE);
            Assert.fail("Should have thrown!");
        } catch (IllegalStateException ex) {
            Assert.assertEquals("Failure", ex.getMessage());
        }
    } finally {
        // Always remove the throwing hook, even when an assertion above fails,
        // so it cannot leak into tests that run afterwards.
        RxSwingPlugins.reset();
    }

    Assert.assertSame(Functions.EMPTY_RUNNABLE, RxSwingPlugins.onSchedule(Functions.EMPTY_RUNNABLE));
}
项目:AyoRxJava
文件:Rx_empty.java
/**
 * Demonstrates {@code Flowable.empty()}: the source emits no items and signals
 * completion right away. The subscription is kept in {@code task}.
 */
protected void runOk(){
    // Forwards every received item to notifyy with an "item--" prefix.
    final Consumer<Object> onNext = new Consumer<Object>() {
        @Override
        public void accept(Object item) throws Exception {
            notifyy("item--" + item);
        }
    };
    // Reports the completion signal through notifyy.
    final Action onComplete = new Action() {
        @Override
        public void run() throws Exception {
            notifyy("onComplete---结束了!@@");
        }
    };

    task = Flowable.empty()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onNext,
                    Functions.ERROR_CONSUMER,
                    onComplete,
                    FlowableInternalHelper.RequestMax.INSTANCE);
}
项目:AyoRxJava
文件:Rx_merge2.java
/**
 * Demonstrates {@code Flowable.merge}: two array-backed sources (digits and
 * letters) are merged into one stream whose items are passed to notifyy.
 */
protected void runOk(){
    final String[] digits = {"1", "2", "3", "4", "5", "6", "7", "8", "9", "0"};
    final String[] letters = {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m"};

    // Forwards every received item to notifyy with an "item--" prefix.
    final Consumer<Object> onNext = new Consumer<Object>() {
        @Override
        public void accept(Object item) throws Exception {
            notifyy("item--" + item);
        }
    };
    // Reports the completion signal through notifyy.
    final Action onComplete = new Action() {
        @Override
        public void run() throws Exception {
            notifyy("onComplete---结束了!@@");
        }
    };

    task = Flowable.merge(Flowable.fromArray(digits), Flowable.fromArray(letters))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onNext,
                    Functions.ERROR_CONSUMER,
                    onComplete,
                    FlowableInternalHelper.RequestMax.INSTANCE);
}
项目:AyoRxJava
文件:Rx_mergeDelayError.java
/**
 * Subscribes to {@code Flowable.empty()}, which completes immediately without
 * emitting, and stores the subscription in {@code task}.
 *
 * NOTE(review): the visible body only exercises empty(); presumably a
 * placeholder for the operator this sample is meant to show - confirm.
 */
protected void runOk(){
    // Forwards every received item to notifyy with an "item--" prefix.
    final Consumer<Object> itemHandler = new Consumer<Object>() {
        @Override
        public void accept(Object item) throws Exception {
            notifyy("item--" + item);
        }
    };
    // Reports the completion signal through notifyy.
    final Action completionHandler = new Action() {
        @Override
        public void run() throws Exception {
            notifyy("onComplete---结束了!@@");
        }
    };

    task = Flowable.empty()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(itemHandler,
                    Functions.ERROR_CONSUMER,
                    completionHandler,
                    FlowableInternalHelper.RequestMax.INSTANCE);
}
项目:AyoRxJava
文件:Rx_ofType.java
/**
 * Demonstrates {@code ofType}: from a mixed stream of integers and strings only
 * the {@code String} items are let through and passed to notifyy.
 */
protected void runOk(){
    // Passes the string form of each received item to notifyy.
    final Consumer<Object> onNext = new Consumer<Object>() {
        @Override
        public void accept(Object item) throws Exception {
            notifyy(item.toString());
        }
    };
    // Reports the completion signal through notifyy.
    final Action onComplete = new Action() {
        @Override
        public void run() throws Exception {
            notifyy("onComplete---结束了!@@");
        }
    };

    task = Flowable.just(1, "a", 2, "b")
            .ofType(String.class)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onNext,
                    Functions.ERROR_CONSUMER,
                    onComplete,
                    FlowableInternalHelper.RequestMax.INSTANCE);
}
项目:AyoRxJava
文件:Rx_timer.java
/**
 * Demonstrates {@code Flowable.timer}, used for a one-shot delayed task:
 * after 600 milliseconds a single item is delivered to onNext, followed by
 * onComplete. The emitted value appears to always be 0.
 */
protected void runOk(){
    // Forwards the single timer item to notifyy with an "item--" prefix.
    final Consumer<Long> onNext = new Consumer<Long>() {
        @Override
        public void accept(Long tick) throws Exception {
            notifyy("item--" + tick);
        }
    };
    // Completion is only written to the Android log, not to notifyy.
    final Action onComplete = new Action() {
        @Override
        public void run() throws Exception {
            Log.i("repeat", "on complete");
        }
    };

    task = Flowable.timer(600, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onNext,
                    Functions.ERROR_CONSUMER,
                    onComplete,
                    FlowableInternalHelper.RequestMax.INSTANCE);
}
项目:AyoRxJava
文件:Rx_take.java
/**
 * Demonstrates {@code take(4)}: a one-second interval source is cut off after
 * its first four items, each of which is passed to notifyy.
 */
protected void runOk(){
    // Passes the string form of each tick to notifyy.
    final Consumer<Long> onNext = new Consumer<Long>() {
        @Override
        public void accept(Long tick) throws Exception {
            notifyy(tick + "");
        }
    };
    // Reports the completion signal through notifyy.
    final Action onComplete = new Action() {
        @Override
        public void run() throws Exception {
            notifyy("onComplete---结束了!@@");
        }
    };

    task = Flowable.interval(1, 1, TimeUnit.SECONDS)
            .take(4)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onNext,
                    Functions.ERROR_CONSUMER,
                    onComplete,
                    FlowableInternalHelper.RequestMax.INSTANCE);
}
项目:AyoRxJava
文件:Rx_then.java
/**
 * Subscribes to {@code Flowable.empty()}, which completes immediately without
 * emitting, and stores the subscription in {@code task}.
 *
 * NOTE(review): the visible body only exercises empty(); presumably a
 * placeholder for the operator this sample is meant to show - confirm.
 */
protected void runOk(){
    // Forwards every received item to notifyy with an "item--" prefix.
    final Consumer<Object> itemHandler = new Consumer<Object>() {
        @Override
        public void accept(Object item) throws Exception {
            notifyy("item--" + item);
        }
    };
    // Reports the completion signal through notifyy.
    final Action completionHandler = new Action() {
        @Override
        public void run() throws Exception {
            notifyy("onComplete---结束了!@@");
        }
    };

    task = Flowable.empty()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(itemHandler,
                    Functions.ERROR_CONSUMER,
                    completionHandler,
                    FlowableInternalHelper.RequestMax.INSTANCE);
}
项目:AyoRxJava
文件:Rx_join.java
/**
 * Subscribes to {@code Flowable.empty()}, which completes immediately without
 * emitting, and stores the subscription in {@code task}.
 *
 * NOTE(review): the visible body only exercises empty(); presumably a
 * placeholder for the operator this sample is meant to show - confirm.
 */
protected void runOk(){
    // Forwards every received item to notifyy with an "item--" prefix.
    final Consumer<Object> itemHandler = new Consumer<Object>() {
        @Override
        public void accept(Object item) throws Exception {
            notifyy("item--" + item);
        }
    };
    // Reports the completion signal through notifyy.
    final Action completionHandler = new Action() {
        @Override
        public void run() throws Exception {
            notifyy("onComplete---结束了!@@");
        }
    };

    task = Flowable.empty()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(itemHandler,
                    Functions.ERROR_CONSUMER,
                    completionHandler,
                    FlowableInternalHelper.RequestMax.INSTANCE);
}
项目:AyoRxJava
文件:Rx_never.java
/**
 * Demonstrates {@code Flowable.never()}: the source emits nothing and invokes
 * none of the callbacks, so neither handler below is expected to run.
 */
protected void runOk(){
    // Would forward received items to notifyy; never() emits none.
    final Consumer<Object> onNext = new Consumer<Object>() {
        @Override
        public void accept(Object item) throws Exception {
            notifyy("item--" + item);
        }
    };
    // Would report completion through notifyy; never() does not complete.
    final Action onComplete = new Action() {
        @Override
        public void run() throws Exception {
            notifyy("onComplete---结束了!@@");
        }
    };

    task = Flowable.never()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onNext,
                    Functions.ERROR_CONSUMER,
                    onComplete,
                    FlowableInternalHelper.RequestMax.INSTANCE);
}
项目:AyoRxJava
文件:Rx_sample.java
/**
 * Demonstrates {@code sample}: a one-second interval source is sampled every
 * 400 milliseconds and each sampled value is passed to notifyy.
 *
 * NOTE(review): the Disposable returned by subscribe is discarded here, so this
 * endless interval cannot be cancelled from this method - confirm whether it
 * should be retained by the caller's lifecycle handling.
 */
protected void runOk(){
    // Passes the string form of each sampled tick to notifyy.
    final Consumer<Long> onNext = new Consumer<Long>() {
        @Override
        public void accept(Long tick) throws Exception {
            notifyy(tick + "");
        }
    };
    // Reports the completion signal through notifyy.
    final Action onComplete = new Action() {
        @Override
        public void run() throws Exception {
            notifyy("onComplete---结束了!@@");
        }
    };

    Flowable.interval(0, 1, TimeUnit.SECONDS)
            .sample(400, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onNext,
                    Functions.ERROR_CONSUMER,
                    onComplete,
                    FlowableInternalHelper.RequestMax.INSTANCE);
}
项目:AyoRxJava
文件:Rx_takeLast2.java
/**
 * Demonstrates the timed {@code takeLast}: a one-second interval is limited to
 * ten items, and only the items emitted within the last four seconds before
 * completion are delivered to notifyy.
 */
protected void runOk(){
    // Passes the string form of each retained tick to notifyy.
    final Consumer<Long> onNext = new Consumer<Long>() {
        @Override
        public void accept(Long tick) throws Exception {
            notifyy(tick + "");
        }
    };
    // Reports the completion signal through notifyy.
    final Action onComplete = new Action() {
        @Override
        public void run() throws Exception {
            notifyy("onComplete---结束了!@@");
        }
    };

    task = Flowable.interval(1, 1, TimeUnit.SECONDS)
            .take(10)
            .takeLast(4, TimeUnit.SECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onNext,
                    Functions.ERROR_CONSUMER,
                    onComplete,
                    FlowableInternalHelper.RequestMax.INSTANCE);
}
项目:AyoRxJava
文件:Rx_throttleFirst.java
/**
 * Demonstrates {@code throttleFirst}: from a one-second interval source only
 * the first item of each 3000 millisecond window is passed to notifyy.
 *
 * NOTE(review): the Disposable returned by subscribe is discarded here, so this
 * endless interval cannot be cancelled from this method - confirm whether it
 * should be retained by the caller's lifecycle handling.
 */
protected void runOk(){
    // Passes the string form of each surviving tick to notifyy.
    final Consumer<Long> onNext = new Consumer<Long>() {
        @Override
        public void accept(Long tick) throws Exception {
            notifyy(tick + "");
        }
    };
    // Reports the completion signal through notifyy.
    final Action onComplete = new Action() {
        @Override
        public void run() throws Exception {
            notifyy("onComplete---结束了!@@");
        }
    };

    Flowable.interval(0, 1, TimeUnit.SECONDS)
            .throttleFirst(3000, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onNext,
                    Functions.ERROR_CONSUMER,
                    onComplete,
                    FlowableInternalHelper.RequestMax.INSTANCE);
}
项目:AyoRxJava
文件:Rx_skip2.java
/**
 * Demonstrates the timed {@code skip}: items emitted by a one-second interval
 * during the first four seconds are dropped; later items go to notifyy.
 */
protected void runOk(){
    // Passes the string form of each tick that was not skipped to notifyy.
    final Consumer<Long> onNext = new Consumer<Long>() {
        @Override
        public void accept(Long tick) throws Exception {
            notifyy(tick + "");
        }
    };
    // Reports the completion signal through notifyy.
    final Action onComplete = new Action() {
        @Override
        public void run() throws Exception {
            notifyy("onComplete---结束了!@@");
        }
    };

    task = Flowable.interval(1, 1, TimeUnit.SECONDS)
            .skip(4, TimeUnit.SECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onNext,
                    Functions.ERROR_CONSUMER,
                    onComplete,
                    FlowableInternalHelper.RequestMax.INSTANCE);
}
项目:RxJava2Extensions
文件:NonoTest.java
/**
 * Non-eager {@code Nono.using} where both the source factory and the disposer
 * throw: the factory's IOException is the test failure, and the disposer's
 * IllegalArgumentException is expected among the tracked plugin errors.
 */
@Test
public void usingDisposerThrows5() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        // Source factory that always fails with an IOException.
        Function<Integer, Nono> failingFactory = new Function<Integer, Nono>() {
            @Override
            public Nono apply(Integer v) throws Exception {
                throw new IOException();
            }
        };
        // Disposer that always fails with an IllegalArgumentException.
        Consumer<Integer> failingDisposer = new Consumer<Integer>() {
            @Override
            public void accept(Integer t) throws Exception {
                throw new IllegalArgumentException();
            }
        };

        Nono.using(Functions.justCallable(0), failingFactory, failingDisposer, false)
                .test()
                .assertFailure(IOException.class);

        TestHelper.assertError(errors, 0, IllegalArgumentException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Jdk8Interop
文件:ObservableInteropTest.java
/**
 * mapOptional followed by a filter, consumed by an observer that requests ANY
 * fusion: asserts SYNC fusion is established and that only the negated even
 * numbers of 1..5 arrive.
 */
@Test
public void mapOptionalSyncFusedConditional() {
    TestObserver<Integer> ts = TestHelper.fusedObserver(QueueSubscription.ANY);

    Observable.range(1, 5)
            .compose(ObservableInterop.mapOptional(v -> {
                // Odd values map to an empty Optional and are dropped.
                if (v % 2 != 0) {
                    return Optional.empty();
                }
                return Optional.of(-v);
            }))
            .filter(Functions.alwaysTrue())
            .subscribeWith(ts);

    ts.assertOf(TestHelper.assertFusedObserver(QueueSubscription.SYNC))
            .assertResult(-2, -4);
}
项目:RxJava2Extensions
文件:FlowableOrderedMergeTest.java
/**
 * Error-delaying orderedMerge where the first source's mapper throws: after
 * requesting two items the healthy source's values 2 and 3 are delivered and
 * the test then fails with the IllegalArgumentException.
 */
@SuppressWarnings("unchecked")
@Test
public void fusedThrowsInPostEmissionCheckErrorDelayed() {
    // First source: its mapper throws for the only element.
    Flowable<Integer> crashing = Flowable.just(1).map(new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer v) throws Exception {
            throw new IllegalArgumentException();
        }
    });
    Flowable<Integer> healthy = Flowable.just(2, 3);

    Flowables.orderedMerge(Functions.<Integer>naturalComparator(), true, crashing, healthy)
            .test(0L)
            .requestMore(2)
            .assertFailure(IllegalArgumentException.class, 2, 3);
}
项目:RxJava2Extensions
文件:FlowableOrderedMergeTest.java
/**
 * orderedMerge of two failing sources: the "first" IOException is the test
 * failure, and the "second" one is expected among the tracked plugin errors
 * as undeliverable.
 */
@SuppressWarnings("unchecked")
@Test
public void bothError() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        Flowable<Integer> firstFailure = Flowable.<Integer>error(new IOException("first"));
        Flowable<Integer> secondFailure = Flowable.<Integer>error(new IOException("second"));

        Flowables.orderedMerge(Functions.<Integer>naturalComparator(), firstFailure, secondFailure)
                .test()
                .assertFailureAndMessage(IOException.class, "first");

        TestHelper.assertUndeliverable(errors, 0, IOException.class, "second");
    } finally {
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions
文件:FlowableMapFilterConditionalTest.java
/**
 * mapFilter whose handler completes on the first item, followed by a filter
 * and a subscriber requesting ANY fusion: asserts SYNC fusion and an empty,
 * completed result.
 */
@Test
public void consumerCompleteFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    // Handler that ends the sequence as soon as any item arrives.
    BiConsumer<Integer, BasicEmitter<Integer>> completeAtOnce =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doComplete();
        }
    };

    Flowable.range(1, 5)
            .compose(FlowableTransformers.mapFilter(completeAtOnce))
            .filter(Functions.alwaysTrue())
            .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
            .assertResult();
}
项目:RxJava2Extensions
文件:FlowableSwitchFlatMapTest.java
/**
 * switchFlatMap over an inner source that signals onError twice: the first
 * error (IOException) is the test failure, and the second
 * (IllegalArgumentException) is expected among the tracked plugin errors.
 */
@Test
public void innerDoubleError() {
    List<Throwable> error = TestHelper.trackPluginErrors();
    try {
        // Inner source that violates the protocol by emitting two errors.
        Flowable<Integer> doubleFailing = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> s) {
                s.onSubscribe(new BooleanSubscription());
                s.onError(new IOException());
                s.onError(new IllegalArgumentException());
            }
        };

        Flowable.just(1)
                .compose(FlowableTransformers.switchFlatMap(Functions.justFunction(doubleFailing), 2))
                .test()
                .assertFailure(IOException.class);

        TestHelper.assertError(error, 0, IllegalArgumentException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}
项目:RxJava2Extensions
文件:FlowableOrderedMergeTest.java
/**
 * Error-delaying orderedMerge of two sources that each emit one value and then
 * fail: both values are delivered, and the terminal CompositeException holds
 * the "first" and "second" IOExceptions in that order.
 */
@SuppressWarnings("unchecked")
@Test
public void nonEmptyBothErrorDelayed() {
    Flowable<Integer> firstSource =
            Flowable.just(1).concatWith(Flowable.<Integer>error(new IOException("first")));
    Flowable<Integer> secondSource =
            Flowable.just(2).concatWith(Flowable.<Integer>error(new IOException("second")));

    // Unpacks the composite failure and checks each inner error.
    Consumer<TestSubscriber<Integer>> innerErrorCheck = new Consumer<TestSubscriber<Integer>>() {
        @Override
        public void accept(TestSubscriber<Integer> ts) throws Exception {
            List<Throwable> inner = TestHelper.compositeList(ts.errors().get(0));
            TestHelper.assertError(inner, 0, IOException.class, "first");
            TestHelper.assertError(inner, 1, IOException.class, "second");
        }
    };

    Flowables.orderedMerge(Functions.<Integer>naturalComparator(), true, firstSource, secondSource)
            .test()
            .assertFailure(CompositeException.class, 1, 2)
            .assertOf(innerErrorCheck);
}
项目:RxJava2Extensions
文件:NonoTest.java
/**
 * {@code Nono.using} with the three-argument overload where the source is
 * {@code ioError} and the disposer throws: the test fails with a
 * CompositeException carrying IOException and IllegalArgumentException.
 */
@Test
public void usingDisposerThrows2() {
    // Disposer that always fails with an IllegalArgumentException.
    Consumer<Integer> failingDisposer = new Consumer<Integer>() {
        @Override
        public void accept(Integer t) throws Exception {
            throw new IllegalArgumentException();
        }
    };
    // Checks the classes of the errors inside the composite failure.
    Consumer<TestSubscriber<Void>> compositeCheck = new Consumer<TestSubscriber<Void>>() {
        @SuppressWarnings("unchecked")
        @Override
        public void accept(TestSubscriber<Void> ts) throws Exception {
            TestHelper.assertCompositeExceptions(ts, IOException.class, IllegalArgumentException.class);
        }
    };

    Nono.using(Functions.justCallable(0), Functions.justFunction(ioError), failingDisposer)
            .test()
            .assertFailure(CompositeException.class)
            .assertOf(compositeCheck);
}
项目:RxJava2Extensions
文件:FlowableMapFilterConditionalTest.java
/**
 * mapFilter whose handler completes on the first item, fed by a
 * BehaviorProcessor with a default value: the result is empty and completed,
 * and the processor must have no subscribers left afterwards.
 */
@Test
public void consumerCompleteCancel() {
    BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(1);

    // Handler that ends the sequence as soon as any item arrives.
    BiConsumer<Integer, BasicEmitter<Integer>> completeAtOnce =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doComplete();
        }
    };

    processor
            .compose(FlowableTransformers.mapFilter(completeAtOnce))
            .filter(Functions.alwaysTrue())
            .test()
            .assertResult();

    assertFalse(processor.hasSubscribers());
}
项目:RxJava2Extensions
文件:FlowableMapFilterConditionalTest.java
/**
 * mapFilter used as a plain mapper (every item doubled), followed by a filter
 * and a subscriber requesting ANY fusion: asserts SYNC fusion and the doubled
 * values of 1..5.
 */
@Test
public void mapFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    // Handler that emits twice the incoming value for every item.
    BiConsumer<Integer, BasicEmitter<Integer>> doubler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            e.doNext(t * 2);
        }
    };

    Flowable.range(1, 5)
            .compose(FlowableTransformers.mapFilter(doubler))
            .filter(Functions.alwaysTrue())
            .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
            .assertResult(2, 4, 6, 8, 10);
}
项目:RxJava2Extensions
文件:FlowableMapFilterConditionalTest.java
/**
 * mapFilter that keeps only even items (doubled), followed by a filter and a
 * subscriber requesting ANY fusion: asserts SYNC fusion and the values 4, 8.
 */
@Test
public void filterFused() {
    TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY);

    // Handler that drops odd items and emits twice the value of even ones.
    BiConsumer<Integer, BasicEmitter<Integer>> evenDoubler =
            new BiConsumer<Integer, BasicEmitter<Integer>>() {
        @Override
        public void accept(Integer t, BasicEmitter<Integer> e) throws Exception {
            if (t % 2 != 0) {
                return;
            }
            e.doNext(t * 2);
        }
    };

    Flowable.range(1, 5)
            .compose(FlowableTransformers.mapFilter(evenDoubler))
            .filter(Functions.alwaysTrue())
            .subscribe(ts);

    ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC))
            .assertResult(4, 8);
}
项目:RxAndroidAudio
文件:FileActivity.java
/**
 * Click handler for the play button: clears the log view, then takes the next
 * file from {@code mAudioFiles} (if any) and plays it. When playback completes
 * this method is invoked again, so the queued files are played one by one.
 */
@OnClick(R.id.mBtnPlay)
public void startPlay() {
    mTvLog.setText("");

    // Nothing queued: stop here, which also ends the completion-driven chain.
    if (mAudioFiles.isEmpty()) {
        return;
    }

    File nextFile = mAudioFiles.poll();
    mRxAudioPlayer
            .play(PlayConfig.file(nextFile)
                    .streamType(AudioManager.STREAM_VOICE_CALL)
                    .build())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(Functions.emptyConsumer(),
                    Throwable::printStackTrace,
                    this::startPlay);
}
项目:RxJava2Extensions
文件:SoloTest.java
/**
 * {@code Solo.using} with the three-argument overload and this test instance
 * as the disposer: the Solo yields 1 and {@code count} must be 1 afterwards.
 */
@Test
public void usingEager() {
    Solo<Integer> source = Solo.using(
            Functions.justCallable(1),
            Functions.justFunction(Solo.just(1)),
            this);

    source.test()
            .assertResult(1);

    assertEquals(1, count);
}
项目:RxJava2Extensions
文件:FlowableOrderedMergeTest.java
/**
 * orderedMerge over an empty Iterable of sources must complete without
 * emitting any item.
 */
@Test
public void iterableEmpty() {
    List<Flowable<Integer>> noSources = Collections.<Flowable<Integer>>emptyList();

    Flowables.orderedMerge(noSources, Functions.<Integer>naturalComparator())
            .test()
            .assertResult();
}
项目:AyoRxJava
文件:Rx_takeUntil2.java
/**
 * Demonstrates {@code takeUntil} with a predicate: a one-second interval is
 * relayed to notifyy until an item satisfies {@code value >= 5}.
 */
protected void runOk(){
    // Stop condition: true once the tick value reaches 5.
    final Predicate<Long> reachedFive = new Predicate<Long>() {
        @Override
        public boolean test(Long value) throws Exception {
            return value >= 5;
        }
    };
    // Passes the string form of each tick to notifyy.
    final Consumer<Long> onNext = new Consumer<Long>() {
        @Override
        public void accept(Long tick) throws Exception {
            notifyy(tick + "");
        }
    };
    // Reports the completion signal through notifyy.
    final Action onComplete = new Action() {
        @Override
        public void run() throws Exception {
            notifyy("onComplete---结束了!@@");
        }
    };

    task = Flowable.interval(1, 1, TimeUnit.SECONDS)
            .takeUntil(reachedFive)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onNext,
                    Functions.ERROR_CONSUMER,
                    onComplete,
                    FlowableInternalHelper.RequestMax.INSTANCE);
}
项目:RxJava2Extensions
文件:PerhapsTest.java
/**
 * {@code Perhaps.fromFuture} over an already-run future whose result is null
 * must complete without emitting a value.
 */
@Test
public void fromFutureNull() {
    FutureTask<Integer> nullResultFuture = new FutureTask<Integer>(Functions.EMPTY_RUNNABLE, null);
    // Run eagerly so the future is already done when it is consumed.
    nullResultFuture.run();

    Perhaps<Integer> source = Perhaps.fromFuture(nullResultFuture);
    source.test()
            .assertResult();
}
项目:RxJava2Extensions
文件:FlowableOrderedMergeTest.java
/**
 * orderedMerge (no error delay) where the first source's mapper throws: the
 * test fails with the IllegalArgumentException and no values.
 */
@SuppressWarnings("unchecked")
@Test
public void fusedThrowsInDrainLoop() {
    // First source: its mapper throws for the only element.
    Flowable<Integer> crashing = Flowable.just(1).map(new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer v) throws Exception {
            throw new IllegalArgumentException();
        }
    });
    Flowable<Integer> healthy = Flowable.just(2, 3);

    Flowables.orderedMerge(Functions.<Integer>naturalComparator(), crashing, healthy)
            .test()
            .assertFailure(IllegalArgumentException.class);
}
项目:RxJava2Extensions
文件:NonoTest.java
/**
 * Non-eager {@code Nono.using} with this test instance as the disposer: the
 * Nono completes and {@code count} must be 1 afterwards.
 */
@Test
public void usingNonEager() {
    Nono source = Nono.using(
            Functions.justCallable(1),
            Functions.justFunction(Nono.complete()),
            this,
            false);

    source.test()
            .assertResult();

    Assert.assertEquals(1, count);
}
项目:AyoRxJava
文件:Rx_skipWhile.java
/**
 * Demonstrates {@code skipWhile}: items of a one-second interval are dropped
 * while {@code value <= 5} holds; the following items are passed to notifyy.
 */
protected void runOk(){
    // Skip condition: true while the tick value has not exceeded 5.
    final Predicate<Long> atMostFive = new Predicate<Long>() {
        @Override
        public boolean test(Long value) throws Exception {
            return value <= 5;
        }
    };
    // Passes the string form of each tick to notifyy.
    final Consumer<Long> onNext = new Consumer<Long>() {
        @Override
        public void accept(Long tick) throws Exception {
            notifyy(tick + "");
        }
    };
    // Reports the completion signal through notifyy.
    final Action onComplete = new Action() {
        @Override
        public void run() throws Exception {
            notifyy("onComplete---结束了!@@");
        }
    };

    task = Flowable.interval(1, 1, TimeUnit.SECONDS)
            .skipWhile(atMostFive)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onNext,
                    Functions.ERROR_CONSUMER,
                    onComplete,
                    FlowableInternalHelper.RequestMax.INSTANCE);
}
项目:RxJava2Extensions
文件:AsyncObservableTest.java
/**
 * {@code AsyncObservable.deferFuture} with an explicit scheduler: the factory
 * returns an already-run future holding {@code Observable.just(1)}, and the
 * resulting sequence must emit 1 and complete within five seconds.
 */
@Test
public void deferFutureCustomScheduler() {
    final FutureTask<Observable<Integer>> completedFuture =
            new FutureTask<Observable<Integer>>(Functions.EMPTY_RUNNABLE, Observable.just(1));
    // Run eagerly so the future is already done when the factory hands it out.
    completedFuture.run();

    Callable<Future<Observable<Integer>>> futureFactory = new Callable<Future<Observable<Integer>>>() {
        @Override
        public Future<Observable<Integer>> call() throws Exception {
            return completedFuture;
        }
    };

    AsyncObservable.deferFuture(futureFactory, Schedulers.single())
            .test()
            .awaitDone(5, TimeUnit.SECONDS)
            .assertResult(1);
}
项目:RxJava2Extensions
文件:FlowableOrderedMergeTest.java
/**
 * orderedMerge (no error delay) where the second source fails immediately:
 * the test fails with the IOException and no values.
 */
@SuppressWarnings("unchecked")
@Test
public void secondErrors() {
    Flowable<Integer> odds = Flowable.just(1, 3, 5, 7);
    Flowable<Integer> failing = Flowable.<Integer>error(new IOException());

    Flowables.orderedMerge(Functions.<Integer>naturalComparator(), odds, failing)
            .test()
            .assertFailure(IOException.class);
}
项目:RxJava2Extensions
文件:NonoTest.java
/**
 * Retrying {@code ioError} with a predicate that always returns false must
 * surface the IOException.
 */
@Test(timeout = 5000)
public void retryPredicateNoRetry() {
    Nono neverRetried = ioError.retry(Functions.alwaysFalse());

    neverRetried.test()
            .assertFailure(IOException.class);
}
项目:RxJava2Extensions
文件:FlowableOrderedMergeTest.java
/**
 * orderedMerge with a null second source must fail the subscriber with a
 * NullPointerException.
 */
@SuppressWarnings("unchecked")
@Test
public void nullSecond() {
    Flowable<Integer> present = Flowable.just(1);
    Flowable<Integer> absent = null;

    Flowables.orderedMerge(Functions.<Integer>naturalComparator(), present, absent)
            .test()
            .assertFailure(NullPointerException.class);
}
项目:RxJava2Extensions
文件:FlowableOrderedMergeTest.java
/**
 * orderedMerge of two hidden sources (odd and even numbers) must interleave
 * them into the ascending sequence 1..8.
 */
@SuppressWarnings("unchecked")
@Test
public void normal2Hidden() {
    Flowable<Integer> odds = Flowable.just(1, 3, 5, 7).hide();
    Flowable<Integer> evens = Flowable.just(2, 4, 6, 8).hide();

    Flowables.orderedMerge(Functions.<Integer>naturalComparator(), odds, evens)
            .test()
            .assertResult(1, 2, 3, 4, 5, 6, 7, 8);
}
项目:RxJava2Jdk8Interop
文件:FlowableInteropTest.java
/**
 * mapOptional on a hidden range followed by a filter: only the negated even
 * numbers of 1..5 are expected.
 */
@Test
public void mapOptionalConditional() {
    Flowable.range(1, 5).hide()
            .compose(FlowableInterop.mapOptional(v -> {
                // Odd values map to an empty Optional and are dropped.
                if (v % 2 != 0) {
                    return Optional.empty();
                }
                return Optional.of(-v);
            }))
            .filter(Functions.alwaysTrue())
            .test()
            .assertResult(-2, -4);
}
项目:RxJava2Extensions
文件:PerhapsTest.java
/**
 * flatMapPublisher onto a Flowable that fails immediately must surface the
 * IOException.
 */
@Test
public void flatMapPublisherError2() {
    Flowable<Object> failing = Flowable.error(new IOException());

    Perhaps.just(1)
            .flatMapPublisher(Functions.justFunction(failing))
            .test()
            .assertFailure(IOException.class);
}