@Test public void onScheduleCrashes() { RxSwingPlugins.setOnSchedule(new Function<Runnable, Runnable>() { @Override public Runnable apply(Runnable r) throws Exception { throw new IllegalStateException("Failure"); } }); try { RxSwingPlugins.onSchedule(Functions.EMPTY_RUNNABLE); Assert.fail("Should have thrown!"); } catch (IllegalStateException ex) { Assert.assertEquals("Failure", ex.getMessage()); } RxSwingPlugins.reset(); Assert.assertSame(Functions.EMPTY_RUNNABLE, RxSwingPlugins.onSchedule(Functions.EMPTY_RUNNABLE)); }
protected void runOk(){ /* - empty - 直接调用complete */ task = Flowable.empty() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Object>() { @Override public void accept(Object s) throws Exception { notifyy("item--" + s); } }, Functions.ERROR_CONSUMER, new Action() { @Override public void run() throws Exception { notifyy("onComplete---结束了!@@"); } }, FlowableInternalHelper.RequestMax.INSTANCE); }
protected void runOk(){ final String[] list1 = {"1", "2", "3", "4", "5", "6", "7", "8", "9", "0"}; final String[] list2 = {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m"}; Flowable<String> flowable1 = Flowable.fromArray(list1); Flowable<String> flowable2 = Flowable.fromArray(list2); task = Flowable.merge(flowable1, flowable2) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Object>() { @Override public void accept(Object s) throws Exception { notifyy("item--" + s); } }, Functions.ERROR_CONSUMER, new Action() { @Override public void run() throws Exception { notifyy("onComplete---结束了!@@"); } }, FlowableInternalHelper.RequestMax.INSTANCE); }
protected void runOk(){ /* - empty - 直接调用complete */ task = Flowable.just(1, "a", 2, "b") .ofType(String.class) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Object>() { @Override public void accept(Object s) throws Exception { notifyy(s.toString()); } }, Functions.ERROR_CONSUMER, new Action() { @Override public void run() throws Exception { notifyy("onComplete---结束了!@@"); } }, FlowableInternalHelper.RequestMax.INSTANCE); }
protected void runOk(){ /* - timer - 用于一次性的延时任务 - 如Flowable.timer(600, TimeUnit.MILLISECONDS)表示600毫秒后激活onNext - 然后还会激活onComplete - 具体发的item是什么值,好像只能是0 */ task = Flowable.timer(600, TimeUnit.MILLISECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long s) throws Exception { notifyy("item--" + s); } }, Functions.ERROR_CONSUMER, new Action() { @Override public void run() throws Exception { Log.i("repeat", "on complete"); } }, FlowableInternalHelper.RequestMax.INSTANCE); }
protected void runOk(){ /* - empty - 直接调用complete */ task = Flowable.interval(1, 1, TimeUnit.SECONDS) .take(4) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long s) throws Exception { notifyy(s + ""); } }, Functions.ERROR_CONSUMER, new Action() { @Override public void run() throws Exception { notifyy("onComplete---结束了!@@"); } }, FlowableInternalHelper.RequestMax.INSTANCE); }
protected void runOk(){ /* - never - 什么也不会发,什么也不会调用 */ task = Flowable.never() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Object>() { @Override public void accept(Object s) throws Exception { notifyy("item--" + s); } }, Functions.ERROR_CONSUMER, new Action() { @Override public void run() throws Exception { notifyy("onComplete---结束了!@@"); } }, FlowableInternalHelper.RequestMax.INSTANCE); }
protected void runOk(){ /* - empty - 直接调用complete */ Flowable.interval(0, 1, TimeUnit.SECONDS) .sample(400, TimeUnit.MILLISECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long s) throws Exception { notifyy(s + ""); } }, Functions.ERROR_CONSUMER, new Action() { @Override public void run() throws Exception { notifyy("onComplete---结束了!@@"); } }, FlowableInternalHelper.RequestMax.INSTANCE); }
protected void runOk(){ /* - empty - 直接调用complete */ task = Flowable.interval(1, 1, TimeUnit.SECONDS) .take(10) .takeLast(4, TimeUnit.SECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long s) throws Exception { notifyy(s + ""); } }, Functions.ERROR_CONSUMER, new Action() { @Override public void run() throws Exception { notifyy("onComplete---结束了!@@"); } }, FlowableInternalHelper.RequestMax.INSTANCE); }
protected void runOk(){ Flowable.interval(0, 1, TimeUnit.SECONDS) .throttleFirst(3000, TimeUnit.MILLISECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long s) throws Exception { notifyy(s + ""); } }, Functions.ERROR_CONSUMER, new Action() { @Override public void run() throws Exception { notifyy("onComplete---结束了!@@"); } }, FlowableInternalHelper.RequestMax.INSTANCE); }
protected void runOk(){ /* - empty - 直接调用complete */ task = Flowable.interval(1, 1, TimeUnit.SECONDS) .skip(4, TimeUnit.SECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long s) throws Exception { notifyy(s + ""); } }, Functions.ERROR_CONSUMER, new Action() { @Override public void run() throws Exception { notifyy("onComplete---结束了!@@"); } }, FlowableInternalHelper.RequestMax.INSTANCE); }
@Test public void usingDisposerThrows5() { List<Throwable> errors = TestHelper.trackPluginErrors(); try { Nono.using(Functions.justCallable(0), new Function<Integer, Nono>() { @Override public Nono apply(Integer v) throws Exception { throw new IOException(); } }, new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { throw new IllegalArgumentException(); } }, false ) .test() .assertFailure(IOException.class); TestHelper.assertError(errors, 0, IllegalArgumentException.class); } finally { RxJavaPlugins.reset(); } }
@Test public void mapOptionalSyncFusedConditional() { TestObserver<Integer> ts = TestHelper.fusedObserver(QueueSubscription.ANY); Observable.range(1, 5) .compose(ObservableInterop.mapOptional(v -> { if (v % 2 == 0) { return Optional.of(-v); } return Optional.empty(); })) .filter(Functions.alwaysTrue()) .subscribeWith(ts) .assertOf(TestHelper.assertFusedObserver(QueueSubscription.SYNC)) .assertResult(-2, -4); }
@SuppressWarnings("unchecked") @Test public void fusedThrowsInPostEmissionCheckErrorDelayed() { Flowables.orderedMerge(Functions.<Integer>naturalComparator(), true, Flowable.just(1).map(new Function<Integer, Integer>() { @Override public Integer apply(Integer v) throws Exception { throw new IllegalArgumentException(); } }), Flowable.just(2, 3)) .test(0L) .requestMore(2) .assertFailure(IllegalArgumentException.class, 2, 3); }
@SuppressWarnings("unchecked") @Test public void bothError() { List<Throwable> errors = TestHelper.trackPluginErrors(); try { Flowables.orderedMerge(Functions.<Integer>naturalComparator(), Flowable.<Integer>error(new IOException("first")), Flowable.<Integer>error(new IOException("second")) ) .test() .assertFailureAndMessage(IOException.class, "first"); TestHelper.assertUndeliverable(errors, 0, IOException.class, "second"); } finally { RxJavaPlugins.reset(); } }
@Test public void consumerCompleteFused() { TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY); Flowable.range(1, 5) .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() { @Override public void accept(Integer t, BasicEmitter<Integer> e) throws Exception { e.doComplete(); } })) .filter(Functions.alwaysTrue()) .subscribe(ts); ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC)) .assertResult(); }
@Test public void innerDoubleError() { List<Throwable> error = TestHelper.trackPluginErrors(); try { Flowable.just(1) .compose(FlowableTransformers.switchFlatMap(Functions.justFunction(new Flowable<Integer>() { @Override protected void subscribeActual(Subscriber<? super Integer> s) { s.onSubscribe(new BooleanSubscription()); s.onError(new IOException()); s.onError(new IllegalArgumentException()); } }), 2)) .test() .assertFailure(IOException.class); TestHelper.assertError(error, 0, IllegalArgumentException.class); } finally { RxJavaPlugins.reset(); } }
@SuppressWarnings("unchecked") @Test public void nonEmptyBothErrorDelayed() { Flowables.orderedMerge(Functions.<Integer>naturalComparator(), true, Flowable.just(1).concatWith(Flowable.<Integer>error(new IOException("first"))), Flowable.just(2).concatWith(Flowable.<Integer>error(new IOException("second"))) ) .test() .assertFailure(CompositeException.class, 1, 2) .assertOf(new Consumer<TestSubscriber<Integer>>() { @Override public void accept(TestSubscriber<Integer> ts) throws Exception { List<Throwable> list = TestHelper.compositeList(ts.errors().get(0)); TestHelper.assertError(list, 0, IOException.class, "first"); TestHelper.assertError(list, 1, IOException.class, "second"); } }); }
@Test public void usingDisposerThrows2() { Nono.using(Functions.justCallable(0), Functions.justFunction(ioError), new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { throw new IllegalArgumentException(); } } ) .test() .assertFailure(CompositeException.class) .assertOf(new Consumer<TestSubscriber<Void>>() { @SuppressWarnings("unchecked") @Override public void accept(TestSubscriber<Void> ts) throws Exception { TestHelper.assertCompositeExceptions(ts, IOException.class, IllegalArgumentException.class); } }); }
@Test public void consumerCompleteCancel() { BehaviorProcessor<Integer> pp = BehaviorProcessor.createDefault(1); pp .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() { @Override public void accept(Integer t, BasicEmitter<Integer> e) throws Exception { e.doComplete(); } })) .filter(Functions.alwaysTrue()) .test() .assertResult(); assertFalse(pp.hasSubscribers()); }
@Test public void mapFused() { TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY); Flowable.range(1, 5) .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() { @Override public void accept(Integer t, BasicEmitter<Integer> e) throws Exception { e.doNext(t * 2); } })) .filter(Functions.alwaysTrue()) .subscribe(ts); ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC)) .assertResult(2, 4, 6, 8, 10); }
@Test public void filterFused() { TestSubscriber<Integer> ts = TestHelper.fusedSubscriber(QueueSubscription.ANY); Flowable.range(1, 5) .compose(FlowableTransformers.mapFilter(new BiConsumer<Integer, BasicEmitter<Integer>>() { @Override public void accept(Integer t, BasicEmitter<Integer> e) throws Exception { if (t % 2 == 0) { e.doNext(t * 2); } } })) .filter(Functions.alwaysTrue()) .subscribe(ts); ts.assertOf(TestHelper.<Integer>assertFusedSubscriber(QueueSubscription.SYNC)) .assertResult(4, 8); }
@OnClick(R.id.mBtnPlay) public void startPlay() { mTvLog.setText(""); if (!mAudioFiles.isEmpty()) { File audioFile = mAudioFiles.poll(); mRxAudioPlayer.play( PlayConfig.file(audioFile) .streamType(AudioManager.STREAM_VOICE_CALL) .build()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(Functions.emptyConsumer(), Throwable::printStackTrace, this::startPlay); } }
@Test public void usingEager() { Solo.using(Functions.justCallable(1), Functions.justFunction(Solo.just(1)), this) .test() .assertResult(1); assertEquals(1, count); }
@Test public void iterableEmpty() { Flowables.orderedMerge(Collections.<Flowable<Integer>>emptyList(), Functions.<Integer>naturalComparator() ) .test() .assertResult(); }
protected void runOk(){ /* - empty - 直接调用complete */ task = Flowable.interval(1, 1, TimeUnit.SECONDS) .takeUntil(new Predicate<Long>() { @Override public boolean test(Long aLong) throws Exception { return aLong >= 5; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long s) throws Exception { notifyy(s + ""); } }, Functions.ERROR_CONSUMER, new Action() { @Override public void run() throws Exception { notifyy("onComplete---结束了!@@"); } }, FlowableInternalHelper.RequestMax.INSTANCE); }
@Test public void fromFutureNull() { FutureTask<Integer> ft = new FutureTask<Integer>(Functions.EMPTY_RUNNABLE, null); ft.run(); Perhaps.fromFuture(ft) .test() .assertResult(); }
@SuppressWarnings("unchecked") @Test public void fusedThrowsInDrainLoop() { Flowables.orderedMerge(Functions.<Integer>naturalComparator(), Flowable.just(1).map(new Function<Integer, Integer>() { @Override public Integer apply(Integer v) throws Exception { throw new IllegalArgumentException(); } }), Flowable.just(2, 3)) .test() .assertFailure(IllegalArgumentException.class); }
@Test public void usingNonEager() { Nono.using( Functions.justCallable(1), Functions.justFunction(Nono.complete()), this, false ) .test() .assertResult(); Assert.assertEquals(1, count); }
protected void runOk(){ /* - empty - 直接调用complete */ task = Flowable.interval(1, 1, TimeUnit.SECONDS) .skipWhile(new Predicate<Long>() { @Override public boolean test(Long aLong) throws Exception { return aLong <= 5 ; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long s) throws Exception { notifyy(s + ""); } }, Functions.ERROR_CONSUMER, new Action() { @Override public void run() throws Exception { notifyy("onComplete---结束了!@@"); } }, FlowableInternalHelper.RequestMax.INSTANCE); }
@Test public void deferFutureCustomScheduler() { final FutureTask<Observable<Integer>> ft = new FutureTask<Observable<Integer>>(Functions.EMPTY_RUNNABLE, Observable.just(1)); ft.run(); AsyncObservable.deferFuture(new Callable<Future<Observable<Integer>>>() { @Override public Future<Observable<Integer>> call() throws Exception { return ft; } }, Schedulers.single()) .test() .awaitDone(5, TimeUnit.SECONDS) .assertResult(1); }
@SuppressWarnings("unchecked") @Test public void secondErrors() { Flowables.orderedMerge(Functions.<Integer>naturalComparator(), Flowable.just(1, 3, 5, 7), Flowable.<Integer>error(new IOException()) ) .test() .assertFailure(IOException.class); }
@Test(timeout = 5000) public void retryPredicateNoRetry() { ioError .retry(Functions.alwaysFalse()) .test() .assertFailure(IOException.class); }
@SuppressWarnings("unchecked") @Test public void nullSecond() { Flowables.orderedMerge(Functions.<Integer>naturalComparator(), Flowable.just(1), null) .test() .assertFailure(NullPointerException.class); }
@SuppressWarnings("unchecked") @Test public void normal2Hidden() { Flowables.orderedMerge(Functions.<Integer>naturalComparator(), Flowable.just(1, 3, 5, 7).hide(), Flowable.just(2, 4, 6, 8).hide()) .test() .assertResult(1, 2, 3, 4, 5, 6, 7, 8); }
@Test public void mapOptionalConditional() { Flowable.range(1, 5).hide() .compose(FlowableInterop.mapOptional(v -> { if (v % 2 == 0) { return Optional.of(-v); } return Optional.empty(); })) .filter(Functions.alwaysTrue()) .test() .assertResult(-2, -4); }
@Test public void flatMapPublisherError2() { Perhaps.just(1) .flatMapPublisher(Functions.justFunction(Flowable.error(new IOException()))) .test() .assertFailure(IOException.class); }