@Override protected void subscribeActual(MaybeObserver<? super T> s) { MaybeObserver<? super T> observer; try { observer = ObjectHelper.requireNonNull(s, "Null Observer"); } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Disposable already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } source.subscribe(new AttachMaybeObserver<>(observer, this.compositeDisposable)); }
/**
 * Evaluates {@code condition} once and subscribes the observer to {@code then} when it is
 * true, otherwise to {@code orElse}.
 *
 * <p>If evaluating the condition throws, the error is handed to the observer through
 * {@code EmptyDisposable.error} and neither source is subscribed.
 *
 * @param observer the downstream observer
 */
@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
    boolean conditionHolds;
    try {
        conditionHolds = condition.getAsBoolean();
    } catch (Throwable ex) {
        EmptyDisposable.error(ex, observer);
        return;
    }

    if (!conditionHolds) {
        orElse.subscribe(observer);
        return;
    }
    then.subscribe(observer);
}
/**
 * Loads news from the remote source for the first channels and, when the first result
 * arrives, refreshes the news widget.
 *
 * <p>Only {@code onSuccess} reacts; subscription, completion and error signals are
 * intentionally left without any action.
 *
 * @param context        context passed on to {@code updateNewsWidget}
 * @param channelManager supplies the start-up load variant and the first channels
 * @param newsManager    loads the news for the resolved channels
 */
