public static void main(String[] args) { Subject<String> subject = AsyncSubject.create(); subject.subscribe(s -> System.out.println("Observer 1: " + s), Throwable::printStackTrace, () -> System.out.println("Observer 1 done!") ); subject.onNext("Alpha"); subject.onNext("Beta"); subject.onNext("Gamma"); subject.onComplete(); subject.subscribe(s -> System.out.println("Observer 2: " + s), Throwable::printStackTrace, () -> System.out.println("Observer 2 done!") ); }
private void doSomeWork(int type) { AsyncSubject<Integer> source = AsyncSubject.create(); source.subscribe(getFirstObserver()); // it will emit only 4 and onComplete source.onNext(1); source.onNext(2); source.onNext(3); /* * it will emit 4 and onComplete for second observer also. */ source.subscribe(getSecondObserver()); source.onNext(4); if (type==COMPLETE){ source.onComplete(); } if (type==ERROR){ source.onError(new Exception("test send onError")); } }
/**main function request for permission. If Completable completes permission is granted. * Otherwise error is emitted. */ public List<Completable> requestPermission(@NotNull Activity activity, @NotNull String permissions[]) { List<Completable> completables = new ArrayList<>(); List<String> permissionsResult = new ArrayList<>(); for (String permission: permissions) { AsyncSubject<Void> observable = AsyncSubject.create(); if (ContextCompat.checkSelfPermission(activity, permission) != PackageManager.PERMISSION_GRANTED) { permissionsResult.add(permission); observables.put(permission, observable); completables.add(Completable.fromObservable(observable)); } else { completables.add(Completable.complete()); } } ActivityCompat.requestPermissions(activity, permissionsResult.toArray(new String[permissionsResult.size()]), PERMISSION_REQUEST); return completables; }
public void processResponse(int requestCode, @NotNull String permissions[], @NotNull int[] grantResults) { switch (requestCode) { case PERMISSION_REQUEST: { for (int i = 0; i< grantResults.length; i++){ final AsyncSubject<Void> observable = observables.get(permissions[i]); if (observable == null){ continue; } if (grantResults[i] == PackageManager.PERMISSION_GRANTED){ observable.onComplete(); } else { observable.onError(new SecurityException()); } } } } }
@Test public void testAsyncSubjectSubscribeAfterComplete() { // create an async subject, subscribe pre, and pump test through AsyncSubject<String> testSubject = AsyncSubject.create(); RxAsserter<String> preObserver = RxAsserter.on(testSubject); testSubject.onNext("test"); // make sure that no one has observed anything yet preObserver.assertValues(); // when the subject completes, pre should observe but not post testSubject.onComplete(); preObserver.assertValues("test"); // and if we subscribe after the fact, everyone should get it RxAsserter<String> postObserver = RxAsserter.on(testSubject); preObserver.assertValues("test"); postObserver.assertValues("test"); }
public static void main(String[] args) { // TODO Auto-generated method stub AsyncSubject<Long> asyncSubject=AsyncSubject.create(); asyncSubject.subscribe(new Observer<Long>() { @Override public void onComplete() { // TODO Auto-generated method stub System.out.println("It's Done"); } @Override public void onError(Throwable throwable) { // TODO Auto-generated method stub throwable.printStackTrace(); } @Override public void onNext(Long value) { // TODO Auto-generated method stub System.out.println(":"+value); } @Override public void onSubscribe(Disposable disposable) { // TODO Auto-generated method stub System.out.println("onSubscribe"); } }); asyncSubject.onNext(1L); asyncSubject.onNext(2L); asyncSubject.onNext(10L); asyncSubject.onComplete(); }
/** * Create a Observable that signals the terminal value or error of the given * CompletionStage. * <p>Cancelling the Observable subscription doesn't cancel the CompletionStage. * @param <T> the value type * @param cs the CompletionStage instance * @return the new Observable instance */ public static <T> Observable<T> fromFuture(CompletionStage<T> cs) { AsyncSubject<T> ap = AsyncSubject.create(); cs.whenComplete((v, e) -> { if (e != null) { ap.onError(e); } else { ap.onNext(v); ap.onComplete(); } }); return ap; }
private void doSomeWork() { AsyncSubject<Integer> source = AsyncSubject.create(); source.subscribe(getFirstObserver()); // it will emit only 4 and onComplete source.onNext(1); source.onNext(2); source.onNext(3); /* * it will emit 4 and onComplete for second observer also. */ source.subscribe(getSecondObserver()); source.onNext(4); source.onComplete(); }
private static void demo5() throws InterruptedException { Subject<String> subject = AsyncSubject.create(); Observable.interval(0, 1, TimeUnit.SECONDS) .take(4) .map(Objects::toString) .subscribe(subject); subject.subscribe(v -> log(v)); Thread.sleep(5100); subject.subscribe(v -> log(v)); }