private void doSomeWork() { PublishSubject<Integer> source = PublishSubject.create(); ConnectableObservable<Integer> connectableObservable = source.replay(3); // bufferSize = 3 to retain 3 values to replay connectableObservable.connect(); // connecting the connectableObservable connectableObservable.subscribe(getFirstObserver()); source.onNext(1); source.onNext(2); source.onNext(3); source.onNext(4); source.onComplete(); /* * it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay */ connectableObservable.subscribe(getSecondObserver()); }
private void doSomeWork() { Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS).take(6); //使用publish操作符将普通Observable转换为可连接的Observable ConnectableObservable<Long> connectableObservable = observable // .publish(); .replay(5); //第一个订阅者订阅,不会开始发射数据 connectableObservable .compose(Utils.<Long>ioToMain()) .subscribe(getFirstObserver()); //如果不调用connect方法,connectableObservable则不会发射数据 //即使没有任何订阅者订阅它,你也可以使用connect方法让一个Observable开始发射数据(或者开始生成待发射的数据)。 //这样,你可以将一个”冷”的Observable变为”热”的。 connectableObservable.connect(); //第二个订阅者延迟2s订阅,这将导致丢失前面2s内发射的数据 connectableObservable.delaySubscription(2, TimeUnit.SECONDS)//0,1数据丢失 .compose(Utils.<Long>ioToMain()) .subscribe(getSecondObserver()); }
private void postTextOnlyTweet(String status) { mProgressDialog.show(); ConnectableObservable<StatusUpdate> observable = mTwitterApi.postTweet(status, null, mLatitude, mLongitude) .subscribeOn(Schedulers.io()) .publish(); Disposable postingDisposable = observable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(this::onSuccessfulTweetPosting, this::onErrorTweetPosting); mCompositeDisposable.add(postingDisposable); Disposable crossPostingDisposable = observable .flatMap(this::pushTweetToLoklak) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( push -> Log.e(LOG_TAG, push.getStatus()), t -> Log.e(LOG_TAG, "Cross posting failed: " + t.toString()) ); mCompositeDisposable.add(crossPostingDisposable); Disposable publishDisposable = observable.connect(); mCompositeDisposable.add(publishDisposable); }
public static void main(String[] args) { ConnectableObservable<Integer> threeRandoms = Observable.range(1, 3) .map(i -> randomInt()).publish(); //Observer 1 - print each random integer threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i)); //Observer 2 - sum the random integers, then print threeRandoms.reduce(0, (total, next) -> total + next) .subscribe(i -> System.out.println("Observer 2: " + i)); threeRandoms.connect(); }
public static void main(String[] args) { ConnectableObservable<Integer> threeRandoms = Observable.range(1, 3) .map(i -> randomInt()).publish(); threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i)); threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i)); threeRandoms.connect(); }
public static void main(String[] args) { ConnectableObservable<Integer> threeIntegers = Observable.range(1, 3).publish(); threeIntegers.subscribe(i -> System.out.println("Observer One:" + i)); threeIntegers.subscribe(i -> System.out.println("Observer Two:" + i)); threeIntegers.connect(); }
public static void main(String[] args) { ConnectableObservable<Integer> threeInts = Observable.range(1, 3).publish(); Observable<Integer> threeRandoms = threeInts.map(i -> randomInt()); threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i)); threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i)); threeInts.connect(); }
public static void main(String[] args) { ConnectableObservable<Long> seconds = Observable.interval(1, TimeUnit.SECONDS).publish(); //observer 1 seconds.subscribe(l -> System.out.println("Observer 1: " + l)); seconds.connect(); //sleep 5 seconds sleep(5000); //observer 2 seconds.subscribe(l -> System.out.println("Observer 2: " + l)); //sleep 5 seconds sleep(5000); }
public static void main(String[] args) { ConnectableObservable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon") .publish(); //Set up observer 1 source.subscribe(s -> System.out.println("Observer 1: " + s)); //Set up observer 2 source.map(String::length) .subscribe(i -> System.out.println("Observer 2: " + i)); //Fire! source.connect(); }
@Override public ConnectableObservable<String> freeFlowEmps() { List<String> rosterNames = new ArrayList<>(); Function<Employee, String> familyNames = (emp) -> emp.getLastName().toUpperCase(); ConnectableObservable<String> flowyNames = Observable.fromIterable(employeeDaoImpl.getEmployees()).map(familyNames).cache().publish(); flowyNames.subscribe(System.out::println); flowyNames.subscribe((name) ->{ rosterNames.add(name); }); System.out.println(rosterNames); return flowyNames; }
/** * If the receiver is enabled, this method will: * <p> * 1. Invoke the `func` given at the time of creation. * 2. Multicast the returned observable. * 3. Send the multicasted observable on {@link #executionObservables()}. * 4. Subscribe (connect) to the original observable on the main thread. * * @param input The input value to pass to the receiver's `func`. This may be null. * @return the multicasted observable, after subscription. If the receiver is not * enabled, returns a observable that will send an error. */ @MainThread public final Observable<T> execute(@Nullable Object input) { boolean enabled = mImmediateEnabled.blockingFirst(); if (!enabled) { return Observable.error(new IllegalStateException("The command is disabled and cannot be executed")); } try { Observable<T> observable = mFunc.apply(input); if (observable == null) { throw new RuntimeException(String.format("null Observable returned from observable func for value %s", input)); } // This means that `executing` and `enabled` will send updated values before // the observable actually starts performing work. final ConnectableObservable<T> connection = observable .subscribeOn(AndroidSchedulers.mainThread()) .replay(); mAddedExecutionObservableSubject.onNext(connection); connection.connect(); return connection; } catch (Exception e) { e.printStackTrace(); return Observable.error(e); } }
private void testRefCount() { Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS).take(6); ConnectableObservable<Long> connectableObservable = observable.publish(); connectableObservable.connect(); Observable<Long> longObservable = connectableObservable.refCount(); longObservable.delaySubscription(2, TimeUnit.SECONDS) .compose(Utils.<Long>ioToMain()) .subscribe(getFirstObserver()); longObservable.compose(Utils.<Long>ioToMain()).subscribe(getSecondObserver()); }
private void checkSyncStartAndFinishEvents(ConnectableObservable<BaseEvent> syncEventBus) { syncEventBus .test() .awaitCount(2, () -> { }, MAX_TIMEOUT_MILLIS) .assertNoErrors() .assertValueAt(0, event -> { assertThat(event).isInstanceOf(SyncStartEvent.class); return true; }) .assertValueAt(1, event -> { assertThat(event).isInstanceOf(SyncFinishEvent.class); return true; }); }
private void postImageAndTextTweet(List<Observable<String>> imageIdObservables, String status) { mProgressDialog.show(); ConnectableObservable<StatusUpdate> observable = Observable.zip( imageIdObservables, mediaIdArray -> { String mediaIds = ""; for (Object mediaId : mediaIdArray) { mediaIds = mediaIds + String.valueOf(mediaId) + ","; } return mediaIds.substring(0, mediaIds.length() - 1); }) .flatMap(imageIds -> mTwitterApi.postTweet(status, imageIds, mLatitude, mLongitude)) .subscribeOn(Schedulers.io()) .publish(); Disposable postingDisposable = observable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(this::onSuccessfulTweetPosting, this::onErrorTweetPosting); mCompositeDisposable.add(postingDisposable); Disposable crossPostingDisposable = observable .flatMap(this::pushTweetToLoklak) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( push -> {}, t -> Log.e(LOG_TAG, "Cross posting failed: " + t.toString()) ); mCompositeDisposable.add(crossPostingDisposable); Disposable publishDisposable = observable.connect(); mCompositeDisposable.add(publishDisposable); }
private void displayAndPostScrapedData() { progressBar.setVisibility(View.VISIBLE); ConnectableObservable<ScrapedData> observable = Observable.interval(4, TimeUnit.SECONDS) .flatMap(this::getSuggestionsPeriodically) .flatMap(query -> { mSuggestionQuerries.add(query); return getScrapedTweets(query); }) .retry(2) .publish(); Disposable viewDisposable = observable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( this::displayScrapedData, this::setNetworkErrorView ); mCompositeDisposable.add(viewDisposable); Disposable pushDisposable = observable .flatMap(this::pushScrapedData) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( push -> { mHarvestedTweets += push.getRecords(); harvestedTweetsCountTextView.setText(String.valueOf(mHarvestedTweets)); }, throwable -> Log.e(LOG_TAG, throwable.toString()) ); mCompositeDisposable.add(pushDisposable); Disposable publishDisposable = observable.connect(); mCompositeDisposable.add(publishDisposable); }
@Override public void subscribeToDataStoreInternal(@NonNull final CompositeDisposable compositeDisposable) { checkNotNull(compositeDisposable); Log.v(TAG, "subscribeToDataStoreInternal"); ConnectableObservable<DataStreamNotification<GitHubRepositorySearch>> repositorySearchSource = searchString .debounce(SEARCH_INPUT_DELAY, TimeUnit.MILLISECONDS) .distinctUntilChanged() .filter(value -> value.length() > 2) .doOnNext(value -> Log.d(TAG, "Searching with: " + value)) .switchMap(getGitHubRepositorySearch::call) .publish(); compositeDisposable.add(repositorySearchSource .map(toProgressStatus()) .doOnNext(progressStatus -> Log.d(TAG, "Progress status: " + progressStatus.name())) .subscribe(this::setNetworkStatusText)); compositeDisposable.add(repositorySearchSource .filter(DataStreamNotification::isOnNext) .map(DataStreamNotification::getValue) .map(GitHubRepositorySearch::getItems) .flatMap(toGitHubRepositoryList()) .doOnNext(list -> Log.d(TAG, "Publishing " + list.size() + " repositories from the ViewModel")) .subscribe(repositories::onNext)); compositeDisposable.add(repositorySearchSource.connect()); }
ObservableOnAssemblyConnectable(ConnectableObservable<T> source) { this.source = source; this.assembled = new RxJavaAssemblyException(); }
@Test public void testHotObservable() throws Exception { ConnectableObservable<Long> connectableObservable = Observable.interval(1, TimeUnit.SECONDS, Schedulers.from(executorService)).publish(); connectableObservable.subscribe(x -> System.out.println("L1:" + x)); Thread.sleep(2000); Disposable disposable = connectableObservable.connect(); //Let us begin connectableObservable.subscribe(x -> System.out.println("L2:" + x)); Thread.sleep(2000); connectableObservable.subscribe(x -> System.out.println("L3:" + x)); disposable.dispose(); Thread.sleep(10000); }
@NonNull Disposable bind(MainView view) { final CompositeDisposable disposable = new CompositeDisposable(); Observable<String> login = view.login().share(); Observable<String> password = view.password().share(); // Boolean is valid/invalid flag. ConnectableObservable<Triplet<String, String, Boolean>> credentials = Observable .combineLatest(login, password, (l, p) -> Triplet.with(l, p, !l.isEmpty() && !p.isEmpty())) .publish(); Observable<Object> signInEnable = credentials .filter(creds -> creds.getValue2()) .map(enable -> new Object()); Observable<Object> signInDisable = credentials .filter(creds -> !creds.getValue2()) .map(disable -> new Object()); // You can use static import for RxUi.bind() disposable.add(RxUi.bind(signInEnable, view.singInEnable())); disposable.add(RxUi.bind(signInDisable, view.singInDisable())); Observable<Object> signInResult = view .signInClicks() .withLatestFrom(credentials, (click, creds) -> creds.removeFrom2()) // Leave only login and password. .switchMap(loginAndPassword -> authService .signIn(loginAndPassword.getValue0(), loginAndPassword.getValue1()) .subscribeOn(ioScheduler)) // "API request". .share(); disposable.add(credentials.connect()); Observable<Success> signInSuccess = signInResult .filter(it -> it instanceof Success) .cast(Success.class); Observable<Failure> signInFailure = signInResult .filter(it -> it instanceof Failure) .cast(Failure.class); // You can use static import for RxUi.bind() disposable.add(RxUi.bind(signInSuccess, view.signInSuccess())); disposable.add(RxUi.bind(signInFailure, view.signInFailure())); return disposable; }
ConnectableObservableValidator(ConnectableObservable<T> source, PlainConsumer<ProtocolNonConformanceException> onViolation) { this.source = source; this.onViolation = onViolation; }
@Test public void connectableObservable() { ConnectableObservable<Integer> source = new ConnectableObservable<Integer>() { @Override protected void subscribeActual(Observer<? super Integer> s) { s.onComplete(); s.onError(null); s.onError(new IOException()); s.onNext(null); s.onNext(1); s.onSubscribe(null); s.onSubscribe(Disposables.empty()); s.onSubscribe(Disposables.empty()); s.onComplete(); s.onNext(2); } @Override public void connect(Consumer<? super Disposable> connection) { } }; RxJavaProtocolValidator.setOnViolationHandler(this); Assert.assertSame(this, RxJavaProtocolValidator.getOnViolationHandler()); SavedHooks h = RxJavaProtocolValidator.enableAndChain(); Assert.assertTrue(RxJavaProtocolValidator.isEnabled()); try { Observable.just(1).test().assertResult(1); Observable.empty().test().assertResult(); Observable.error(new IOException()).test().assertFailure(IOException.class); TestHelper.checkDisposed(RxJavaPlugins.onAssembly(PublishSubject.create())); ConnectableObservable<Integer> c = RxJavaPlugins.onAssembly(source); c.test(); c.connect(); Assert.assertEquals(15, errors.size()); TestHelper.assertError(errors, 0, OnSubscribeNotCalledException.class); TestHelper.assertError(errors, 1, NullOnErrorParameterException.class); TestHelper.assertError(errors, 2, OnSubscribeNotCalledException.class); TestHelper.assertError(errors, 3, MultipleTerminationsException.class); TestHelper.assertError(errors, 4, OnSubscribeNotCalledException.class); Assert.assertTrue("" + errors.get(4).getCause(), errors.get(4).getCause() instanceof IOException); TestHelper.assertError(errors, 5, MultipleTerminationsException.class); TestHelper.assertError(errors, 6, NullOnNextParameterException.class); TestHelper.assertError(errors, 7, OnSubscribeNotCalledException.class); TestHelper.assertError(errors, 8, OnNextAfterTerminationException.class); TestHelper.assertError(errors, 9, OnSubscribeNotCalledException.class); TestHelper.assertError(errors, 10, OnNextAfterTerminationException.class); TestHelper.assertError(errors, 11, NullOnSubscribeParameterException.class); TestHelper.assertError(errors, 12, MultipleOnSubscribeCallsException.class); TestHelper.assertError(errors, 13, MultipleTerminationsException.class); TestHelper.assertError(errors, 14, OnNextAfterTerminationException.class); } finally { h.restore(); RxJavaProtocolValidator.setOnViolationHandler(null); } }
private SubscriptionProxy(Observable<T> sourceObservable, Action onTerminate) { final ConnectableObservable<T> replay = sourceObservable.replay(); sourceDisposable = replay.connect(); proxy = replay.doOnTerminate(onTerminate); disposableList = new CompositeDisposable(sourceDisposable); }
private ConnectableObservable<PositionedPlaybackFile> startPlayback(PreparedPlayableFileQueue preparedPlaybackQueue, final long filePosition) throws IOException { if (playbackSubscription != null) playbackSubscription.dispose(); activePlayer = playbackBootstrapper.startPlayback(preparedPlaybackQueue, filePosition); isPlaying = true; final ConnectableObservable<PositionedPlaybackFile> observable = activePlayer.observe(); playbackSubscription = observable.subscribe( p -> { isPlaying = true; positionedPlaybackFile = p; if (onPlayingFileChanged != null) onPlayingFileChanged.onPlayingFileChanged(p); saveStateToLibrary(p); }, e -> { if (e instanceof PreparationException) { final PreparationException preparationException = (PreparationException)e; saveStateToLibrary( new PositionedPlaybackFile( new EmptyPlaybackHandler(0), preparationException.getPositionedFile())); } if (onPlaylistError != null) onPlaylistError.runWith(e); }, () -> { isPlaying = false; positionedPlaybackFile = null; activePlayer = null; changePosition(0, 0) .then(positionedFile -> { if (onPlaylistReset != null) onPlaylistReset.onPlaylistReset(positionedFile); if (onPlaybackCompleted != null) onPlaybackCompleted.onPlaybackCompleted(); return null; }); }); observable.firstElement() .subscribe( p -> { if (onPlaybackStarted != null) onPlaybackStarted.onPlaybackStarted(p); }, e -> {}); return observable; }
@Override public ConnectableObservable<PositionedPlaybackFile> observe() { return observableProxy; }
@CheckReturnValue @SchedulerSupport("none") public ConnectableObservable<T> publish() { return boxed.publish(); }
@CheckReturnValue @SchedulerSupport("none") public ConnectableObservable<T> replay() { return boxed.replay(); }
@CheckReturnValue @SchedulerSupport("none") public ConnectableObservable<T> replay(int bufferSize) { return boxed.replay(bufferSize); }
@CheckReturnValue @SchedulerSupport("io.reactivex:computation") public ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit) { return boxed.replay(bufferSize, time, unit); }
@CheckReturnValue @SchedulerSupport("custom") public ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler) { return boxed.replay(bufferSize, time, unit, scheduler); }
@CheckReturnValue @SchedulerSupport("custom") public ConnectableObservable<T> replay(int bufferSize, Scheduler scheduler) { return boxed.replay(bufferSize, scheduler); }
@CheckReturnValue @SchedulerSupport("io.reactivex:computation") public ConnectableObservable<T> replay(long time, TimeUnit unit) { return boxed.replay(time, unit); }
@CheckReturnValue @SchedulerSupport("custom") public ConnectableObservable<T> replay(long time, TimeUnit unit, Scheduler scheduler) { return boxed.replay(time, unit, scheduler); }
@CheckReturnValue @SchedulerSupport("custom") public ConnectableObservable<T> replay(Scheduler scheduler) { return boxed.replay(scheduler); }