/**
 * AsyncSubject demo: one observer subscribes up front, three values are pushed and the
 * subject is completed, and only then does a second observer subscribe.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final Subject<String> asyncSubject = AsyncSubject.create();

    // Observer 1 is attached before any value is pushed.
    asyncSubject.subscribe(
            value -> System.out.println("Observer 1: " + value),
            Throwable::printStackTrace,
            () -> System.out.println("Observer 1 done!"));

    for (String item : new String[] {"Alpha", "Beta", "Gamma"}) {
        asyncSubject.onNext(item);
    }
    asyncSubject.onComplete();

    // Observer 2 is attached only after the subject has already completed.
    asyncSubject.subscribe(
            value -> System.out.println("Observer 2: " + value),
            Throwable::printStackTrace,
            () -> System.out.println("Observer 2 done!"));
}
public static void main(String[] args) { Subject<String> subject = UnicastSubject.create(); Observable.interval(300, TimeUnit.MILLISECONDS) .map(l -> ((l + 1) * 300) + " milliseconds") .subscribe(subject); sleep(2000); //multicast to support multiple Observers Observable<String> multicast = subject.publish().autoConnect(); //bring in first Observer multicast.subscribe(s -> System.out.println("Observer 1: " + s)); sleep(2000); //bring in second Observer multicast.subscribe(s -> System.out.println("Observer 2: " + s)); sleep(1000); }
/**
 * Stops listening: removes {@code observable} from the subjects registered under
 * {@code tag}, and drops the tag's mapping once its subject list is empty.
 *
 * @param tag        key under which the subject list is stored in {@code subjectMapper}
 * @param observable the instance to remove from that list; {@code null} is a no-op
 * @return the result of {@code getInstance()}, in every case
 */
@SuppressWarnings("rawtypes")
public RxBus unregister(@NonNull Object tag, @NonNull Observable<?> observable) {
    if (observable != null) {
        final List<Subject> registered = subjectMapper.get(tag);
        if (registered != null) {
            registered.remove(observable);
            // Only drop (and log) the mapping when nothing is left under this tag.
            if (isEmpty(registered)) {
                subjectMapper.remove(tag);
                LogUtils.debugInfo("unregister"+ tag + " size:" + registered.size());
            }
        }
    }
    return getInstance();
}
/**
 * Cancels the subscription stored under {@code tag}.
 *
 * <p>Disposes the event's disposable if it is not already disposed, then removes the entry
 * from {@code rxBusEventArrayMap} only when its subject has no observers left.
 *
 * @param tag key identifying the event
 * @return {@code true} when no event is stored for the tag or the entry was removed;
 *         {@code false} when the subject still has observers
 */
public boolean unregister(@NonNull Object tag) {
    final RxBusEvent event = rxBusEventArrayMap.get(tag);
    if (RxUtils.isEmpty(event)) {
        return true;
    }

    final Subject<Object> bus = event.subject;
    final Disposable subscription = event.disposable;
    if (!subscription.isDisposed()) {
        subscription.dispose();
    }

    if (bus.hasObservers()) {
        return false;
    }
    rxBusEventArrayMap.remove(tag);
    return true;
}
/**
 * Creates the listener and wires up the migration-event pipeline: events pushed into
 * {@code migrationEvent} are buffered in 10-second windows, and each non-empty window is
 * logged and followed by a call to {@code updateTasksAfterClusterEvent()}.
 */
public ClusterMigrationListener() {
    // A serialized publish subject is the entry point for emitting migration events.
    migrationEvent = PublishSubject.<MigrationEvent>create().toSerialized();

    migrationEvent
            // collect migration events in 10-second windows
            .buffer(10, TimeUnit.SECONDS)
            // skip windows in which no event was observed
            .filter(batch -> !batch.isEmpty())
            // log how many partition migrations the window contained
            .doOnNext(batch -> logger.info("[CLUSTER] Migrated {} partition", batch.size()))
            // then update tasks once per window
            .doOnNext(batch -> updateTasksAfterClusterEvent())
            // subscribe on the computation scheduler
            .subscribeOn(Schedulers.computation())
            .subscribe();
}
/**
 * Verifies that disposing the handle returned by {@code appendStdout} stops the DAO from
 * reading further data from the stdout observable.
 */
