/**
 * Submits 100 commands whose output listener is observed without any thread switch
 * (no {@code observeOn}) and calls {@code TestHelper.sleep(1)} on every notification.
 * Each command must complete, and each output listener must receive 11 values.
 */
@Test
public void testCommand_callback_sync() throws IOException, InterruptedException {
    processor.attach(session);

    final int commandCount = 100;
    List<Pair<TestObserver<Cmd.Result>, TestSubscriber<String>>> observerPairs = new ArrayList<>();

    for (int run = 0; run < commandCount; run++) {
        // Ten fixed echo commands followed by one that carries the run index.
        List<String> lines = new ArrayList<>();
        for (int n = 0; n < 10; n++) {
            lines.add("echo " + n);
        }
        lines.add("echo " + run);

        PublishProcessor<String> outputListener = PublishProcessor.create();
        TestSubscriber<String> outputObserver = outputListener
                .doOnEach(stringNotification -> TestHelper.sleep(1))
                .test();

        final Cmd cmd = Cmd.builder(lines)
                .outputProcessor(outputListener)
                .build();
        final TestObserver<Cmd.Result> resultObserver = processor.submit(cmd)
                .subscribeOn(Schedulers.newThread())
                .test();

        observerPairs.add(new Pair<>(resultObserver, outputObserver));
    }

    for (Pair<TestObserver<Cmd.Result>, TestSubscriber<String>> observed : observerPairs) {
        observed.first
                .awaitDone(5, TimeUnit.SECONDS)
                .assertNoTimeout()
                .assertComplete();
        observed.second
                .awaitDone(5, TimeUnit.SECONDS)
                .assertNoTimeout()
                .assertValueCount(11);
    }
}
/**
 * Submits 100 commands whose output listener is observed on a new thread
 * ({@code observeOn(Schedulers.newThread())}) and calls {@code TestHelper.sleep(1)} on every
 * notification. Each command must complete, and each output listener must receive 11 values.
 */
@Test
public void testCommand_callback_async() throws IOException, InterruptedException {
    processor.attach(session);

    final int commandCount = 100;
    List<Pair<TestObserver<Cmd.Result>, TestSubscriber<String>>> observerPairs = new ArrayList<>();

    for (int run = 0; run < commandCount; run++) {
        // Ten fixed echo commands followed by one that carries the run index.
        List<String> lines = new ArrayList<>();
        for (int n = 0; n < 10; n++) {
            lines.add("echo " + n);
        }
        lines.add("echo " + run);

        PublishProcessor<String> outputListener = PublishProcessor.create();
        TestSubscriber<String> outputObserver = outputListener
                .observeOn(Schedulers.newThread())
                .doOnEach(stringNotification -> TestHelper.sleep(1))
                .test();

        final Cmd cmd = Cmd.builder(lines)
                .outputProcessor(outputListener)
                .build();
        final TestObserver<Cmd.Result> resultObserver = processor.submit(cmd)
                .subscribeOn(Schedulers.newThread())
                .test();

        observerPairs.add(new Pair<>(resultObserver, outputObserver));
    }

    for (Pair<TestObserver<Cmd.Result>, TestSubscriber<String>> observed : observerPairs) {
        observed.first
                .awaitDone(5, TimeUnit.SECONDS)
                .assertNoTimeout()
                .assertComplete();
        observed.second
                .awaitDone(5, TimeUnit.SECONDS)
                .assertNoTimeout()
                .assertValueCount(11);
    }
}
/**
 * Builds a command with every option set to a non-default-looking value, copies it through
 * {@code Cmd.from(...)}, and checks that each getter reports the same value on the copy.
 */
