Java 类io.reactivex.subjects.ReplaySubject 实例源码
项目:showroom-android
文件:RxFaker.java
private static ReplaySubject<Faker> createFaker() {
final ReplaySubject<Faker> subject = ReplaySubject.create();
Observable.create(new ObservableOnSubscribe<Faker>() {
@Override
public void subscribe(ObservableEmitter<Faker> e) throws Exception {
final Faker faker = new Faker();
if (!e.isDisposed()) {
e.onNext(faker);
e.onComplete();
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subject);
return subject;
}
项目:GitHub
文件:ReplaySubjectExampleActivity.java
private void doSomeWork() {
ReplaySubject<Integer> source = ReplaySubject.create();
source.subscribe(getFirstObserver()); // it will get 1, 2, 3, 4
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
/*
* it will emit 1, 2, 3, 4 for second observer also as we have used replay
*/
source.subscribe(getSecondObserver());
}
项目:RxJava2-Android-Sample
文件:ReplaySubjectExampleActivity.java
/**
* ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。
* 也有其它版本的ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable发射的)。
*
* 如果你把ReplaySubject当作一个观察者使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),
* 这可能导致同时(非顺序)调用,这会违反Observable协议, 给Subject的结果增加了不确定性。
*/
private void doSomeWork() {
ReplaySubject<Integer> source = ReplaySubject.create();
source.subscribe(getFirstObserver()); // it will get 1, 2, 3, 4
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
/*
* it will emit 1, 2, 3, 4 for second observer also as we have used replay
*/
source.subscribe(getSecondObserver());
}
项目:RxJava2-Android-Samples
文件:ReplaySubjectExampleActivity.java
private void doSomeWork() {
ReplaySubject<Integer> source = ReplaySubject.create();
source.subscribe(getFirstObserver()); // it will get 1, 2, 3, 4
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
/*
* it will emit 1, 2, 3, 4 for second observer also as we have used replay
*/
source.subscribe(getSecondObserver());
}
项目:RxGroups
文件:SubscriptionProxyTest.java
@Test public void testUnsubscribeBeforeEmit() {
TestObserver<String> observer = new TestObserver<>();
ReplaySubject<String> subject = ReplaySubject.create();
SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);
proxy.subscribe(observer);
proxy.dispose();
observer.assertNotComplete();
observer.assertNoValues();
subject.onNext("Avanti!");
subject.onComplete();
// disposable observables may not be resused in RxJava2
observer = new TestObserver<>();
proxy.subscribe(observer);
observer.assertComplete();
observer.assertValue("Avanti!");
}
项目:RxGroups
文件:SubscriptionProxyTest.java
@Test public void shouldCacheResultsWhileUnsubscribedAndDeliverAfterResubscription() {
TestObserver<String> observer = new TestObserver<>();
ReplaySubject<String> subject = ReplaySubject.create();
SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);
proxy.subscribe(observer);
proxy.dispose();
observer.assertNoValues();
subject.onNext("Avanti!");
subject.onComplete();
// disposable observables may not be resused in RxJava2
observer = new TestObserver<>();
proxy.subscribe(observer);
observer.awaitTerminalEvent(3, TimeUnit.SECONDS);
observer.assertValue("Avanti!");
}
项目:RxGroups
文件:SubscriptionProxyTest.java
@Test public void shouldRedeliverSameResultsToDifferentSubscriber() {
// Use case: When rotating an activity, ObservableManager will re-subscribe original request's
// Observable to a new Observer, which is a member of the new activity instance. In this
// case, we may want to redeliver any previous results (if the request is still being
// managed by ObservableManager).
TestObserver<String> observer = new TestObserver<>();
ReplaySubject<String> subject = ReplaySubject.create();
SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);
proxy.subscribe(observer);
subject.onNext("Avanti!");
subject.onComplete();
proxy.dispose();
TestObserver<String> newSubscriber = new TestObserver<>();
proxy.subscribe(newSubscriber);
newSubscriber.awaitTerminalEvent(3, TimeUnit.SECONDS);
newSubscriber.assertComplete();
newSubscriber.assertValue("Avanti!");
observer.assertComplete();
observer.assertValue("Avanti!");
}
项目:Learning-RxJava
文件:Ch5_24.java
public static void main(String[] args) {
Subject<String> subject =
ReplaySubject.create();
subject.subscribe(s -> System.out.println("Observer 1: " +
s));
subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
subject.onComplete();
subject.subscribe(s -> System.out.println("Observer 2: " +
s));
}
项目:showroom-android
文件:RxFaker.java
public static ReplaySubject<Faker> getInstance() {
if (rxFaker == null) {
rxFaker = createFaker();
}
return rxFaker;
}
项目:jobson
文件:JobManagerTest.java
@Test
public void testSubmitJobEventListenersEchoStdoutWhenExecutorEchoesStdout() throws InterruptedException {
final Subject<byte[]> stdoutSubject = ReplaySubject.create();
final byte[] expectedStdoutBytes = generateRandomBytes();
stdoutSubject.onNext(expectedStdoutBytes);
final JobExecutor jobExecutor = MockJobExecutor.thatUses(stdoutSubject, Observable.never());
final JobManager jobManager = createManagerWith(jobExecutor);
final Semaphore s = new Semaphore(1);
s.acquire();
final JobEventListeners listeners = JobEventListeners.createStdoutListener(new Observer<byte[]>() {
@Override
public void onSubscribe(@NonNull Disposable disposable) {}
@Override
public void onNext(@NonNull byte[] bytes) {
assertThat(bytes).isEqualTo(expectedStdoutBytes);
s.release();
}
@Override
public void onError(@NonNull Throwable throwable) {
fail("Error from observable");
s.release();
}
@Override
public void onComplete() {}
});
jobManager.submit(STANDARD_VALID_REQUEST, listeners);
if (!s.tryAcquire(1, SECONDS)) {
fail("Timed out before any bytes received");
}
}
项目:jobson
文件:JobManagerTest.java
@Test
public void testSubmitJobEventListenersEchoStderrWhenExecutorEchoesStderr() throws InterruptedException {
final Subject<byte[]> stderr = ReplaySubject.create();
final byte[] stderrBytes = generateRandomBytes();
stderr.onNext(stderrBytes);
final JobExecutor jobExecutor = MockJobExecutor.thatUses(Observable.never(), stderr);
final JobManager jobManager = createManagerWith(jobExecutor);
final Semaphore s = new Semaphore(1);
s.acquire();
final JobEventListeners listeners = JobEventListeners.createStderrListener(new Observer<byte[]>() {
@Override
public void onSubscribe(@NonNull Disposable disposable) {}
@Override
public void onNext(@NonNull byte[] bytes) {
assertThat(bytes).isEqualTo(stderrBytes);
s.release();
}
@Override
public void onError(@NonNull Throwable throwable) {
fail("Error from observable");
s.release();
}
@Override
public void onComplete() {}
});
jobManager.submit(STANDARD_VALID_REQUEST, listeners);
if (!s.tryAcquire(1, SECONDS)) {
fail("Timed out before any bytes received");
}
}
项目:RxGroups
文件:SubscriptionProxyTest.java
@Test public void shouldKeepDeliveringEventsAfterResubscribed() {
TestObserver<String> observer = new TestObserver<>();
ReplaySubject<String> subject = ReplaySubject.create();
SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);
proxy.subscribe(observer);
subject.onNext("Avanti 1");
proxy.dispose();
observer = new TestObserver<>();
proxy.subscribe(observer);
subject.onNext("Avanti!");
observer.assertValues("Avanti 1", "Avanti!");
}
项目:arctor
文件:WaitViewReplayTransformer.java
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
final ReplaySubject<Notification<T>> subject = ReplaySubject.create();
final DisposableObserver<Notification<T>> observer = upstream.materialize()
.subscribeWith(new DisposableObserver<Notification<T>>() {
@Override
public void onComplete() {
subject.onComplete();
}
@Override
public void onError(Throwable e) {
subject.onError(e);
}
@Override
public void onNext(Notification<T> value) {
subject.onNext(value);
}
});
return view
.switchMap(new Function<Boolean, Observable<Notification<T>>>() {
@Override
public Observable<Notification<T>> apply(final Boolean flag) {
if (flag) {
return subject;
} else {
return Observable.empty();
}
}
})
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
observer.dispose();
}
})
.dematerialize();
}
项目:RxShell
文件:RxShellTest.java
@Before
public void setup() throws Exception {
super.setup();
sessionPub = ReplaySubject.create();
sessionPub.onNext(rxProcessSession);
when(rxProcess.open()).thenAnswer(invocation -> {
when(rxProcessSession.waitFor()).thenReturn(Single.create(e -> waitForEmitter = e));
return sessionPub.firstOrError();
});
cmdStream = new MockOutputStream(new MockOutputStream.Listener() {
@Override
public void onNewLine(String line) {
if (line.equals("exit" + LineReader.getLineSeparator())) {
try {
cmdStream.close();
} catch (IOException e) {
Timber.e(e);
} finally {
waitForEmitter.onSuccess(0);
}
}
}
@Override
public void onClose() {
}
});
outputStream = new MockInputStream();
errorStream = new MockInputStream();
when(rxProcessSession.input()).thenReturn(cmdStream);
when(rxProcessSession.output()).thenReturn(outputStream);
when(rxProcessSession.error()).thenReturn(errorStream);
when(rxProcessSession.isAlive()).thenReturn(Single.create(e -> e.onSuccess(cmdStream.isOpen())));
when(rxProcessSession.destroy()).then(invocation -> Completable.create(e -> {
cmdStream.close();
waitForEmitter.onSuccess(1);
e.onComplete();
}));
}
项目:Reactive-Programming-With-Java-9
文件:Demo_ReplaySubject.java
public static void main(String[] args) {
// TODO Auto-generated method stub
Observer<Long> observer=new Observer<Long>() {
@Override
public void onComplete() {
// TODO Auto-generated method stub
System.out.println("It's Done");
}
@Override
public void onError(Throwable throwable) {
// TODO Auto-generated method stub
throwable.printStackTrace();
}
@Override
public void onNext(Long value) {
// TODO Auto-generated method stub
System.out.println(":"+value);
}
@Override
public void onSubscribe(Disposable disposable) {
// TODO Auto-generated method stub
System.out.println("onSubscribe");
}
};
ReplaySubject<Long> replaySubject=ReplaySubject.create();
replaySubject.onNext(1l);
replaySubject.onNext(2l);
replaySubject.subscribe(observer);
replaySubject.onNext(10l);
replaySubject.onComplete();
}
项目:RxBusLib
文件:SubscriberReplayEvent.java
@Override
protected final void initObservable() {
subject = ReplaySubject.create();
subject.observeOn(EventThread.getScheduler(observeThread))
.subscribeOn(EventThread.getScheduler(subscribeThread));
}
项目:clustercode
文件:ExternalProcessServiceImpl.java
private void captureOutput(Consumer<Observable<String>> observer, InputStream stream) {
Subject<Object> subject = ReplaySubject.create().toSerialized();
readStreamAsync(subject, stream);
observer.accept(subject.ofType(String.class)
.observeOn(Schedulers.io()));
}
项目:webtrekk-android-sdk
文件:HttpServer.java
public Subject<String> getSubject(){
mSubject = ReplaySubject.create();
return mSubject;
}
项目:RHub
文件:RxJava2Proxies.java
public static RxJava2SubjProxy replaySubjectProxy() {
return new RxJava2SubjProxy(ReplaySubject.create(), Roxy.TePolicy.PASS);
}
项目:RHub
文件:RxJava2Proxies.java
public static RxJava2SubjProxy serializedReplaySubjectProxy() {
return new RxJava2SubjProxy(ReplaySubject.create().toSerialized(), Roxy.TePolicy.PASS);
}
项目:RHub
文件:RxJava2Proxies.java
public static RxJava2SubjProxy safeReplaySubjectProxy() {
return new RxJava2SubjProxy(ReplaySubject.create(), Roxy.TePolicy.WRAP);
}
项目:RHub
文件:RxJava2Proxies.java
public static RxJava2SubjProxy safeSerializedReplaySubjectProxy() {
return new RxJava2SubjProxy(ReplaySubject.create().toSerialized(), Roxy.TePolicy.WRAP);
}
项目:GitHubAndroidOAuth
文件:OAuthActivity.java
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
Log.d(GitHubOAuth.TAG, "OAuthActivity: onCreate "
+ "savedInstanceState = " + savedInstanceState
+ ", getIntent() = " + getIntent());
setContentView(R.layout.progress_dialog);
Icepick.restoreInstanceState(this, savedInstanceState);
if (mGitHubOAuth == null) {
mGitHubOAuth = getIntent().getParcelableExtra(ARG_KEY_AUTH);
}
// init reference
if (sOAuthResultSubject == null || sOAuthResultSubject.get() == null) {
mOAuthResultSubject = ReplaySubject.create();
sOAuthResultSubject = new WeakReference<>(mOAuthResultSubject);
} else {
mOAuthResultSubject = sOAuthResultSubject.get();
}
if (isBrowserIntent(getIntent())) {
Log.d(GitHubOAuth.TAG, "OAuthActivity: Got browser intent in new created instance.");
Pair<OAuthResult, String> result = getOAuthResult(getIntent());
mOAuthResultSubject.onNext(result);
finish();
return;
} else if (mGitHubOAuth == null) {
authFail(GitHubOAuth.ERROR_UNKNOWN_ERROR, "Invalid launch intent");
return;
}
mOAuthPresenter = new OAuthPresenter(mGitHubOAuth);
mOAuthPresenter.attach(this);
Log.d(GitHubOAuth.TAG, "OAuthActivity: onCreate mState = " + mState);
switch (mState) {
case STATE_SEND_REQ:
// recreated after send request, check `sOAuthResultSubject`
mState = STATE_WAIT_CODE;
mOAuthPresenter.waitCode(mOAuthResultSubject);
break;
case STATE_CALL_API:
// recreated after got code, because code can only be used once, so we fail
authFail(GitHubOAuth.ERROR_UNKNOWN_ERROR, "Activity killed when call api");
break;
case STATE_NOT_REQ:
handleLaunchIntent();
break;
default:
// we may got killed at STATE_WAIT_CODE, it's too complicated to handle, just fail
authFail(GitHubOAuth.ERROR_UNKNOWN_ERROR, "un-handled state " + mState);
break;
}
}
项目:rxjava2-extras
文件:FlowableFetchPagesByRequest.java
public static <T> Flowable<T> create(final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch,
final long start, final int maxConcurrency) {
return Flowable.defer(new Callable<Flowable<T>>() {
@Override
public Flowable<T> call() throws Exception {
// need a ReplaySubject because multiple requests can come
// through before concatEager has established subscriptions to
// the subject
final ReplaySubject<Flowable<T>> subject = ReplaySubject.create();
final AtomicLong position = new AtomicLong(start);
LongConsumer request = new LongConsumer() {
@Override
public void accept(final long n) throws Exception {
final long pos = position.getAndAdd(n);
if (SubscriptionHelper.validate(n)) {
Flowable<T> flowable;
try {
flowable = fetch.apply(pos, n);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
subject.onError(e);
return;
}
// reduce allocations by incorporating the onNext
// and onComplete actions into the mutable count
// object
final Count count = new Count(subject, n);
flowable = flowable //
.doOnNext(count) //
.doOnComplete(count);
subject.onNext(flowable);
}
}
};
return Flowable //
.concatEager(subject.serialize() //
.toFlowable(BackpressureStrategy.BUFFER), maxConcurrency, 128) //
.doOnRequest(request);
}
});
}
项目:RxDelay
文件:DelayReplayObservableTransformer.java
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.compose(new DelayObservableTransformer<>(pauseLifecycle, ReplaySubject.<T>create()));
}
项目:RxDelay
文件:DelayLatestObservableTransformer.java
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.compose(new DelayObservableTransformer<>(pauseLifecycle, ReplaySubject.<T>createWithSize(1)));
}
项目:Reactive-Android-Programming
文件:Sandbox.java
private static void demo4() throws InterruptedException {
Subject<String> subject = ReplaySubject.create();
Observable.interval(0, 1, TimeUnit.SECONDS)
.map(Objects::toString)
.subscribe(subject);
Thread.sleep(3100);
subject.subscribe(v -> log(v));
}