/**
 * Builds a {@link ReplaySubject} that is fed by a source creating one {@link Faker}.
 *
 * <p>The source is subscribed on {@code Schedulers.io()} and observed on the Android main
 * thread. Unless the emitter has already been disposed, it emits the new {@code Faker} and
 * then completes.
 *
 * @return the subject subscribed to the Faker-producing source
 */
private static ReplaySubject<Faker> createFaker() {
    final ObservableOnSubscribe<Faker> producer = new ObservableOnSubscribe<Faker>() {
        @Override
        public void subscribe(ObservableEmitter<Faker> emitter) throws Exception {
            final Faker created = new Faker();
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onNext(created);
            emitter.onComplete();
        }
    };

    final ReplaySubject<Faker> replay = ReplaySubject.create();
    Observable.create(producer)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(replay);
    return replay;
}
private void doSomeWork() { ReplaySubject<Integer> source = ReplaySubject.create(); source.subscribe(getFirstObserver()); // it will get 1, 2, 3, 4 source.onNext(1); source.onNext(2); source.onNext(3); source.onNext(4); source.onComplete(); /* * it will emit 1, 2, 3, 4 for second observer also as we have used replay */ source.subscribe(getSecondObserver()); }
/** * ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。 * 也有其它版本的ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable发射的)。 * * 如果你把ReplaySubject当作一个观察者使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法), * 这可能导致同时(非顺序)调用,这会违反Observable协议, 给Subject的结果增加了不确定性。 */ private void doSomeWork() { ReplaySubject<Integer> source = ReplaySubject.create(); source.subscribe(getFirstObserver()); // it will get 1, 2, 3, 4 source.onNext(1); source.onNext(2); source.onNext(3); source.onNext(4); source.onComplete(); /* * it will emit 1, 2, 3, 4 for second observer also as we have used replay */ source.subscribe(getSecondObserver()); }
@Test public void testUnsubscribeBeforeEmit() { TestObserver<String> observer = new TestObserver<>(); ReplaySubject<String> subject = ReplaySubject.create(); SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject); proxy.subscribe(observer); proxy.dispose(); observer.assertNotComplete(); observer.assertNoValues(); subject.onNext("Avanti!"); subject.onComplete(); // disposable observables may not be resused in RxJava2 observer = new TestObserver<>(); proxy.subscribe(observer); observer.assertComplete(); observer.assertValue("Avanti!"); }
@Test public void shouldCacheResultsWhileUnsubscribedAndDeliverAfterResubscription() { TestObserver<String> observer = new TestObserver<>(); ReplaySubject<String> subject = ReplaySubject.create(); SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject); proxy.subscribe(observer); proxy.dispose(); observer.assertNoValues(); subject.onNext("Avanti!"); subject.onComplete(); // disposable observables may not be resused in RxJava2 observer = new TestObserver<>(); proxy.subscribe(observer); observer.awaitTerminalEvent(3, TimeUnit.SECONDS); observer.assertValue("Avanti!"); }
@Test public void shouldRedeliverSameResultsToDifferentSubscriber() { // Use case: When rotating an activity, ObservableManager will re-subscribe original request's // Observable to a new Observer, which is a member of the new activity instance. In this // case, we may want to redeliver any previous results (if the request is still being // managed by ObservableManager). TestObserver<String> observer = new TestObserver<>(); ReplaySubject<String> subject = ReplaySubject.create(); SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject); proxy.subscribe(observer); subject.onNext("Avanti!"); subject.onComplete(); proxy.dispose(); TestObserver<String> newSubscriber = new TestObserver<>(); proxy.subscribe(newSubscriber); newSubscriber.awaitTerminalEvent(3, TimeUnit.SECONDS); newSubscriber.assertComplete(); newSubscriber.assertValue("Avanti!"); observer.assertComplete(); observer.assertValue("Avanti!"); }
/**
 * Prints the items received by two observers of one {@link ReplaySubject}: one subscribed
 * before the emissions and one subscribed after the subject has completed.
 */
public static void main(String[] args) {
    final Subject<String> replay = ReplaySubject.create();

    replay.subscribe(item -> System.out.println("Observer 1: " + item));

    for (String item : new String[] {"Alpha", "Beta", "Gamma"}) {
        replay.onNext(item);
    }
    replay.onComplete();

    // Subscribes after completion; the replayed items are printed for it as well.
    replay.subscribe(item -> System.out.println("Observer 2: " + item));
}
/**
 * Returns the shared {@code rxFaker} subject, creating it with {@code createFaker()} the first
 * time it is requested.
 *
 * <p>The lazy initialization is not synchronized.
 *
 * @return the shared subject
 */
public static ReplaySubject<Faker> getInstance() {
    if (rxFaker != null) {
        return rxFaker;
    }
    rxFaker = createFaker();
    return rxFaker;
}
/**
 * Bytes emitted on the executor's stdout observable must reach the stdout listener that was
 * passed to {@code JobManager.submit}.
 */
@Test
public void testSubmitJobEventListenersEchoStdoutWhenExecutorEchoesStdout() throws InterruptedException {
    final byte[] expectedStdoutBytes = generateRandomBytes();
    final Subject<byte[]> stdoutSubject = ReplaySubject.create();
    stdoutSubject.onNext(expectedStdoutBytes);

    final JobExecutor jobExecutor = MockJobExecutor.thatUses(stdoutSubject, Observable.never());
    final JobManager jobManager = createManagerWith(jobExecutor);

    // Starts with no permits; the listener releases one when it is called.
    final Semaphore received = new Semaphore(0);

    final Observer<byte[]> stdoutObserver = new Observer<byte[]>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {}

        @Override
        public void onNext(@NonNull byte[] bytes) {
            assertThat(bytes).isEqualTo(expectedStdoutBytes);
            received.release();
        }

        @Override
        public void onError(@NonNull Throwable throwable) {
            fail("Error from observable");
            received.release();
        }

        @Override
        public void onComplete() {}
    };
    final JobEventListeners listeners = JobEventListeners.createStdoutListener(stdoutObserver);

    jobManager.submit(STANDARD_VALID_REQUEST, listeners);

    final boolean gotBytes = received.tryAcquire(1, SECONDS);
    if (!gotBytes) {
        fail("Timed out before any bytes received");
    }
}
/**
 * Bytes emitted on the executor's stderr observable must reach the stderr listener that was
 * passed to {@code JobManager.submit}.
 */
@Test
public void testSubmitJobEventListenersEchoStderrWhenExecutorEchoesStderr() throws InterruptedException {
    final byte[] stderrBytes = generateRandomBytes();
    final Subject<byte[]> stderr = ReplaySubject.create();
    stderr.onNext(stderrBytes);

    final JobExecutor jobExecutor = MockJobExecutor.thatUses(Observable.never(), stderr);
    final JobManager jobManager = createManagerWith(jobExecutor);

    // Starts with no permits; the listener releases one when it is called.
    final Semaphore received = new Semaphore(0);

    final Observer<byte[]> stderrObserver = new Observer<byte[]>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {}

        @Override
        public void onNext(@NonNull byte[] bytes) {
            assertThat(bytes).isEqualTo(stderrBytes);
            received.release();
        }

        @Override
        public void onError(@NonNull Throwable throwable) {
            fail("Error from observable");
            received.release();
        }

        @Override
        public void onComplete() {}
    };
    final JobEventListeners listeners = JobEventListeners.createStderrListener(stderrObserver);

    jobManager.submit(STANDARD_VALID_REQUEST, listeners);

    final boolean gotBytes = received.tryAcquire(1, SECONDS);
    if (!gotBytes) {
        fail("Timed out before any bytes received");
    }
}
/**
 * After a dispose and a resubscription, the new observer must receive both the earlier value
 * and values emitted afterwards.
 */
@Test
public void shouldKeepDeliveringEventsAfterResubscribed() {
    final ReplaySubject<String> source = ReplaySubject.create();
    final SubscriptionProxy<String> proxy = SubscriptionProxy.create(source);

    final TestObserver<String> firstObserver = new TestObserver<>();
    proxy.subscribe(firstObserver);
    source.onNext("Avanti 1");
    proxy.dispose();

    final TestObserver<String> secondObserver = new TestObserver<>();
    proxy.subscribe(secondObserver);
    source.onNext("Avanti!");

    secondObserver.assertValues("Avanti 1", "Avanti!");
}
/**
 * Subscribes to {@code upstream} immediately, records its materialized events in a
 * {@link ReplaySubject}, and exposes them downstream only while {@code view} emits
 * {@code true}.
 *
 * <p>Each {@code true} from {@code view} switches to the replaying subject; each {@code false}
 * switches to an empty Observable. Disposing the returned chain disposes the upstream
 * subscription.
 *
 * @param upstream the source whose events are recorded
 * @return the gated, dematerialized sequence
 */
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
    final ReplaySubject<Notification<T>> replay = ReplaySubject.create();

    // Forwards every materialized upstream event into the replay buffer.
    final DisposableObserver<Notification<T>> forwarder = new DisposableObserver<Notification<T>>() {
        @Override
        public void onNext(Notification<T> value) {
            replay.onNext(value);
        }

        @Override
        public void onError(Throwable e) {
            replay.onError(e);
        }

        @Override
        public void onComplete() {
            replay.onComplete();
        }
    };
    upstream.materialize().subscribeWith(forwarder);

    final Function<Boolean, Observable<Notification<T>>> gate =
            new Function<Boolean, Observable<Notification<T>>>() {
                @Override
                public Observable<Notification<T>> apply(final Boolean flag) {
                    if (!flag) {
                        return Observable.empty();
                    }
                    return replay;
                }
            };

    final Action stopForwarding = new Action() {
        @Override
        public void run() throws Exception {
            forwarder.dispose();
        }
    };

    return view
            .switchMap(gate)
            .doOnDispose(stopForwarding)
            .dematerialize();
}
/**
 * Wires up the mocked process and session used by the tests: stubs {@code rxProcess.open()},
 * replaces the session's streams with in-memory mocks, and stubs the session's
 * {@code waitFor()}, {@code isAlive()} and {@code destroy()}.
 */