@Test
public void testBuilder_from() {
    Cmd original = Cmd.builder(UUID.randomUUID().toString())
            .outputBuffer(false)
            .errorBuffer(false)
            .timeout(1337)
            .outputProcessor(PublishProcessor.create())
            .errorProcessor(PublishProcessor.create())
            .build();

    Cmd duplicate = Cmd.from(original).build();

    assertEquals(original.getCommands(), duplicate.getCommands());
    assertEquals(original.isOutputBufferEnabled(), duplicate.isOutputBufferEnabled());
    assertEquals(original.isErrorBufferEnabled(), duplicate.isErrorBufferEnabled());
    assertEquals(original.getTimeout(), duplicate.getTimeout());
    assertEquals(original.getOutputProcessor(), duplicate.getOutputProcessor());
    assertEquals(original.getErrorProcessor(), duplicate.getErrorProcessor());
}
/**
 * Checks that every value passed to the builder is reported unchanged by the built command.
 */
@Test
public void testBuild() {
    final PublishProcessor<String> stdoutProcessor = PublishProcessor.create();
    final PublishProcessor<String> stderrProcessor = PublishProcessor.create();

    Cmd built = Cmd.builder("cmd1")
            .outputBuffer(false)
            .errorBuffer(false)
            .timeout(1337)
            .outputProcessor(stdoutProcessor)
            .errorProcessor(stderrProcessor)
            .build();

    assertThat(built.getCommands(), contains("cmd1"));
    assertThat(built.getOutputProcessor(), is(stdoutProcessor));
    assertThat(built.getErrorProcessor(), is(stderrProcessor));
    assertThat(built.getTimeout(), is(1337L));
    assertThat(built.isOutputBufferEnabled(), is(false));
    assertThat(built.isErrorBufferEnabled(), is(false));
}
public Flowable<String> process(Flowable<Byte> observableInput){ PublishProcessor<String> publishProcessor = PublishProcessor.create(); StringBuilder sb = new StringBuilder(); observableInput.subscribe(b->{ if(b==32){ //send out a new word on a space publishProcessor.onNext(sb.toString()); sb.setLength(0); }else{ sb.append((char)b.byteValue()); } }, e->LOG.error("Error in BytesToWordsProcessor [{}]", e), publishProcessor::onComplete ); return publishProcessor; }
/**
 * Verifies that {@code DisposableAttach.to(composite)} registers the subscription with the
 * composite: one entry after subscribing, and after disposing the composite it is empty,
 * disposed, and the subscriber is disposed too.
 */
@Test
public void test() {
    PublishProcessor<String> subject = PublishProcessor.create();
    Flowable<String> source = subject.hide();
    // Parameterized rather than raw, so the value assertion below is type-checked.
    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    CompositeDisposable composite = new CompositeDisposable();

    Disposable disposable = source
            .compose(DisposableAttach.<String>to(composite))
            .subscribeWith(testSubscriber);

    subject.onNext("Foo");
    testSubscriber.assertValue("Foo");
    assertTrue(composite.size() == 1);

    composite.dispose();

    assertTrue(composite.size() == 0);
    assertTrue(composite.isDisposed());
    assertTrue(disposable.isDisposed());
    assertTrue(testSubscriber.isDisposed());
}
/**
 * Reports whether either neighbouring processor is still available.
 *
 * @return {@code true} if {@code _next} is set and its {@code get()} returns non-null, or,
 *         failing that, if {@code _previous} is set and its {@code get()} returns non-null
 */
public boolean isActive() {
    // The next reference is checked first; the previous one is only consulted when it is gone.
    if (_next != null && _next.get() != null) {
        return true;
    }
    return _previous != null && _previous.get() != null;
}
/**
 * The eviction callback records each evicted item and cancels the zero-request subscriber.
 * After emitting five items and advancing virtual time by one minute, all five items are
 * expected in the {@code evicted} list, in order.
 */
@Test
public void evictCancels() {
    TestScheduler scheduler = new TestScheduler();
    PublishProcessor<Integer> source = PublishProcessor.create();
    final TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0L);

    // Cancels the downstream from inside the eviction callback itself.
    Consumer<Integer> onEvict = new Consumer<Integer>() {
        @Override
        public void accept(Integer e) throws Exception {
            evicted.add(e);
            ts.cancel();
        }
    };

    source
            .compose(FlowableTransformers.<Integer>onBackpressureTimeout(
                    10, 1, TimeUnit.SECONDS, scheduler, onEvict))
            .subscribe(ts);

    TestHelper.emit(source, 1, 2, 3, 4, 5);
    scheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), evicted);
}
/**
 * Keeps at least one subscriber attached while subscribers are swapped three times, then
 * cancels the last one and expects the upstream processor to have no subscribers left.
 */
@Test
public void comeAndGo() {
    PublishProcessor<Integer> upstream = PublishProcessor.create();

    Flowable<Integer> shared = upstream
            .publish()
            .compose(FlowableTransformers.<Integer>refCount(1));

    TestSubscriber<Integer> current = shared.test(0);

    Assert.assertTrue(upstream.hasSubscribers());

    for (int round = 0; round < 3; round++) {
        // Subscribe the replacement before cancelling the one it replaces.
        TestSubscriber<Integer> replacement = shared.test();
        current.cancel();
        current = replacement;
    }

    current.cancel();

    Assert.assertFalse(upstream.hasSubscribers());
}
/**
 * With the four-argument-free {@code spanout(100, MILLISECONDS, scheduler)} overload, an error
 * following one item results in a failure with {@code IOException} and no values.
 */
@Test
public void errorImmediate() {
    TestScheduler scheduler = new TestScheduler();
    PublishProcessor<Integer> source = PublishProcessor.create();

    TestSubscriber<Integer> subscriber = source
            .compose(FlowableTransformers.<Integer>spanout(100, TimeUnit.MILLISECONDS, scheduler))
            .test();

    source.onNext(1);
    source.onError(new IOException());

    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

    subscriber.assertFailure(IOException.class);
}
/**
 * With {@code spanout(100L, MILLISECONDS, scheduler, true)}, an error following one item
 * results in the item being delivered before the {@code IOException} failure.
 */
@Test
public void errorDelayed() {
    TestScheduler scheduler = new TestScheduler();
    PublishProcessor<Integer> source = PublishProcessor.create();

    TestSubscriber<Integer> subscriber = source
            .compose(FlowableTransformers.<Integer>spanout(
                    100L, TimeUnit.MILLISECONDS, scheduler, true))
            .test();

    source.onNext(1);
    source.onError(new IOException());

    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

    subscriber.assertFailure(IOException.class, 1);
}
/**
 * Races an inner {@code onNext(1)} against downstream cancellation, 1000 times, for a
 * depth-first {@code expand}. This method makes no explicit assertions of its own.
 */
@Test
public void depthEmitCancelRace() {
    for (int attempt = 0; attempt < 1000; attempt++) {
        final PublishProcessor<Integer> inner = PublishProcessor.create();

        final TestSubscriber<Integer> subscriber = Flowable.just(0)
                .compose(FlowableTransformers.<Integer>expand(
                        Functions.justFunction(inner), ExpandStrategy.DEPTH_FIRST))
                .test(1);

        Runnable emit = new Runnable() {
            @Override
            public void run() {
                inner.onNext(1);
            }
        };
        Runnable cancel = new Runnable() {
            @Override
            public void run() {
                subscriber.cancel();
            }
        };

        TestHelper.race(emit, cancel, Schedulers.single());
    }
}
/**
 * Races an inner {@code onComplete()} against downstream cancellation, 1000 times, for a
 * depth-first {@code expand}. This method makes no explicit assertions of its own.
 */
@Test
public void depthCompleteCancelRace() {
    for (int attempt = 0; attempt < 1000; attempt++) {
        final PublishProcessor<Integer> inner = PublishProcessor.create();

        final TestSubscriber<Integer> subscriber = Flowable.just(0)
                .compose(FlowableTransformers.<Integer>expand(
                        Functions.justFunction(inner), ExpandStrategy.DEPTH_FIRST))
                .test(1);

        Runnable complete = new Runnable() {
            @Override
            public void run() {
                inner.onComplete();
            }
        };
        Runnable cancel = new Runnable() {
            @Override
            public void run() {
                subscriber.cancel();
            }
        };

        TestHelper.race(complete, cancel, Schedulers.single());
    }
}
/**
 * Cancelling a {@code filterAsync} stream whose predicate publisher never emits must leave
 * that publisher without subscribers.
 */
@Test
public void cancel() {
    final PublishProcessor<Boolean> pending = PublishProcessor.create();

    // Every item is filtered through the same processor, which is never signalled.
    Function<Integer, Publisher<Boolean>> asyncPredicate =
            new Function<Integer, Publisher<Boolean>>() {
                @Override
                public Publisher<Boolean> apply(Integer v) throws Exception {
                    return pending;
                }
            };

    Flowable.range(1, 5)
            .compose(FlowableTransformers.filterAsync(asyncPredicate, 16))
            .test()
            .cancel();

    Assert.assertFalse(pending.hasSubscribers());
}
/**
 * Uses a subscriber that requests one item and, on its first {@code onNext}, cancels and
 * signals its own completion. Nothing is received until both sources have emitted; then
 * the single result {@code "[1, 2]"} is asserted.
 */
@Test
public void cancelAfterOneBackpressured() {
    PublishProcessor<Integer> first = PublishProcessor.create();
    PublishProcessor<Integer> second = PublishProcessor.create();

    TestSubscriber<String> subscriber = new TestSubscriber<String>(1) {
        @Override
        public void onNext(String t) {
            super.onNext(t);
            cancel();
            onComplete();
        }
    };

    Flowables.zipLatest(first, second, toString2).subscribe(subscriber);

    subscriber.assertEmpty();

    first.onNext(1);

    subscriber.assertEmpty();

    second.onNext(2);

    subscriber.assertResult("[1, 2]");
}
/**
 * Feeds {@code zipLatest} a source that calls {@code onSubscribe} twice. The first
 * subscription must stay active, the second must be cancelled, and a
 * {@code ProtocolViolationException} must be reported to the tracked plugin errors.
 */
@Test
public void badSource() {
    List<Throwable> pluginErrors = TestHelper.trackPluginErrors();
    try {
        final PublishProcessor<Integer> wellBehaved = PublishProcessor.create();

        final Flowable<Integer> doubleSubscribing = new Flowable<Integer>() {
            @Override
            protected void subscribeActual(Subscriber<? super Integer> s) {
                BooleanSubscription firstSubscription = new BooleanSubscription();
                s.onSubscribe(firstSubscription);

                BooleanSubscription secondSubscription = new BooleanSubscription();
                s.onSubscribe(secondSubscription);

                Assert.assertFalse(firstSubscription.isCancelled());
                Assert.assertTrue(secondSubscription.isCancelled());
            }
        };

        Flowables.zipLatest(wellBehaved, doubleSubscribing, toString2).test();

        TestHelper.assertError(pluginErrors, 0, ProtocolViolationException.class);
    } finally {
        // Always restore the plugin hooks installed by trackPluginErrors().
        RxJavaPlugins.reset();
    }
}
/**
 * Cancelling a {@code mapAsync} stream whose mapper publisher never emits must leave that
 * publisher without subscribers.
 */
@Test
public void cancel() {
    final PublishProcessor<Object> pending = PublishProcessor.create();

    // Every item is mapped through the same processor, which is never signalled.
    Function<Integer, Publisher<Object>> asyncMapper =
            new Function<Integer, Publisher<Object>>() {
                @Override
                public Publisher<Object> apply(Integer v) throws Exception {
                    return pending;
                }
            };

    Flowable.range(1, 5)
            .compose(FlowableTransformers.mapAsync(asyncMapper))
            .test()
            .cancel();

    Assert.assertFalse(pending.hasSubscribers());
}
/**
 * Emits a second item (100) re-entrantly from inside the subscriber's {@code onNext} while
 * the list starting with 1 is being delivered; both lists are expected in the result.
 */
@Test
@SuppressWarnings("unchecked")
public void slowPathQueueUse() {
    final PublishProcessor<Integer> source = PublishProcessor.create();

    TestSubscriber<List<Integer>> subscriber = new TestSubscriber<List<Integer>>() {
        @Override
        public void onNext(List<Integer> t) {
            super.onNext(t);
            // Re-enter the source only for the first delivered list.
            if (t.get(0) == 1) {
                source.onNext(100);
            }
        }
    };

    source.compose(FlowableTransformers.coalesce(listSupplier, listAdd))
            .subscribe(subscriber);

    source.onNext(1);
    source.onComplete();

    subscriber.assertResult(Arrays.asList(1), Arrays.asList(100));
}
/**
 * Emits a second item (100) re-entrantly from inside the subscriber's {@code onNext} while
 * using the {@code listAddCrash100} coalescer; the stream is expected to deliver the list
 * containing 1 and then fail with {@code IOException}.
 */
@Test
@SuppressWarnings("unchecked")
public void slowPathQueueUseCrash() {
    final PublishProcessor<Integer> source = PublishProcessor.create();

    TestSubscriber<List<Integer>> subscriber = new TestSubscriber<List<Integer>>() {
        @Override
        public void onNext(List<Integer> t) {
            super.onNext(t);
            // Re-enter the source only for the first delivered list.
            if (t.get(0) == 1) {
                source.onNext(100);
            }
        }
    };

    source.compose(FlowableTransformers.coalesce(listSupplier, listAddCrash100))
            .subscribe(subscriber);

    source.onNext(1);
    source.onComplete();

    subscriber.assertFailure(IOException.class, Arrays.asList(1));
}
@Test public void unbound_shouldStillPassValues() { TestSubscriber<Integer> firstSubscriber = new TestSubscriber<>(); TestSubscriber<Integer> secondSubscriber = new TestSubscriber<>(); PublishProcessor<Integer> source = PublishProcessor.create(); //noinspection unchecked Subscriber<Integer>[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; source .parallel(DEFAULT_PARALLELISM) .as(AutoDispose.<Integer>autoDisposable(ScopeProvider.UNBOUND)) .subscribe(subscribers); source.onNext(1); source.onNext(2); firstSubscriber.assertValue(1); secondSubscriber.assertValue(2); firstSubscriber.dispose(); secondSubscriber.dispose(); }
@Test public void autoDispose_withMaybe_normal() { TestSubscriber<Integer> o = new TestSubscriber<>(); PublishProcessor<Integer> source = PublishProcessor.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); Disposable d = source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribeWith(o); o.assertSubscribed(); assertThat(source.hasSubscribers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); source.onNext(1); o.assertValue(1); source.onNext(2); source.onComplete(); o.assertValues(1, 2); o.assertComplete(); assertThat(d.isDisposed()).isFalse(); // Because it completes normally assertThat(source.hasSubscribers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void autoDispose_withMaybe_interrupted() { PublishProcessor<Integer> source = PublishProcessor.create(); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); TestSubscriber<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(lifecycle)) .test(); o.assertSubscribed(); assertThat(source.hasSubscribers()).isTrue(); assertThat(lifecycle.hasObservers()).isTrue(); source.onNext(1); o.assertValue(1); lifecycle.onSuccess(2); source.onNext(2); // No more events o.assertValue(1); // Unsubscribed assertThat(source.hasSubscribers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); }
/**
 * With a do-nothing outside-lifecycle handler installed and a lifecycle subject that holds
 * no value, subscribing is expected to attach nothing and to deliver neither values nor
 * errors.
 */
@Test
public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() {
    // Handler that ignores the exception.
    Consumer<OutsideLifecycleException> ignoringHandler =
            new Consumer<OutsideLifecycleException>() {
                @Override
                public void accept(OutsideLifecycleException e) {
                }
            };
    AutoDisposePlugins.setOutsideLifecycleHandler(ignoringHandler);

    BehaviorSubject<Integer> lifecycle = BehaviorSubject.create();
    LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle);
    PublishProcessor<Integer> source = PublishProcessor.create();

    TestSubscriber<Integer> subscriber = source
            .as(AutoDispose.<Integer>autoDisposable(provider))
            .test();

    assertThat(source.hasSubscribers()).isFalse();
    assertThat(lifecycle.hasObservers()).isFalse();
    subscriber.assertNoValues();
    subscriber.assertNoErrors();
}
@Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Noop } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.createDefault(0); lifecycle.onNext(1); lifecycle.onNext(2); lifecycle.onNext(3); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); PublishProcessor<Integer> source = PublishProcessor.create(); TestSubscriber<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); assertThat(source.hasSubscribers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); o.assertNoValues(); o.assertNoErrors(); }
@Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithExp() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer<OutsideLifecycleException>() { @Override public void accept(OutsideLifecycleException e) { // Wrap in an IllegalStateException so we can verify this is the exception we see on the // other side throw new IllegalStateException(e); } }); BehaviorSubject<Integer> lifecycle = BehaviorSubject.create(); LifecycleScopeProvider<Integer> provider = TestUtil.makeLifecycleProvider(lifecycle); PublishProcessor<Integer> source = PublishProcessor.create(); TestSubscriber<Integer> o = source .as(AutoDispose.<Integer>autoDisposable(provider)) .test(); o.assertNoValues(); o.assertError(new Predicate<Throwable>() { @Override public boolean test(Throwable throwable) { return throwable instanceof IllegalStateException && throwable.getCause() instanceof OutsideLifecycleException; } }); }
/**
 * A {@code PublishProcessor} wrapped as a {@code Flow.Processor} relays the values 1..5
 * followed by completion to a Flow subscriber.
 */
@Test
public void flowableProcessorToFlowProcessor() {
    PublishProcessor<Integer> source = PublishProcessor.create();
    Flow.Processor<Integer, Integer> bridged = FlowInterop.toFlowProcessor(source);

    FlowTestSubscriber<Integer> subscriber = new FlowTestSubscriber<>();
    bridged.subscribe(subscriber);

    for (int value = 1; value <= 5; value++) {
        source.onNext(value);
    }
    source.onComplete();

    subscriber.assertResult(1, 2, 3, 4, 5);
}
/**
 * A {@code PublishProcessor} wrapped as a {@code Flow.Processor} relays the values 1..5
 * followed by an {@code IOException} to a Flow subscriber.
 */
@Test
public void flowableProcessorToFlowProcessorError() {
    PublishProcessor<Integer> source = PublishProcessor.create();
    Flow.Processor<Integer, Integer> bridged = FlowInterop.toFlowProcessor(source);

    FlowTestSubscriber<Integer> subscriber = new FlowTestSubscriber<>();
    bridged.subscribe(subscriber);

    for (int value = 1; value <= 5; value++) {
        source.onNext(value);
    }
    source.onError(new IOException());

    subscriber.assertFailure(IOException.class, 1, 2, 3, 4, 5);
}
/**
 * A value seen by a first subscriber, which is then disposed, is still delivered to a
 * subscriber that arrives afterwards.
 */
@Test
public void initialValueToNewSubscriberAfterUnsubscribe() {
    PublishProcessor<String> upstream = PublishProcessor.create();
    Flowable<String> shared = upstream.compose(ReplayingShare.<String>instance());

    TestSubscriber<String> early = new TestSubscriber<>();
    shared.subscribe(early);
    early.assertNoValues();

    upstream.onNext("Foo");
    early.assertValues("Foo");

    early.dispose();

    TestSubscriber<String> late = new TestSubscriber<>();
    shared.subscribe(late);
    late.assertValues("Foo");
}
/**
 * A value emitted while nobody is subscribed is received neither by the disposed
 * subscriber nor by one that subscribes afterwards.
 */
@Test
public void valueMissedWhenNoSubscribers() {
    PublishProcessor<String> upstream = PublishProcessor.create();
    Flowable<String> shared = upstream.compose(ReplayingShare.<String>instance());

    TestSubscriber<String> early = new TestSubscriber<>();
    shared.subscribe(early);
    early.assertNoValues();
    early.dispose();

    // Emitted while there are no subscribers.
    upstream.onNext("Foo");
    early.assertNoValues();

    TestSubscriber<String> late = new TestSubscriber<>();
    shared.subscribe(late);
    late.assertNoValues();
}
/**
 * A fatal error thrown by a consumer while the cached value is being replayed must
 * propagate out of {@code subscribe} to the caller.
 *
 * <p>The test fails explicitly when no {@code OutOfMemoryError} reaches the caller;
 * without that, a swallowed error would let the test pass without checking anything.
 */
@SuppressWarnings("CheckReturnValue")
@Test
public void fatalExceptionDuringReplayThrown() {
    PublishProcessor<String> subject = PublishProcessor.create();
    Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());

    // Prime the cached value through a first, well-behaved subscription.
    flowable.subscribe();
    subject.onNext("Foo");

    Consumer<String> brokenAction = new Consumer<String>() {
        @Override
        public void accept(String s) {
            throw new OutOfMemoryError("broken!");
        }
    };

    try {
        flowable.subscribe(brokenAction);
    } catch (OutOfMemoryError e) {
        assertEquals("broken!", e.getMessage());
        return;
    }
    // Reached only when subscribe() returned normally, i.e. the error was swallowed.
    throw new AssertionError("Expected OutOfMemoryError to be thrown during replay");
}
@Test public void backpressureHonoredWhenCached() { PublishProcessor<String> subject = PublishProcessor.create(); Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance()); TestSubscriber<String> subscriber1 = new TestSubscriber<>(); flowable.subscribe(subscriber1); subscriber1.assertNoValues(); subject.onNext("Foo"); subscriber1.assertValues("Foo"); TestSubscriber<String> subscriber2 = new TestSubscriber<>(0); flowable.subscribe(subscriber2); subscriber2.assertNoValues(); subject.onNext("Bar"); // Replace the cached value... subscriber2.request(1); // ...and ensure new requests see it. subscriber2.assertValues("Bar"); }
/**
 * Two streams composed with {@code ReplayingShare.instance()} keep independent cached
 * values: late subscribers of stream A see "Foo" and those of stream B see "Bar".
 */
@Test
public void streamsDoNotShareInstances() {
    // Stream A with its first subscriber.
    PublishProcessor<String> subjectA = PublishProcessor.create();
    Flowable<String> flowableA = subjectA.compose(ReplayingShare.<String>instance());
    TestSubscriber<String> firstOnA = new TestSubscriber<>();
    flowableA.subscribe(firstOnA);

    // Stream B with its first subscriber.
    PublishProcessor<String> subjectB = PublishProcessor.create();
    Flowable<String> flowableB = subjectB.compose(ReplayingShare.<String>instance());
    TestSubscriber<String> firstOnB = new TestSubscriber<>();
    flowableB.subscribe(firstOnB);

    subjectA.onNext("Foo");
    firstOnA.assertValues("Foo");

    subjectB.onNext("Bar");
    firstOnB.assertValues("Bar");

    TestSubscriber<String> secondOnA = new TestSubscriber<>();
    flowableA.subscribe(secondOnA);
    secondOnA.assertValues("Foo");

    TestSubscriber<String> secondOnB = new TestSubscriber<>();
    flowableB.subscribe(secondOnB);
    secondOnB.assertValues("Bar");
}
/**
 * Inflates the fragment layout, binds its views, creates the result processor and
 * subscribes to it so every emitted value is written to {@code _result}, then triggers
 * {@code onNumberChanged()} and gives focus to {@code _number2}.
 *
 * @return the inflated root view
 */
@Override
public View onCreateView(
        LayoutInflater inflater,
        @Nullable ViewGroup container,
        @Nullable Bundle savedInstanceState) {
    View root = inflater.inflate(R.layout.fragment_double_binding_textview, container, false);
    unbinder = ButterKnife.bind(this, root);

    _resultEmitterSubject = PublishProcessor.create();
    _disposable = _resultEmitterSubject.subscribe(
            aFloat -> _result.setText(String.valueOf(aFloat)));

    onNumberChanged();
    _number2.requestFocus();

    return root;
}
/**
 * Creates the connectivity processor and subscribes to it: the stream starts with the
 * current connectivity status, drops consecutive duplicates, and logs an online/offline
 * message on the Android main thread. Finally starts listening for connectivity changes.
 */
@Override
public void onStart() {
    super.onStart();

    publishProcessor = PublishProcessor.create();

    disposable = publishProcessor
            .startWith(getConnectivityStatus(getActivity()))
            .distinctUntilChanged()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(online -> log(online ? "You are online" : "You are offline"));

    listenToNetworkConnectivity();
}
/**
 * When the upstream publisher has already terminated — first by completing, then (with a
 * fresh publisher) by erroring — the output harvester still completes with exactly one
 * crop, and that crop's {@code isComplete} flag is {@code false}.
 */
@Test
public void testUpstreamTerminated_output() {
    // Case 1: upstream completed before harvesting.
    publisher.onComplete();
    OutputHarvester.Crop harvested = publisher
            .compose(harvesterFactory.forOutput(publisher, cmd))
            .test()
            .assertComplete()
            .assertValueCount(1)
            .values()
            .get(0);
    assertThat(harvested.isComplete, is(false));

    // Case 2: a new upstream that failed before harvesting.
    publisher = PublishProcessor.create();
    publisher.onError(new InterruptedException());
    harvested = publisher
            .compose(harvesterFactory.forOutput(publisher, cmd))
            .test()
            .assertComplete()
            .assertValueCount(1)
            .values()
            .get(0);
    assertThat(harvested.isComplete, is(false));
}
/**
 * When the upstream publisher has already terminated — first by completing, then (with a
 * fresh publisher) by erroring — the error harvester still completes with exactly one
 * crop, and that crop's {@code isComplete} flag is {@code false}.
 */
@Test
public void testUpstreamTerminated_error() {
    // Case 1: upstream completed before harvesting.
    publisher.onComplete();
    ErrorHarvester.Crop harvested = publisher
            .compose(harvesterFactory.forError(publisher, cmd))
            .test()
            .assertComplete()
            .assertValueCount(1)
            .values()
            .get(0);
    assertThat(harvested.isComplete, is(false));

    // Case 2: a new upstream that failed before harvesting.
    publisher = PublishProcessor.create();
    publisher.onError(new InterruptedException());
    harvested = publisher
            .compose(harvesterFactory.forError(publisher, cmd))
            .test()
            .assertComplete()
            .assertValueCount(1)
            .values()
            .get(0);
    assertThat(harvested.isComplete, is(false));
}
synchronized Processor getProcessor(RxQueueKey key, boolean createIfMissing) { // 1) look if key already has a publisher processor, if so, return it if (mProcessorKeys.containsKey(key)) return mProcessorKeys.get(key); // 2) else, create a new one and put it into the map else if (createIfMissing) { Processor processor = PublishProcessor.create().toSerialized(); mProcessorKeys.put(key, processor); return processor; } else return null; }
/**
 * Sets the activity layout, binds its views, creates the result processor and subscribes
 * to it so every emitted value is shown in {@code tvSum}, then triggers
 * {@code onNumberChanged()}.
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    ButterKnife.bind(this);

    resultPublisher = PublishProcessor.create();
    subscriber = resultPublisher.subscribe(aFloat -> tvSum.setText("Sum = " + aFloat));

    onNumberChanged();
}
/**
 * Creates a new {@code PublishProcessor}, records it in the list kept for {@code tag}
 * (creating and storing that list first if the tag has none), and returns it.
 *
 * @param tag the key under which the new processor is recorded
 * @param <T> the element type of the returned stream
 * @return the newly created processor, exposed as a {@code Flowable}
 */
public <T> Flowable<T> register(@NonNull Object tag) {
    List<FlowableProcessor> registered = mProcessorMapper.get(tag);
    if (registered == null) {
        registered = new ArrayList<FlowableProcessor>();
        mProcessorMapper.put(tag, registered);
    }

    FlowableProcessor<T> processor = PublishProcessor.create();
    registered.add(processor);
    return processor;
}
/**
 * Stores the given index, backing list and the two neighbouring processor references
 * exactly as passed in; no validation or copying is performed.
 *
 * @param index        value stored in {@code _index}
 * @param internalList list stored in {@code _internalList}
 * @param previous     reference stored in {@code _previous}
 * @param next         reference stored in {@code _next}
 */
private IndexHolder(
        int index,
        List<T> internalList,
        WeakReference<PublishProcessor<Optional<T>>> previous,
        WeakReference<PublishProcessor<Optional<T>>> next) {
    this._index = index;
    this._internalList = internalList;
    this._previous = previous;
    this._next = next;
}