public static void updateNewsAndWidget(@NonNull final Context context,
                                       @NonNull ChannelManager channelManager,
                                       @NonNull final NewsManager newsManager) {
    MaybeObserver<List<News>> widgetRefresher = new MaybeObserver<List<News>>() {
        @Override
        public void onSubscribe(Disposable d) {
            // nothing to do
        }

        @Override
        public void onSuccess(List<News> value) {
            updateNewsWidget(context);
        }

        @Override
        public void onError(Throwable e) {
            // errors are ignored here
        }

        @Override
        public void onComplete() {
            // nothing to do
        }
    };

    channelManager.getStartUpLoadVariant()
            .applyStartupStrategy(channelManager.getFirstChannels())
            .flatMap(channels -> newsManager.loadNewsFromRemote(channels))
            .firstElement()
            .subscribe(widgetRefresher);
}
@Test public void shouldHonorDisposedWhenCallingOnSuccess() throws Exception { // Given Disposable disposable = mock(Disposable.class); MaybeObserver childObserver = mock(MaybeObserver.class); MaybeObserver decoratedObserver = CircuitBreakerOperator.of(circuitBreaker).apply(childObserver); decoratedObserver.onSubscribe(disposable); // When ((Disposable) decoratedObserver).dispose(); decoratedObserver.onSuccess(1); // Then verify(childObserver, never()).onSuccess(any()); assertSingleSuccessfulCall(); }
@Test public void shouldHonorDisposedWhenCallingOnError() throws Exception { // Given Disposable disposable = mock(Disposable.class); MaybeObserver childObserver = mock(MaybeObserver.class); MaybeObserver decoratedObserver = CircuitBreakerOperator.of(circuitBreaker).apply(childObserver); decoratedObserver.onSubscribe(disposable); // When ((Disposable) decoratedObserver).dispose(); decoratedObserver.onError(new IllegalStateException()); // Then verify(childObserver, never()).onError(any()); assertSingleFailedCall(); }
@Test public void shouldHonorDisposedWhenCallingOnComplete() throws Exception { // Given Disposable disposable = mock(Disposable.class); MaybeObserver childObserver = mock(MaybeObserver.class); MaybeObserver decoratedObserver = CircuitBreakerOperator.of(circuitBreaker).apply(childObserver); decoratedObserver.onSubscribe(disposable); // When ((Disposable) decoratedObserver).dispose(); decoratedObserver.onComplete(); // Then verify(childObserver, never()).onComplete(); assertSingleSuccessfulCall(); }
@Test public void shouldHonorDisposedWhenCallingOnSuccess() throws Exception { // Given Disposable disposable = mock(Disposable.class); MaybeObserver childObserver = mock(MaybeObserver.class); MaybeObserver decoratedObserver = BulkheadOperator.of(bulkhead).apply(childObserver); decoratedObserver.onSubscribe(disposable); // When ((Disposable) decoratedObserver).dispose(); decoratedObserver.onSuccess(1); // Then verify(childObserver, never()).onSuccess(any()); assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1); }
@Test public void shouldHonorDisposedWhenCallingOnError() throws Exception { // Given Disposable disposable = mock(Disposable.class); MaybeObserver childObserver = mock(MaybeObserver.class); MaybeObserver decoratedObserver = BulkheadOperator.of(bulkhead).apply(childObserver); decoratedObserver.onSubscribe(disposable); // When ((Disposable) decoratedObserver).dispose(); decoratedObserver.onError(new IllegalStateException()); // Then verify(childObserver, never()).onError(any()); assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1); }
@Test public void shouldHonorDisposedWhenCallingOnComplete() throws Exception { // Given Disposable disposable = mock(Disposable.class); MaybeObserver childObserver = mock(MaybeObserver.class); MaybeObserver decoratedObserver = BulkheadOperator.of(bulkhead).apply(childObserver); decoratedObserver.onSubscribe(disposable); // When ((Disposable) decoratedObserver).dispose(); decoratedObserver.onComplete(); // Then verify(childObserver, never()).onComplete(); assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1); }
@Test public void shouldNotReleaseBulkheadWhenWasDisposedAfterNotPermittedSubscribe() throws Exception { // Given Disposable disposable = mock(Disposable.class); MaybeObserver childObserver = mock(MaybeObserver.class); MaybeObserver decoratedObserver = BulkheadOperator.of(bulkhead).apply(childObserver); bulkhead.isCallPermitted(); assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); decoratedObserver.onSubscribe(disposable); // When ((Disposable) decoratedObserver).dispose(); // Then assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0); }
/**
 * Builds an observer that writes every received signal to the log and, for all signals
 * except {@code onSubscribe}, also appends it to {@code textView}.
 *
 * @return a new observer for {@code Integer} values
 */
private MaybeObserver<Integer> getObserver() {
    return new MaybeObserver<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, " onSubscribe : " + d.isDisposed());
        }

        @Override
        public void onSuccess(Integer value) {
            showLine(" onSuccess : value : " + value);
        }

        @Override
        public void onError(Throwable e) {
            showLine(" onError : " + e.getMessage());
        }

        @Override
        public void onComplete() {
            showLine(" onComplete");
        }

        /** Appends the message plus a line separator to the text view, then logs it. */
        private void showLine(String message) {
            textView.append(message);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, message);
        }
    };
}
public static void main(String[] args) { // TODO Auto-generated method stub String[] fruits = { "mango", "pineapple", "apple", "mango", "papaya", "pineapple", "apple", "apple" }; Observable.fromArray(fruits).elementAt(3).count() .subscribe(item -> System.out.println("we got: " + item + " items from the Observable")); Observable.fromArray(fruits).elementAt(10).subscribe(new MaybeObserver<String>() { @Override public void onComplete() { // TODO Auto-generated method stub System.out.println("successfully completed"); } @Override public void onError(Throwable throwable) { // TODO Auto-generated method stub System.out.println(throwable.getMessage()); } @Override public void onSubscribe(Disposable disposable) { // TODO Auto-generated method stub } @Override public void onSuccess(String value) { // TODO Auto-generated method stub System.out.println("value at specified position is:-"+value); } }); }
/**
 * Starts the task via {@code run()} and bridges it to the observer through a
 * {@code MaybeTaskCallback}.
 *
 * <p>The callback is handed to the observer as its subscription first, and only then
 * registered as the task's completion listener.
 *
 * @param observer the downstream observer
 */
@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
    Task<T> task = run();
    // Parameterised instead of a raw type, so no unchecked conversion is needed.
    MaybeTaskCallback<T> callback = new MaybeTaskCallback<T>(task, observer);
    observer.onSubscribe(callback);
    task.addOnCompleteListener(callback);
}
/**
 * Builds an observer that mirrors each received signal to the log and, for all signals
 * except {@code onSubscribe}, also appends it to {@code textView}.
 *
 * @return a new observer for {@code Integer} values
 */
private MaybeObserver<Integer> getObserver() {
    return new MaybeObserver<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, " onSubscribe : " + d.isDisposed());
        }

        @Override
        public void onSuccess(Integer value) {
            final String header = " onNext ";
            final String detail = " value : " + value;

            // The text view shows the header with a trailing colon; the log does not.
            textView.append(" onNext : ");
            textView.append(AppConstant.LINE_SEPARATOR);
            textView.append(detail);
            textView.append(AppConstant.LINE_SEPARATOR);

            Log.d(TAG, header);
            Log.d(TAG, detail);
        }

        @Override
        public void onError(Throwable e) {
            final String message = " onError : " + e.getMessage();
            textView.append(message);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, message);
        }

        @Override
        public void onComplete() {
            final String message = " onComplete";
            textView.append(message);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, message);
        }
    };
}
/**
 * Hands the observer an {@code EmptyDisposable}, computes the container via {@code run()}
 * and delivers exactly one terminal signal: {@code onSuccess} with the result, or
 * {@code onError} if {@code run()} throws a non-fatal error.
 *
 * @param observer the downstream observer
 */