@Before
public void setup() throws Exception {
    super.setup();
    // Replay the mocked session so each open() can take it as the first item.
    sessionPub = ReplaySubject.create();
    sessionPub.onNext(rxProcessSession);
    when(rxProcess.open()).thenAnswer(invocation -> {
        // waitFor() is stubbed on every open(); its emitter is captured in waitForEmitter so
        // the stubs below can signal the exit code later.
        when(rxProcessSession.waitFor()).thenReturn(Single.create(e -> waitForEmitter = e));
        return sessionPub.firstOrError();
    });
    cmdStream = new MockOutputStream(new MockOutputStream.Listener() {
        @Override
        public void onNewLine(String line) {
            // An "exit" line closes the command stream and signals waitFor() with 0, even if
            // closing fails.
            if (line.equals("exit" + LineReader.getLineSeparator())) {
                try {
                    cmdStream.close();
                } catch (IOException e) {
                    Timber.e(e);
                } finally {
                    waitForEmitter.onSuccess(0);
                }
            }
        }

        @Override
        public void onClose() {
        }
    });
    outputStream = new MockInputStream();
    errorStream = new MockInputStream();
    when(rxProcessSession.input()).thenReturn(cmdStream);
    when(rxProcessSession.output()).thenReturn(outputStream);
    when(rxProcessSession.error()).thenReturn(errorStream);
    // isAlive() reports whether the command stream is still open at subscription time.
    when(rxProcessSession.isAlive()).thenReturn(Single.create(e -> e.onSuccess(cmdStream.isOpen())));
    // destroy() closes the command stream, signals waitFor() with 1, then completes.
    when(rxProcessSession.destroy()).then(invocation -> Completable.create(e -> {
        cmdStream.close();
        waitForEmitter.onSuccess(1);
        e.onComplete();
    }));
}
public static void main(String[] args) { // TODO Auto-generated method stub Observer<Long> observer=new Observer<Long>() { @Override public void onComplete() { // TODO Auto-generated method stub System.out.println("It's Done"); } @Override public void onError(Throwable throwable) { // TODO Auto-generated method stub throwable.printStackTrace(); } @Override public void onNext(Long value) { // TODO Auto-generated method stub System.out.println(":"+value); } @Override public void onSubscribe(Disposable disposable) { // TODO Auto-generated method stub System.out.println("onSubscribe"); } }; ReplaySubject<Long> replaySubject=ReplaySubject.create(); replaySubject.onNext(1l); replaySubject.onNext(2l); replaySubject.subscribe(observer); replaySubject.onNext(10l); replaySubject.onComplete(); }
/**
 * Initializes {@code subject} with a new {@link ReplaySubject}.
 */
@Override
protected final void initObservable() {
    subject = ReplaySubject.create();
    // NOTE(review): observeOn/subscribeOn return new Observables instead of modifying the
    // receiver, and the result of this chain is neither stored nor returned here. As written,
    // the schedulers looked up from observeThread/subscribeThread appear to have no effect on
    // `subject` - confirm whether the result was meant to be kept.
    subject.observeOn(EventThread.getScheduler(observeThread))
            .subscribeOn(EventThread.getScheduler(subscribeThread));
}
/**
 * Creates a serialized {@link ReplaySubject}, passes it to {@code readStreamAsync} together
 * with {@code stream}, and hands {@code observer} the subject's {@code String} items, observed
 * on {@code Schedulers.io()}.
 *
 * @param observer callback that receives the observable of captured strings
 * @param stream   the stream passed on to {@code readStreamAsync}
 */
private void captureOutput(Consumer<Observable<String>> observer, InputStream stream) {
    final Subject<Object> sink = ReplaySubject.create().toSerialized();
    readStreamAsync(sink, stream);

    // Only String items are exposed; anything else in the subject is filtered out.
    final Observable<String> lines = sink.ofType(String.class).observeOn(Schedulers.io());
    observer.accept(lines);
}
/**
 * Replaces {@code mSubject} with a newly created {@link ReplaySubject} and returns it.
 *
 * <p>Every call creates a new subject; the previously stored one is no longer referenced by
 * {@code mSubject}.
 *
 * @return the newly created subject
 */
public Subject<String> getSubject() {
    final ReplaySubject<String> fresh = ReplaySubject.create();
    mSubject = fresh;
    return fresh;
}
/**
 * Creates a proxy around a new {@link ReplaySubject}, using {@code Roxy.TePolicy.PASS}.
 *
 * @return the new proxy
 */
public static RxJava2SubjProxy replaySubjectProxy() {
    final ReplaySubject<Object> backing = ReplaySubject.create();
    return new RxJava2SubjProxy(backing, Roxy.TePolicy.PASS);
}
/**
 * Creates a proxy around the serialized form of a new {@link ReplaySubject}, using
 * {@code Roxy.TePolicy.PASS}.
 *
 * @return the new proxy
 */
public static RxJava2SubjProxy serializedReplaySubjectProxy() {
    final ReplaySubject<Object> backing = ReplaySubject.create();
    return new RxJava2SubjProxy(backing.toSerialized(), Roxy.TePolicy.PASS);
}
/**
 * Creates a proxy around a new {@link ReplaySubject}, using {@code Roxy.TePolicy.WRAP}.
 *
 * @return the new proxy
 */
public static RxJava2SubjProxy safeReplaySubjectProxy() {
    final ReplaySubject<Object> backing = ReplaySubject.create();
    return new RxJava2SubjProxy(backing, Roxy.TePolicy.WRAP);
}
/**
 * Creates a proxy around the serialized form of a new {@link ReplaySubject}, using
 * {@code Roxy.TePolicy.WRAP}.
 *
 * @return the new proxy
 */
public static RxJava2SubjProxy safeSerializedReplaySubjectProxy() {
    final ReplaySubject<Object> backing = ReplaySubject.create();
    return new RxJava2SubjProxy(backing.toSerialized(), Roxy.TePolicy.WRAP);
}
@Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); Log.d(GitHubOAuth.TAG, "OAuthActivity: onCreate " + "savedInstanceState = " + savedInstanceState + ", getIntent() = " + getIntent()); setContentView(R.layout.progress_dialog); Icepick.restoreInstanceState(this, savedInstanceState); if (mGitHubOAuth == null) { mGitHubOAuth = getIntent().getParcelableExtra(ARG_KEY_AUTH); } // init reference if (sOAuthResultSubject == null || sOAuthResultSubject.get() == null) { mOAuthResultSubject = ReplaySubject.create(); sOAuthResultSubject = new WeakReference<>(mOAuthResultSubject); } else { mOAuthResultSubject = sOAuthResultSubject.get(); } if (isBrowserIntent(getIntent())) { Log.d(GitHubOAuth.TAG, "OAuthActivity: Got browser intent in new created instance."); Pair<OAuthResult, String> result = getOAuthResult(getIntent()); mOAuthResultSubject.onNext(result); finish(); return; } else if (mGitHubOAuth == null) { authFail(GitHubOAuth.ERROR_UNKNOWN_ERROR, "Invalid launch intent"); return; } mOAuthPresenter = new OAuthPresenter(mGitHubOAuth); mOAuthPresenter.attach(this); Log.d(GitHubOAuth.TAG, "OAuthActivity: onCreate mState = " + mState); switch (mState) { case STATE_SEND_REQ: // recreated after send request, check `sOAuthResultSubject` mState = STATE_WAIT_CODE; mOAuthPresenter.waitCode(mOAuthResultSubject); break; case STATE_CALL_API: // recreated after got code, because code can only be used once, so we fail authFail(GitHubOAuth.ERROR_UNKNOWN_ERROR, "Activity killed when call api"); break; case STATE_NOT_REQ: handleLaunchIntent(); break; default: // we may got killed at STATE_WAIT_CODE, it's too complicated to handle, just fail authFail(GitHubOAuth.ERROR_UNKNOWN_ERROR, "un-handled state " + mState); break; } }
public static <T> Flowable<T> create(final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch, final long start, final int maxConcurrency) { return Flowable.defer(new Callable<Flowable<T>>() { @Override public Flowable<T> call() throws Exception { // need a ReplaySubject because multiple requests can come // through before concatEager has established subscriptions to // the subject final ReplaySubject<Flowable<T>> subject = ReplaySubject.create(); final AtomicLong position = new AtomicLong(start); LongConsumer request = new LongConsumer() { @Override public void accept(final long n) throws Exception { final long pos = position.getAndAdd(n); if (SubscriptionHelper.validate(n)) { Flowable<T> flowable; try { flowable = fetch.apply(pos, n); } catch (Throwable e) { Exceptions.throwIfFatal(e); subject.onError(e); return; } // reduce allocations by incorporating the onNext // and onComplete actions into the mutable count // object final Count count = new Count(subject, n); flowable = flowable // .doOnNext(count) // .doOnComplete(count); subject.onNext(flowable); } } }; return Flowable // .concatEager(subject.serialize() // .toFlowable(BackpressureStrategy.BUFFER), maxConcurrency, 128) // .doOnRequest(request); } }); }
/**
 * Composes {@code upstream} with a {@code DelayObservableTransformer} built from
 * {@code pauseLifecycle} and a new {@link ReplaySubject} created with {@code create()}.
 *
 * @param upstream the source to transform
 * @return the composed source
 */
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
    final ReplaySubject<T> buffer = ReplaySubject.create();
    return upstream.compose(new DelayObservableTransformer<>(pauseLifecycle, buffer));
}
/**
 * Composes {@code upstream} with a {@code DelayObservableTransformer} built from
 * {@code pauseLifecycle} and a new {@link ReplaySubject} created with
 * {@code createWithSize(1)}.
 *
 * @param upstream the source to transform
 * @return the composed source
 */
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
    final ReplaySubject<T> latestOnly = ReplaySubject.createWithSize(1);
    return upstream.compose(new DelayObservableTransformer<>(pauseLifecycle, latestOnly));
}
/**
 * Feeds a one-second interval (as strings) into a {@link ReplaySubject}, waits 3100 ms, and
 * only then subscribes a logging observer to the subject.
 *
 * @throws InterruptedException if the sleep is interrupted
 */
private static void demo4() throws InterruptedException {
    final Subject<String> replay = ReplaySubject.create();

    final Observable<String> ticks = Observable.interval(0, 1, TimeUnit.SECONDS)
            .map(Objects::toString);
    ticks.subscribe(replay);

    Thread.sleep(3100);

    // The late subscriber first gets whatever the subject recorded during the sleep.
    replay.subscribe(v -> log(v));
}