public static void main(String[] args) { Subject<String> subject = UnicastSubject.create(); Observable.interval(300, TimeUnit.MILLISECONDS) .map(l -> ((l + 1) * 300) + " milliseconds") .subscribe(subject); sleep(2000); //multicast to support multiple Observers Observable<String> multicast = subject.publish().autoConnect(); //bring in first Observer multicast.subscribe(s -> System.out.println("Observer 1: " + s)); sleep(2000); //bring in second Observer multicast.subscribe(s -> System.out.println("Observer 2: " + s)); sleep(1000); }
@Test public void toStreamCancel() { UnicastSubject<Integer> up = UnicastSubject.create(); up.onNext(1); up.onNext(2); up.onNext(3); up.onNext(4); up.onNext(5); try (Stream<Integer> s = up .to(ObservableInterop.toStream()).limit(3)) { Assert.assertTrue(up.hasObservers()); List<Integer> list = s.collect(Collectors.toList()); Assert.assertEquals(Arrays.asList(1, 2, 3), list); } Assert.assertFalse(up.hasObservers()); }
@Test public void mapOptionalAsyncFused() { TestObserver<Integer> ts = TestHelper.fusedObserver(QueueSubscription.ANY); UnicastSubject<Integer> up = UnicastSubject.create(); TestHelper.emit(up, 1, 2, 3, 4, 5); up .compose(ObservableInterop.mapOptional(v -> { if (v % 2 == 0) { return Optional.of(-v); } return Optional.empty(); })) .subscribeWith(ts) .assertOf(TestHelper.assertFusedObserver(QueueSubscription.ASYNC)) .assertResult(-2, -4); }
@Test public void mapOptionalAsyncFusedConditional() { TestObserver<Integer> ts = TestHelper.fusedObserver(QueueSubscription.ANY); UnicastSubject<Integer> up = UnicastSubject.create(); TestHelper.emit(up, 1, 2, 3, 4, 5); up .compose(ObservableInterop.mapOptional(v -> { if (v % 2 == 0) { return Optional.of(-v); } return Optional.empty(); })) .filter(Functions.alwaysTrue()) .subscribeWith(ts) .assertOf(TestHelper.assertFusedObserver(QueueSubscription.ASYNC)) .assertResult(-2, -4); }
@Test public void runAsyncProcessor() { AsyncObservable.runAsync(Schedulers.single(), UnicastSubject.<Object>create(), new BiConsumer<Observer<Object>, Disposable>() { @Override public void accept(Observer<? super Object> s, Disposable d) throws Exception { s.onNext(1); s.onNext(2); s.onNext(3); Thread.sleep(200); s.onNext(4); s.onNext(5); s.onComplete(); } }) .test() .awaitDone(5, TimeUnit.SECONDS) .assertResult(1, 2, 3, 4, 5); }
public static void main(String[] args) { Subject<String> subject = UnicastSubject.create(); Observable.interval(300, TimeUnit.MILLISECONDS) .map(l -> ((l + 1) * 300) + " milliseconds") .subscribe(subject); sleep(2000); subject.subscribe(s -> System.out.println("Observer 1: " + s)); sleep(2000); }