@Test
public void testPersistStdoutReturnsADisposableThatStopsFurtherReads() {
    final JobDAO dao = getInstance();
    final JobId jobId = dao.persist(STANDARD_VALID_REQUEST).getId();
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    final AtomicBoolean stdoutObsWasRead = new AtomicBoolean(false);
    // The map stage flips the flag whenever an item is pulled through it by a subscriber.
    final Observable<byte[]> stdoutObs = stdoutSubject.map(data -> {
        stdoutObsWasRead.set(true);
        return data;
    });

    final Disposable disposable = dao.appendStdout(jobId, stdoutObs);
    disposable.dispose();
    stdoutSubject.onNext(TestHelpers.generateRandomBytes());

    // A bare assertThat(...) verifies nothing; nothing may have been read after disposal.
    assertThat(stdoutObsWasRead.get()).isFalse();
}
/**
 * Verifies that disposing the handle returned by {@code appendStderr} stops the DAO from
 * reading further data from the stderr observable.
 */
@Test
public void testPersistStderrReturnsADisposableThatStopsFurtherReads() {
    final JobDAO dao = getInstance();
    final JobId jobId = dao.persist(STANDARD_VALID_REQUEST).getId();
    final Subject<byte[]> stderrSubject = PublishSubject.create();
    final AtomicBoolean stderrObsWasRead = new AtomicBoolean(false);
    // The map stage flips the flag whenever an item is pulled through it by a subscriber.
    final Observable<byte[]> stderrObs = stderrSubject.map(data -> {
        stderrObsWasRead.set(true);
        return data;
    });

    final Disposable disposable = dao.appendStderr(jobId, stderrObs);
    disposable.dispose();
    stderrSubject.onNext(TestHelpers.generateRandomBytes());

    // A bare assertThat(...) verifies nothing; nothing may have been read after disposal.
    assertThat(stderrObsWasRead.get()).isFalse();
}
@Test public void testExecuteStdoutListenerIsCalledWithCompletedOnceApplicationExecutionEnds() throws Throwable { final JobExecutor jobExecutor = getInstance(); final AtomicBoolean completedCalled = new AtomicBoolean(false); final Subject<byte[]> stdoutSubject = PublishSubject.create(); stdoutSubject.doOnComplete(() -> completedCalled.set(true)).subscribe(); final JobEventListeners listeners = createStdoutListener(stdoutSubject); final CancelablePromise<JobExecutionResult> ret = jobExecutor.execute(STANDARD_REQUEST, listeners); promiseAssert(ret, result -> { try { // The stdout thread can race with the exit thread Thread.sleep(50); assertThat(completedCalled.get()).isTrue(); } catch (InterruptedException ignored) {} }); }
@Test public void testExecuteStderrListenerIsCompletedOnceApplicationExecutionEnds() throws Throwable { final JobExecutor jobExecutor = getInstance(); final AtomicBoolean completedCalled = new AtomicBoolean(false); final Subject<byte[]> stderrSubject = PublishSubject.create(); stderrSubject.doOnComplete(() -> completedCalled.set(true)).subscribe(); final JobEventListeners listeners = createStderrListener(stderrSubject); final CancelablePromise<JobExecutionResult> ret = jobExecutor.execute(STANDARD_REQUEST, listeners); promiseAssert(ret, result -> { try { // The stderr thread can race with the exit thread Thread.sleep(50); assertThat(completedCalled.get()).isTrue(); } catch (InterruptedException ignored) {} }); }
@Test public void testExecuteEvaluatesJobInputsAsExpected() throws InterruptedException { final JobExecutor jobExecutor = getInstance(); final PersistedJob req = standardRequestWithCommand("echo", "${inputs.foo}"); final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{}); final Subject<byte[]> stdoutSubject = PublishSubject.create(); stdoutSubject.subscribe(bytes -> bytesEchoedToStdout.getAndUpdate(existingBytes -> Bytes.concat(existingBytes, bytes))); final Semaphore s = new Semaphore(1); s.acquire(); stdoutSubject.doOnComplete(s::release).subscribe(); final JobEventListeners listeners = createStdoutListener(stdoutSubject); jobExecutor.execute(req, listeners); s.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS); final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim(); assertThat(stringFromStdout).isEqualTo("a"); // from spec }
/**
 * Checks that bytes pushed into the executor's stdout subject are observed through the
 * observable returned by {@code JobManager.stdoutUpdates}.
 */
@Test
public void testGetStdoutUpdatesEchoesUpdatesFromExecutorObservers()
        throws InterruptedException, ExecutionException, TimeoutException {
    final CancelablePromise<JobExecutionResult> executorPromise = new SimpleCancelablePromise<>();
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    // stdout is driven by the subject; stderr gets a single random payload.
    final JobExecutor executor = MockJobExecutor.thatUses(
            executorPromise,
            stdoutSubject,
            Observable.just(TestHelpers.generateRandomBytes()));
    final JobManager manager = createManagerWith(executor);

    final Pair<JobId, CancelablePromise<FinalizedJob>> submission = manager.submit(STANDARD_VALID_REQUEST);

    final Observable<byte[]> updates = manager.stdoutUpdates(submission.getLeft()).get();
    final AtomicReference<byte[]> lastSeen = new AtomicReference<>();
    updates.subscribe(lastSeen::set);

    final byte[] expected = TestHelpers.generateRandomBytes();
    stdoutSubject.onNext(expected);

    // Finish the job and wait for the manager to finalize it before asserting.
    executorPromise.complete(new JobExecutionResult(FINISHED));
    submission.getRight().get(DEFAULT_TIMEOUT, MILLISECONDS);

    assertThat(lastSeen.get()).isEqualTo(expected);
}
/**
 * Checks that bytes pushed into the executor's stderr subject are observed through the
 * observable returned by {@code JobManager.stderrUpdates}.
 */
@Test
public void testGetStderrUpdatesEchoesUpdatesFromExecutorObservers()
        throws InterruptedException, ExecutionException, TimeoutException {
    final CancelablePromise<JobExecutionResult> executorPromise = new SimpleCancelablePromise<>();
    final Subject<byte[]> stderrSubject = PublishSubject.create();
    // stdout gets a single random payload; stderr is driven by the subject.
    final JobExecutor executor = MockJobExecutor.thatUses(
            executorPromise,
            Observable.just(TestHelpers.generateRandomBytes()),
            stderrSubject);
    final JobManager manager = createManagerWith(executor);

    final Pair<JobId, CancelablePromise<FinalizedJob>> submission = manager.submit(STANDARD_VALID_REQUEST);

    final Observable<byte[]> updates = manager.stderrUpdates(submission.getLeft()).get();
    final AtomicReference<byte[]> lastSeen = new AtomicReference<>();
    updates.subscribe(lastSeen::set);

    final byte[] expected = TestHelpers.generateRandomBytes();
    stderrSubject.onNext(expected);

    // Finish the job and wait for the manager to finalize it before asserting.
    executorPromise.complete(new JobExecutionResult(FINISHED));
    submission.getRight().get(DEFAULT_TIMEOUT, MILLISECONDS);

    assertThat(lastSeen.get()).isEqualTo(expected);
}
/**
 * Demo: two interval sources (2 s x 3 items and 1 s x 2 items) are both subscribed to the
 * same PublishSubject, and a single logging observer is attached to that subject.
 * The method only wires things up and returns; it does not block.
 */
private static void demo2() {
    final Subject<Long> hub = PublishSubject.create();

    // First origin: three ticks, two seconds apart.
    Observable.interval(2, TimeUnit.SECONDS)
            .take(3)
            .doOnComplete(() -> log("Origin-One-doOnComplete"))
            .subscribe(hub);

    // Second origin: two ticks, one second apart, feeding the same subject.
    Observable.interval(1, TimeUnit.SECONDS)
            .take(2)
            .doOnComplete(() -> log("Origin-Two-doOnComplete"))
            .subscribe(hub);

    hub.doOnComplete(() -> log("First-doOnComplete"))
            .subscribe(value -> log(value));
}
/**
 * Demo: a 2-second interval (five items) feeds a PublishSubject; one observer is attached
 * immediately and a second one after a 4.1 s pause, each logging its own subscribe,
 * value and completion events.
 *
 * @throws InterruptedException if the pause between the two subscriptions is interrupted
 */
private static void demo1() throws InterruptedException {
    final Subject<Long> hub = PublishSubject.create();

    Observable.interval(2, TimeUnit.SECONDS)
            .take(5)
            .doOnSubscribe(handle -> log("Original-doOnSubscribe"))
            .doOnComplete(() -> log("Original-doOnComplete"))
            .subscribe(hub);

    // First observer joins right away.
    hub.doOnSubscribe(handle -> log("First-doOnSubscribe"))
            .doOnComplete(() -> log("First-doOnComplete"))
            .subscribe(value -> log("First: " + value));

    Thread.sleep(4100);

    // Second observer joins after the pause.
    hub.doOnSubscribe(handle -> log("Second-doOnSubscribe"))
            .doOnComplete(() -> log("Second-doOnComplete"))
            .subscribe(value -> log("Second: " + value));
}
private Observable<JsonElement> revokeToken(AuthToken token, String clientSecret) { // this complexity exists because the access token must be revoked AFTER the refresh token // why? because the access token is needed for both revocations! Subject<JsonElement> responses = PublishSubject.create(); RevokeReqBody refreshReqBody = RevokeReqBody.fromRefreshToken( token.getRefreshToken(), clientSecret); revokeSingleToken(token.getAuthHeader(), refreshReqBody, responses) .doOnComplete(() -> { RevokeReqBody accessReqBody = RevokeReqBody.fromAccessToken( token.getAccessToken(), clientSecret); revokeSingleToken(token.getAuthHeader(), accessReqBody, responses) .subscribe(); }) .subscribe(); return responses; }
@Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); viewModel = new MainViewModel(this); final ActivityMainBinding binding = DataBindingUtil.setContentView(this, R.layout.activity_main); // You can bind trigger observable instead of using "rxCommandOnClick" on layout xml. final Button goToTodoButton = binding.buttonGoToTodo; final Subject<NoParameter> emitter = PublishSubject.create(); goToTodoButton.setOnClickListener(view -> emitter.onNext(NoParameter.INSTANCE)); viewModel.goToTodoCommand.bindTrigger(emitter); viewModel.goToTodoCommand.setCancellable(() -> goToTodoButton.setOnClickListener(null)); binding.setViewModel(viewModel); }
@Test public void emitValueWhenBoundTriggerEmitsValue() { // given Subject<NoParameter> trigger = PublishSubject.create(); RxCommand<NoParameter> command = new RxCommand<NoParameter>() .bindTrigger(trigger); TestObserver<NoParameter> testObserver = command.test(); // when trigger.onNext(NoParameter.INSTANCE); // then testObserver.assertSubscribed() .assertValue(NoParameter.INSTANCE) .assertNoErrors() .assertNotComplete() .dispose(); // after command.dispose(); }
@Test public void throwsErrorWhenBoundTriggerEmitsError() { // given Subject<NoParameter> trigger = PublishSubject.create(); RxCommand<NoParameter> command = new RxCommand<NoParameter>() .bindTrigger(trigger); TestObserver<NoParameter> testObserver = command.test(); // when trigger.onError(new RuntimeException("Error in the trigger observable")); // then testObserver.assertFailureAndMessage( RuntimeException.class, "Error in the trigger observable") .dispose(); // after command.dispose(); }
@Test public void emitsOnCompleteWhenBoundTriggerIsCompleted() { // given Subject<NoParameter> trigger = PublishSubject.create(); RxCommand<NoParameter> command = new RxCommand<NoParameter>() .bindTrigger(trigger); TestObserver<NoParameter> testObserver = command.test(); // when trigger.onNext(NoParameter.INSTANCE); trigger.onComplete(); // then testObserver.assertResult(NoParameter.INSTANCE) .dispose(); // after command.dispose(); }
@Test public void triggerBindingCanExecuteMoreThanOnce() { // given Subject<NoParameter> firstTrigger = PublishSubject.create(); Subject<NoParameter> secondTrigger = PublishSubject.create(); RxCommand<NoParameter> command = new RxCommand<>(); TestObserver<NoParameter> testObserver = command.test(); // when command.bindTrigger(firstTrigger); firstTrigger.onNext(NoParameter.INSTANCE); command.bindTrigger(secondTrigger); firstTrigger.onNext(NoParameter.INSTANCE); secondTrigger.onNext(NoParameter.INSTANCE); // then testObserver.assertSubscribed() .assertValues(NoParameter.INSTANCE, NoParameter.INSTANCE) .assertNoErrors() .assertNotComplete() .dispose(); // after command.dispose(); }
@Test public void followsSourceObservable() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source); TestObserver<String> testObserver = property.test(); // when source.onNext("First"); source.onNext("Second"); // then testObserver.assertSubscribed() .assertValues("First", "Second") .assertNoErrors() .assertNotComplete() .dispose(); }
@Test public void doesNotEmitSameValuesWhenSourceObservableEmitsSameValuesWithDistinctUntilChangeMode() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source); TestObserver<String> testObserver = property.test(); // when source.onNext("RxProperty"); source.onNext("RxProperty"); source.onNext("RxProperty"); // then testObserver.assertSubscribed() .assertValue("RxProperty") .assertNoErrors() .assertNotComplete() .dispose(); }
@Test public void emitsAllValuesWhenSourceObservableEmitsSameValuesWithoutDistinctUntilChangeMode() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source, EnumSet.of(RxProperty.Mode.NONE)); TestObserver<String> testObserver = property.test(); // when source.onNext("RxProperty"); source.onNext("RxProperty"); source.onNext("RxProperty"); // then testObserver.assertSubscribed() .assertValues("RxProperty", "RxProperty", "RxProperty") .assertNoErrors() .assertNotComplete() .dispose(); }
@Test public void followsSourceObservable() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source); TestObserver<String> testObserver = propertyObserver(property); // when source.onNext("First"); source.onNext("Second"); // then testObserver.assertSubscribed() .assertValues("First", "Second") .assertNoErrors() .assertNotComplete() .dispose(); }
@Test public void distinctUntilChangeWhenSourceObservableEmitsSameValuesWithDistinctUntilChangeMode() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source); TestObserver<String> testObserver = propertyObserver(property); // when source.onNext("RxProperty"); source.onNext("RxProperty"); source.onNext("RxProperty"); // then testObserver.assertSubscribed() .assertValue("RxProperty") .assertNoErrors() .assertNotComplete() .dispose(); }
@Test public void notifyAllValuesWhenSourceObservableEmitsSameValuesWithoutDistinctUntilChangeMode() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source, EnumSet.of(RxProperty.Mode.NONE)); TestObserver<String> testObserver = propertyObserver(property); // when source.onNext("RxProperty"); source.onNext("RxProperty"); source.onNext("RxProperty"); // then testObserver.assertSubscribed() .assertValues("RxProperty", "RxProperty", "RxProperty") .assertNoErrors() .assertNotComplete() .dispose(); }
@Test public void noLongerNotifyWhenSourceObservableEmitsError() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source); TestObserver<String> testObserver = propertyObserver(property); // when source.onNext("First"); source.onError(new RuntimeException("Error in source observable")); source.onNext("Second"); // then testObserver.assertSubscribed() .assertValue("First") .assertNoErrors() .assertNotComplete() .dispose(); }
@Test public void noLongerNotifyWhenSourceObservableIsCompleted() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source); TestObserver<String> testObserver = propertyObserver(property); // when source.onNext("First"); source.onComplete(); source.onNext("Second"); // then testObserver.assertSubscribed() .assertValue("First") .assertNoErrors() .assertNotComplete() .dispose(); }
@Test public void noLongerNotifyWhenSourceObservableEmitsValueAfterDisposed() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source); TestObserver<String> testObserver = propertyObserver(property); // when source.onNext("First"); property.dispose(); source.onNext("Second"); // then testObserver.assertSubscribed() .assertValue("First") .assertNoErrors() .assertNotComplete() .dispose(); }
@Test public void followsSourceObservable() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source); TestObserver<String> testObserver = valueFieldObserver(property); // when source.onNext("First"); source.onNext("Second"); // then testObserver.assertSubscribed() .assertValues("First", "Second") .assertNoErrors() .assertNotComplete() .dispose(); }
@Test public void distinctUntilChangeWhenSourceObservableEmitsSameValuesWithDistinctUntilChangeMode() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source); TestObserver<String> testObserver = valueFieldObserver(property); // when source.onNext("RxProperty"); source.onNext("RxProperty"); source.onNext("RxProperty"); // then testObserver.assertSubscribed() .assertValue("RxProperty") .assertNoErrors() .assertNotComplete() .dispose(); }
@Test public void notifyAllValuesWhenSourceObservableEmitsSameValuesWithoutDistinctUntilChangeMode() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source, EnumSet.of(RxProperty.Mode.NONE)); TestObserver<String> testObserver = valueFieldObserver(property); // when source.onNext("RxProperty"); source.onNext("RxProperty"); source.onNext("RxProperty"); // then testObserver.assertSubscribed() .assertValues("RxProperty", "RxProperty", "RxProperty") .assertNoErrors() .assertNotComplete() .dispose(); }
@Test public void noLongerNotifyWhenSourceObservableEmitsError() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source); TestObserver<String> testObserver = valueFieldObserver(property); // when source.onNext("First"); source.onError(new RuntimeException("Error in source observable")); source.onNext("Second"); // then testObserver.assertSubscribed() .assertValue("First") .assertNoErrors() .assertNotComplete() .dispose(); }
@Test public void noLongerNotifyWhenSourceObservableIsCompleted() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source); TestObserver<String> testObserver = valueFieldObserver(property); // when source.onNext("First"); source.onComplete(); source.onNext("Second"); // then testObserver.assertSubscribed() .assertValue("First") .assertNoErrors() .assertNotComplete() .dispose(); }
@Test public void noLongerNotifyWhenSourceObservableEmitsValueAfterDisposed() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source); TestObserver<String> testObserver = valueFieldObserver(property); // when source.onNext("First"); property.dispose(); source.onNext("Second"); // then testObserver.assertSubscribed() .assertValue("First") .assertNoErrors() .assertNotComplete() .dispose(); }
@Test public void errorObservablesEmitOnCompleteWhenSourceObservableIsCompleted() { // given Subject<String> source = PublishSubject.create(); property = new RxProperty<>(source); property.setValidator(new AllSuccessValidator()); testObserver = new RxPropertyErrorObserver<>(property); // when source.onComplete(); // then testObserver.assertNoErrors() .assertNoSummarizedErrors() .assertNoHasErrors() .assertComplete() .dispose(); }
/**
 * Turns this query into an Observable. Each subscription will trigger the underlying
 * database operation.
 * <p/>
 * This is a low-level API to directly work with the JDBC ResultSet: the same
 * {@code ResultSetWithColumns} instance is emitted once per row, after the underlying
 * result set has been advanced to that row. Iteration stops early when the subscriber
 * disposes. The result set is closed when iteration ends, including when it ends with an
 * exception; a {@code SQLException} is delivered through {@code onError}.
 *
 * @return the Observable
 */
public Observable<ResultSetWithColumns> rxResultSet() {
    // Observable.create is the method the original Subject.create call resolved to.
    return Observable.create(emitter -> {
        try {
            ResultSetWithColumns resultSetWithColumns = createResultSetWithColumns();
            try {
                checkConformity(resultSetWithColumns.getConverters());
                while (resultSetWithColumns.getResultSet().next() && !emitter.isDisposed()) {
                    emitter.onNext(resultSetWithColumns);
                }
            } finally {
                // Close on every exit path so a failure mid-iteration does not leak the cursor.
                resultSetWithColumns.getResultSet().close();
            }
            emitter.onComplete();
        } catch (SQLException e) {
            emitter.onError(e);
        }
    });
}
/**
 * PublishSubject demo: three values are pushed and the subject is completed BEFORE the
 * length-mapping subscriber is attached.
 *
 * <p>NOTE(review): presumably this illustrates that a late subscriber to a PublishSubject
 * receives none of the earlier values; confirm against the RxJava documentation.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final Subject<String> words = PublishSubject.create();

    for (String word : new String[] {"Alpha", "Beta", "Gamma"}) {
        words.onNext(word);
    }
    words.onComplete();

    // The subscription happens only after every emission above.
    final Observable<Integer> lengths = words.map(String::length);
    lengths.subscribe(System.out::println);
}
/**
 * UnicastSubject demo: a 300 ms interval feeds the subject for two seconds before a single
 * observer is attached, and the program then keeps running for two more seconds.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final Subject<String> relay = UnicastSubject.create();

    // Each tick is rendered as the elapsed time it represents.
    Observable.interval(300, TimeUnit.MILLISECONDS)
            .map(tick -> ((tick + 1) * 300) + " milliseconds")
            .subscribe(relay);
    sleep(2000);

    relay.subscribe(text -> System.out.println("Observer 1: " + text));
    sleep(2000);
}