@Override
protected void subscribeActual(MaybeObserver<? super Container> observer) {
    observer.onSubscribe(EmptyDisposable.INSTANCE);

    Container result;
    try {
        result = run();
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        observer.onError(e);
        return;
    }

    // Kept outside the try block: if onSuccess itself throws, the observer must not
    // additionally receive onError, which would be a second terminal signal.
    observer.onSuccess(result);
}
/**
 * Verifies that the auto-disposing observer exposes the original observer through
 * {@code delegateObserver()}.
 *
 * <p>A Maybe-subscribe hook records the first two observers it sees: the first into
 * {@code atomicObserver}, the second into {@code atomicAutoDisposingObserver}. All plugin
 * hooks are reset in the {@code finally} block.
 */
@Test
public void verifyObserverDelegate() {
  final AtomicReference<MaybeObserver> atomicObserver = new AtomicReference<>();
  final AtomicReference<MaybeObserver> atomicAutoDisposingObserver = new AtomicReference<>();
  try {
    RxJavaPlugins.setOnMaybeSubscribe(new BiFunction<Maybe, MaybeObserver, MaybeObserver>() {
      @Override
      public MaybeObserver apply(Maybe source, MaybeObserver observer) {
        if (atomicObserver.get() == null) {
          atomicObserver.set(observer);
        } else if (atomicAutoDisposingObserver.get() == null) {
          atomicAutoDisposingObserver.set(observer);
          // Both observers are captured: remove the hook that was installed above.
          // (Previously the unrelated Observable hook was cleared instead.)
          RxJavaPlugins.setOnMaybeSubscribe(null);
        }
        return observer;
      }
    });

    Maybe.just(1)
        .as(AutoDispose.<Integer>autoDisposable(ScopeProvider.UNBOUND))
        .subscribe();

    assertThat(atomicAutoDisposingObserver.get()).isNotNull();
    assertThat(atomicAutoDisposingObserver.get()).isInstanceOf(AutoDisposingMaybeObserver.class);
    assertThat(
        ((AutoDisposingMaybeObserver) atomicAutoDisposingObserver.get()).delegateObserver())
        .isNotNull();
    assertThat(
        ((AutoDisposingMaybeObserver) atomicAutoDisposingObserver.get()).delegateObserver())
        .isSameAs(atomicObserver.get());
  } finally {
    RxJavaPlugins.reset();
  }
}
private void requestAdditionalPermission(List<PermissionHelper> permissions) { ReactiveLogin.requestAdditionalPermission(permissions, this) .subscribe(new MaybeObserver<LoginResult>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onSuccess(LoginResult loginResult) { Log.d(TAG, "onSuccess"); // verify if permission was granted if (loginResult.getRecentlyDeniedPermissions() .contains(PermissionHelper.USER_PHOTOS.getValue())) { // permission was refused, show a toast : Toast.makeText(getApplicationContext(), "We cannot get your photos " + "without your permissions", Toast.LENGTH_LONG).show(); } else { // permission was granted, get albums getPhotos(); } } @Override public void onError(Throwable e) { Log.d(TAG, "onError " + e.getMessage()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
private void requestAdditionalPermission(List<PermissionHelper> permissions) { ReactiveLogin.requestAdditionalPermission(permissions, this) .subscribe(new MaybeObserver<LoginResult>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onSuccess(LoginResult loginResult) { Log.d(TAG, "onSuccess"); // verify if permission was granted if (loginResult.getRecentlyDeniedPermissions() .contains(PermissionHelper.USER_PHOTOS.getValue())) { // permission was refused, show a toast : Toast.makeText(getApplicationContext(), "We cannot get your photos " + "without your permissions", Toast.LENGTH_LONG).show(); } else { // permission was granted, get albums getAlbums(); } } @Override public void onError(Throwable e) { Log.d(TAG, "onError " + e.getMessage()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
/**
 * Starts a login through {@code ReactiveLogin.login} and subscribes a
 * {@code MaybeObserver<LoginResult>} that logs each signal and appends it to {@code result}.
 * <ul>
 *   <li>onSuccess : receives the {@code LoginResult}; its token and the sizes of the
 *       recently granted / denied permission sets are shown</li>
 *   <li>onError : receives the failure (a FacebookException according to the original
 *       note — not verifiable from this method alone)</li>
 *   <li>onComplete : called when a login terminates with no result, like onCanceled</li>
 * </ul>
 */
private void login() {
    MaybeObserver<LoginResult> loginObserver = new MaybeObserver<LoginResult>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe");
            appendLine("onSubscribe");
        }

        @Override
        public void onSuccess(LoginResult value) {
            Log.d(TAG, "OnSuccess");
            result.append("\n");
            appendLine("token = " + value.getAccessToken().getToken());
            appendLine("granted permissions = " + value.getRecentlyGrantedPermissions().size());
            appendLine("denied permissions = " + value.getRecentlyDeniedPermissions().size());
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "onError " + e.getMessage());
            appendLine("onError");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
            appendLine("onComplete");
        }

        /** Appends the text followed by a newline to the result view. */
        private void appendLine(String text) {
            result.append(text);
            result.append("\n");
        }
    };

    ReactiveLogin.login(this).subscribe(loginObserver);
}
@Test public void shouldNotAffectCircuitBreakerWhenWasDisposedAfterNotPermittedSubscribe() throws Exception { // Given Disposable disposable = mock(Disposable.class); MaybeObserver childObserver = mock(MaybeObserver.class); MaybeObserver decoratedObserver = CircuitBreakerOperator.of(circuitBreaker).apply(childObserver); circuitBreaker.transitionToOpenState(); decoratedObserver.onSubscribe(disposable); // When ((Disposable) decoratedObserver).dispose(); // Then assertNoRegisteredCall(); }
/**
 * Adapts a Vert.x {@code Handler<AsyncResult<T>>} to an RxJava2 {@code MaybeObserver}.
 * <p>
 * The returned observer can be passed to {@code Maybe#subscribe(MaybeObserver)}. The handler
 * is invoked at most once, for whichever terminal signal arrives first:
 * <ul>
 *   <li>{@code onSuccess} - a succeeded future carrying the item</li>
 *   <li>{@code onComplete} - a succeeded future without a result</li>
 *   <li>{@code onError} - a failed future carrying the error</li>
 * </ul>
 *
 * @param <T> the type of the item the observer accepts
 * @param handler the handler to adapt
 * @return the observer
 */
public static <T> MaybeObserver<T> toObserver(Handler<AsyncResult<T>> handler) {
  AtomicBoolean terminated = new AtomicBoolean();
  return new MaybeObserver<T>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
      // the subscription is not tracked
    }

    @Override
    public void onSuccess(@NonNull T item) {
      if (!terminated.compareAndSet(false, true)) {
        return;
      }
      handler.handle(io.vertx.core.Future.succeededFuture(item));
    }

    @Override
    public void onComplete() {
      if (!terminated.compareAndSet(false, true)) {
        return;
      }
      handler.handle(io.vertx.core.Future.succeededFuture());
    }

    @Override
    public void onError(Throwable error) {
      if (!terminated.compareAndSet(false, true)) {
        return;
      }
      handler.handle(io.vertx.core.Future.failedFuture(error));
    }
  };
}
/** A Maybe that emits a value must succeed the adapted future with that same value. */
@Test
public void testToMaybeObserverSuccess() {
  Future<String> future = Future.future();
  MaybeObserver<String> adapted = MaybeHelper.toObserver(future);

  Maybe.just("foobar").subscribe(adapted);

  assertTrue(future.succeeded());
  assertSame("foobar", future.result());
}
/** An empty Maybe must succeed the adapted future with a null result. */
@Test
public void testToMaybeObserverEmpty() {
  Future<String> future = Future.future();
  MaybeObserver<String> adapted = MaybeHelper.toObserver(future);

  Maybe.<String>empty().subscribe(adapted);

  assertTrue(future.succeeded());
  assertNull(future.result());
}
/** A failing Maybe must fail the adapted future with the very same exception instance. */
@Test
public void testToMaybeObserverFailure() {
  Future<String> future = Future.future();
  MaybeObserver<String> adapted = MaybeHelper.toObserver(future);
  RuntimeException expected = new RuntimeException();

  Maybe.<String>error(expected).subscribe(adapted);

  assertTrue(future.failed());
  assertSame(expected, future.cause());
}
/**
 * Creates a forwarding observer around the given delegate.
 *
 * @param delegate the observer stored in the {@code delegate} field
 */
ForwardingObserver(MaybeObserver<T> delegate) {
    this.delegate = delegate;
}
/**
 * Remembers the observer in {@code mObserver}; no signal is emitted to it from this method.
 *
 * @param observer the observer to keep for later use
 */
@Override
protected void subscribeActual(MaybeObserver<? super LoginResult> observer) {
    // NOTE(review): onSubscribe is not invoked here - confirm it happens elsewhere.
    this.mObserver = observer;
}
/**
 * Passes the subscription straight through to {@code delegate}.
 *
 * @param observer the downstream observer
 */
@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
    this.delegate.subscribe(observer);
}
/**
 * Creates a transformer from the given lifecycle owner and observer.
 *
 * <p>The observer is wrapped in a {@code MaybeWithObserver}, which is then passed together
 * with the lifecycle owner to a new {@code LifecycleTransformer}.
 *
 * @param <T>            the value type of the Maybe being transformed
 * @param lifecycleOwner the lifecycle owner handed to the transformer
 * @param observer       the observer to wrap
 * @return the new transformer
 */
public static <T> MaybeTransformer<T, T> bind(@NonNull LifecycleOwner lifecycleOwner,
                                              @NonNull MaybeObserver<T> observer) {
    MaybeWithObserver<T> wrappedObserver = new MaybeWithObserver<>(observer);
    return new LifecycleTransformer<>(lifecycleOwner, wrappedObserver);
}
/**
 * Creates an instance that holds the given observer.
 *
 * @param observer the observer stored in the {@code observer} field
 */
public MaybeWithObserver(@NonNull MaybeObserver<T> observer) {
    this.observer = observer;
}
public static void main(String[] args) { Integer[] numbers = { 1, 2, 13, 34, 12, 10 }; Observable<Integer> source1 = Observable.fromArray(numbers); source1.reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer value1, Integer value2) throws Exception { // TODO Auto-generated method stub // 1, 2, 13, 34, 12, 10 int sum = 0; return value1 + value2; } }).subscribe(new MaybeObserver<Integer>() { @Override public void onComplete() { // TODO Auto-generated method stub System.out.println("completed2"); } @Override public void onError(Throwable throwable) { // TODO Auto-generated method stub System.out.println(throwable.getMessage()); } @Override public void onSubscribe(Disposable arg0) { // TODO Auto-generated method stub } @Override public void onSuccess(Integer value) { // TODO Auto-generated method stub System.out.println(value); } }); }
/**
 * Creates an observer holding the downstream observer and a composite disposable.
 *
 * @param actual              the downstream observer stored in {@code actual}
 * @param compositeDisposable the composite stored in {@code compositeDisposable}
 */
public AttachMaybeObserver(MaybeObserver<? super T> actual,
                           CompositeDisposable compositeDisposable) {
    this.compositeDisposable = compositeDisposable;
    this.actual = actual;
}
/**
 * Creates a callback for the given task that holds the given observer.
 *
 * @param task     the task passed to the superclass constructor
 * @param observer the observer stored in the {@code observer} field
 */
public MaybeTaskCallback(Task<?> task, MaybeObserver<? super T> observer) {
    super(task);
    this.observer = observer;
}
/**
 * Creates an observer holding a lifecycle Maybe and the delegate observer.
 *
 * @param lifecycle the Maybe stored in the {@code lifecycle} field
 * @param delegate  the observer stored in the {@code delegate} field
 */
AutoDisposingMaybeObserverImpl(Maybe<?> lifecycle, MaybeObserver<? super T> delegate) {
    this.delegate = delegate;
    this.lifecycle = lifecycle;
}
/**
 * Returns the observer held in the {@code delegate} field.
 *
 * @return the delegate observer
 */
@Override
public MaybeObserver<? super T> delegateObserver() {
    return this.delegate;
}
/**
 * Wraps the observer in an {@code AutoDisposingMaybeObserverImpl} built with {@code scope}
 * and subscribes that wrapper to {@code source}.
 *
 * @param observer the downstream observer
 */
@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
    MaybeObserver<? super T> autoDisposing =
            new AutoDisposingMaybeObserverImpl<>(scope, observer);
    source.subscribe(autoDisposing);
}
/**
 * Bridges the observer to {@code source} through a {@code SourceSingleSubscriber}.
 *
 * <p>The bridge is first given to the observer via {@code onSubscribe} and afterwards
 * subscribed to {@code source}; this order is kept deliberately.
 *
 * @param observer the downstream observer
 */
@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
    final SourceSingleSubscriber<T> bridge = new SourceSingleSubscriber<T>(observer);

    observer.onSubscribe(bridge);
    source.subscribe(bridge);
}