/**
 * Creates an Observable that reports data changes of the given store.
 * <p>
 * Every subscriber registers its own {@link DataObserver} through {@code boxStore.subscribe()}.
 * Each class handed to that observer is forwarded with {@code onNext} unless the emitter has
 * already been disposed. According to the original notes, classes with changed objects are
 * reported once a transaction is committed. Disposing the Observable cancels the underlying
 * {@link DataSubscription}.
 *
 * @param boxStore the store to observe
 * @return an Observable emitting the classes delivered by the store's data observer
 */
public static <T> Observable<Class> observable(final BoxStore boxStore) {
    return Observable.create(new ObservableOnSubscribe<Class>() {
        @Override
        public void subscribe(final ObservableEmitter<Class> downstream) throws Exception {
            // Forwards store notifications to the subscriber while it is still interested.
            final DataObserver<Class> forwarder = new DataObserver<Class>() {
                @Override
                public void onData(Class changedClass) {
                    if (downstream.isDisposed()) {
                        return;
                    }
                    downstream.onNext(changedClass);
                }
            };
            final DataSubscription subscription = boxStore.subscribe().observer(forwarder);

            // Tie the lifetime of the store subscription to the Rx subscription.
            downstream.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    subscription.cancel();
                }
            });
        }
    });
}
/**
 * Subscribes to {@code query} and pushes every element of a delivered result list to
 * {@code emitter}, one {@code onNext} per element, followed by {@code onComplete}.
 * <p>
 * Emission stops as soon as the emitter reports being cancelled; in that case no
 * {@code onComplete} is sent. Cancelling the emitter cancels the query subscription.
 *
 * @param query   the query whose results are emitted item by item
 * @param emitter the Flowable emitter receiving the items
 */
static <T> void createListItemEmitter(final Query<T> query, final FlowableEmitter<T> emitter) {
    final DataObserver<List<T>> itemForwarder = new DataObserver<List<T>>() {
        @Override
        public void onData(List<T> results) {
            for (T item : results) {
                if (emitter.isCancelled()) {
                    // Subscriber went away mid-list: stop without completing.
                    return;
                }
                emitter.onNext(item);
            }
            if (emitter.isCancelled()) {
                return;
            }
            emitter.onComplete();
        }
    };
    final DataSubscription subscription = query.subscribe().observer(itemForwarder);

    emitter.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            subscription.cancel();
        }
    });
}
/**
 * Creates an Observable that emits the results of {@code query} as whole lists.
 * <p>
 * The Observable itself never calls {@code onComplete}; per the original notes this is so
 * that subscribers keep receiving updates when the underlying data changes. Result lists
 * arriving after disposal are dropped, and disposing cancels the query subscription.
 *
 * @param query the query to observe
 * @return an Observable emitting each result list delivered by the query's data observer
 */
public static <T> Observable<List<T>> observable(final Query<T> query) {
    return Observable.create(new ObservableOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final ObservableEmitter<List<T>> downstream) throws Exception {
            final DataObserver<List<T>> forwarder = new DataObserver<List<T>>() {
                @Override
                public void onData(List<T> results) {
                    if (downstream.isDisposed()) {
                        return;
                    }
                    downstream.onNext(results);
                }
            };
            final DataSubscription subscription = query.subscribe().observer(forwarder);

            downstream.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    subscription.cancel();
                }
            });
        }
    });
}
/**
 * Creates a Single that emits one result of {@code query} as a list.
 * <p>
 * The query is subscribed through {@code query.subscribe().single()}, and the delivered list
 * is passed to {@code onSuccess} unless the emitter is already disposed. Disposing the Single
 * cancels the query subscription.
 *
 * @param query the query to run
 * @return a Single emitting the query result list
 */
public static <T> Single<List<T>> single(final Query<T> query) {
    return Single.create(new SingleOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final SingleEmitter<List<T>> downstream) throws Exception {
            final DataObserver<List<T>> resultReceiver = new DataObserver<List<T>>() {
                @Override
                public void onData(List<T> results) {
                    if (downstream.isDisposed()) {
                        return;
                    }
                    downstream.onSuccess(results);
                }
            };
            final DataSubscription subscription =
                    query.subscribe().single().observer(resultReceiver);

            downstream.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    subscription.cancel();
                }
            });
        }
    });
}
/**
 * Creates a {@link GoogleApiClient} wired to the given emitter and starts connecting it.
 * <p>
 * Anything thrown by {@code connect()} is reported through {@code emitter.onError}.
 * When the subscription is cancelled, {@code onUnsubscribed} is invoked if the client is
 * still connected, and the client is disconnected in every case.
 *
 * @param emitter the Flowable emitter that receives results and errors
 */
@Override
public final void subscribe(FlowableEmitter<T> emitter) throws Exception {
    final GoogleApiClient apiClient =
            createApiClient(new ApiClientConnectionCallbacks(emitter));

    try {
        apiClient.connect();
    } catch (Throwable connectFailure) {
        emitter.onError(connectFailure);
    }

    final Cancellable teardown = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            // Only a connected client gets the unsubscribe hook; disconnect always runs.
            if (apiClient.isConnected()) {
                RxLocationFlowableOnSubscribe.this.onUnsubscribed(apiClient);
            }
            apiClient.disconnect();
        }
    };
    emitter.setCancellable(teardown);
}
/**
 * Creates a {@link GoogleApiClient} wired to the given emitter and starts connecting it.
 * <p>
 * Anything thrown by {@code connect()} is reported through {@code emitter.onError}.
 * When the subscription is cancelled, {@code onUnsubscribed} is invoked if the client is
 * still connected, and the client is disconnected in every case.
 *
 * @param emitter the Maybe emitter that receives the result or an error
 */
@Override
public final void subscribe(MaybeEmitter<T> emitter) throws Exception {
    final GoogleApiClient apiClient =
            createApiClient(new ApiClientConnectionCallbacks(emitter));

    try {
        apiClient.connect();
    } catch (Throwable connectFailure) {
        emitter.onError(connectFailure);
    }

    final Cancellable teardown = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            // Only a connected client gets the unsubscribe hook; disconnect always runs.
            if (apiClient.isConnected()) {
                RxLocationMaybeOnSubscribe.this.onUnsubscribed(apiClient);
            }
            apiClient.disconnect();
        }
    };
    emitter.setCancellable(teardown);
}
/**
 * Creates a {@link GoogleApiClient} wired to the given emitter and starts connecting it.
 * <p>
 * Anything thrown by {@code connect()} is reported through {@code emitter.onError}.
 * When the subscription is cancelled, {@code onUnsubscribed} is invoked if the client is
 * still connected, and the client is disconnected in every case.
 *
 * @param emitter the Single emitter that receives the result or an error
 */
@Override
public final void subscribe(SingleEmitter<T> emitter) throws Exception {
    final GoogleApiClient apiClient =
            createApiClient(new ApiClientConnectionCallbacks(emitter));

    try {
        apiClient.connect();
    } catch (Throwable connectFailure) {
        emitter.onError(connectFailure);
    }

    final Cancellable teardown = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            // Only a connected client gets the unsubscribe hook; disconnect always runs.
            if (apiClient.isConnected()) {
                RxLocationSingleOnSubscribe.this.onUnsubscribed(apiClient);
            }
            apiClient.disconnect();
        }
    };
    emitter.setCancellable(teardown);
}
/** * Yields periodical location updates. * * This observable will never call onComplete() thus manual unsubscribe() is necessary. * * When using setExpirationDuration() or setNumUpdates() or setExpirationTime() the observable * will not terminate automatically and will just stop emitting new items without releasing any * resources. * * @return an Observable that returns Location items. */ @SuppressWarnings("WeakerAccess") // It's an entry point. public static Observable<Location> locationUpdates(final Context context, final LocationRequest locationRequest) { return Observable.create(new ObservableOnSubscribe<Location>() { @Override public void subscribe(ObservableEmitter<Location> e) throws Exception { final LocationUpdatesHelper locationUpdatesHelper = new LocationUpdatesHelper( context, new GoogleApiClientFactoryImpl(), new FusedLocationProviderFactoryImpl(), e, locationRequest); e.setCancellable(new Cancellable() { @Override public void cancel() throws Exception { locationUpdatesHelper.stop(); } }); locationUpdatesHelper.start(); } }); }
/** * Yields the last location available to the system. * * This observable will emit only one element and then call onComplete. * * @return an Observable that returns one Location item. */ @SuppressWarnings("WeakerAccess") // It's an entry point. public static Single<Location> lastLocation(final Context context) { return Observable.create(new ObservableOnSubscribe<Location>() { @Override public void subscribe(ObservableEmitter<Location> e) throws Exception { final LastLocationHelper lastLocationHelper = new LastLocationHelper( context, new GoogleApiClientFactoryImpl(), new FusedLocationProviderFactoryImpl(), e); e.setCancellable(new Cancellable() { @Override public void cancel() throws Exception { lastLocationHelper.stop(); } }); lastLocationHelper.start(); } }).singleOrError(); }
/**
 * Creates an Observable that emits every object passed to the write listener registered
 * for {@code key}.
 * <p>
 * On subscription a listener is added via {@code addOnWriteListener}; objects written while
 * the emitter is not disposed are forwarded with {@code onNext}. Disposing the subscription
 * removes the listener via {@code removeListener(key)}.
 *
 * @param key the key whose writes are observed
 * @param <T> the type of the written objects
 * @return an Observable emitting the objects delivered to the write listener
 */
public <T> Observable<T> observe(final String key) {
    return Observable.create(new ObservableOnSubscribe<T>() {
        @Override
        public void subscribe(final ObservableEmitter<T> e) throws Exception {
            addOnWriteListener(key, new OnWriteListener<T>() {
                @Override
                public void onWrite(T object) {
                    if (!e.isDisposed()) {
                        e.onNext(object);
                    }
                }
            });

            // Install the cleanup exactly once, at subscription time. Doing this inside
            // onWrite (as before) leaked the listener when the subscriber disposed before
            // the first write, and because setCancellable cancels any previously set
            // Cancellable, every subsequent write removed the still-needed listener.
            e.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    removeListener(key);
                }
            });
        }
    });
}
@Test public void verifyCancellation() { final AtomicInteger i = new AtomicInteger(); //noinspection unchecked because Java Maybe<Integer> source = Maybe.create(new MaybeOnSubscribe<Integer>() { @Override public void subscribe(MaybeEmitter<Integer> e) { e.setCancellable(new Cancellable() { @Override public void cancel() { i.incrementAndGet(); } }); } }); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(); assertThat(i.get()).isEqualTo(0); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onSuccess(0); // Verify cancellation was called assertThat(i.get()).isEqualTo(1); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void verifyCancellation() { final AtomicInteger i = new AtomicInteger(); //noinspection unchecked because Java Completable source = Completable.create(new CompletableOnSubscribe() { @Override public void subscribe(CompletableEmitter e) { e.setCancellable(new Cancellable() { @Override public void cancel() { i.incrementAndGet(); } }); } }); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(autoDisposable(lifecycle)) .subscribe(); assertThat(i.get()).isEqualTo(0); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onSuccess(0); // Verify cancellation was called assertThat(i.get()).isEqualTo(1); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void verifyCancellation() { final AtomicInteger i = new AtomicInteger(); //noinspection unchecked because Java Single<Integer> source = Single.create(new SingleOnSubscribe<Integer>() { @Override public void subscribe(SingleEmitter<Integer> e) { e.setCancellable(new Cancellable() { @Override public void cancel() { i.incrementAndGet(); } }); } }); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(); assertThat(i.get()).isEqualTo(0); assertThat(lifecycle.hasObservers()).isTrue(); lifecycle.onSuccess(0); // Verify cancellation was called assertThat(i.get()).isEqualTo(1); assertThat(lifecycle.hasObservers()).isFalse(); }
/**
 * Bridges the recognizer's foreground/background callbacks to the given emitter.
 * <p>
 * A listener emitting {@code FOREGROUND} or {@code BACKGROUND} is registered on the
 * recognizer, which is then started. Disposing the subscription removes the listener and
 * stops the recognizer.
 *
 * @param appStateEmitter the emitter receiving app state changes
 */
@Override
public void subscribe(@NonNull final ObservableEmitter<AppState> appStateEmitter) throws Exception {
    final AppStateListener stateForwarder = new AppStateListener() {
        @Override
        public void onAppDidEnterForeground() {
            appStateEmitter.onNext(FOREGROUND);
        }

        @Override
        public void onAppDidEnterBackground() {
            appStateEmitter.onNext(BACKGROUND);
        }
    };

    final Cancellable teardown = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            recognizer.removeListener(stateForwarder);
            recognizer.stop();
        }
    };
    // Cleanup is installed first, then the listener is attached and recognition begins.
    appStateEmitter.setCancellable(teardown);

    recognizer.addListener(stateForwarder);
    recognizer.start();
}
/**
 * Creates a Single that attaches a single-value event listener to {@code query}.
 * <p>
 * The listener is built by {@code listener(emit)}, which decides how events are mapped to
 * the emitter. Disposing the Single removes that listener from the query.
 *
 * @param query the database query to read once
 * @return a Single driven by the query's single-value event listener
 */
@NonNull
@CheckReturnValue
public static Single<DataSnapshot> single(@NonNull final Query query) {
    return Single.create(new SingleOnSubscribe<DataSnapshot>() {
        @Override
        public void subscribe(@NonNull final SingleEmitter<DataSnapshot> emit) throws Exception {
            final ValueEventListener snapshotListener = listener(emit);

            final Cancellable detach = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    query.removeEventListener(snapshotListener);
                }
            };
            emit.setCancellable(detach);

            query.addListenerForSingleValueEvent(snapshotListener);
        }
    });
}
@Override @RequiresPermission(USE_FINGERPRINT) @RequiresApi(Build.VERSION_CODES.M) public void subscribe(ObservableEmitter<T> emitter) throws Exception { if (fingerprintApiWrapper.isUnavailable()) { emitter.onError(new FingerprintUnavailableException("Fingerprint authentication is not available on this device! Ensure that the device has a Fingerprint sensor and enrolled Fingerprints by calling RxFingerprint#isAvailable(Context) first")); return; } AuthenticationCallback callback = createAuthenticationCallback(emitter); cancellationSignal = fingerprintApiWrapper.createCancellationSignal(); CryptoObject cryptoObject = initCryptoObject(emitter); //noinspection MissingPermission fingerprintApiWrapper.getFingerprintManager().authenticate(cryptoObject, cancellationSignal, 0, callback, null); emitter.setCancellable(new Cancellable() { @Override public void cancel() throws Exception { if (cancellationSignal != null && !cancellationSignal.isCanceled()) { cancellationSignal.cancel(); } } }); }
/**
 * Registers {@code emitter} in the emitter list and arranges for it to be removed again
 * when it is cancelled. Afterwards {@code emitMissingLifecycle} is invoked for it.
 *
 * @param emitter the emitter to track
 */
public void addEmitter(@NonNull final ObservableEmitter<Integer> emitter) {
    emitterList.add(emitter);

    final Cancellable unregister = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            emitterList.remove(emitter);
        }
    };
    emitter.setCancellable(unregister);

    emitMissingLifecycle(emitter);
}
/**
 * Listens for changes in the data at the given query location.
 * <p>
 * Every {@code onDataChange} snapshot is emitted with {@code onNext}; a listener
 * cancellation is reported as an {@code RxFirebaseDataException} through {@code onError}.
 * Cancelling the Flowable removes the listener from the query.
 *
 * @param query    reference to a particular location in the database, used for reading or
 *                 writing data at that location.
 * @param strategy {@link BackpressureStrategy} associated with this {@link Flowable}
 * @return a {@link Flowable} which emits when a value of the database changes in the given query.
 */
@NonNull
public static Flowable<DataSnapshot> observeValueEvent(@NonNull final Query query,
                                                       @NonNull BackpressureStrategy strategy) {
    final FlowableOnSubscribe<DataSnapshot> source = new FlowableOnSubscribe<DataSnapshot>() {
        @Override
        public void subscribe(final FlowableEmitter<DataSnapshot> emitter) throws Exception {
            final ValueEventListener snapshotForwarder = new ValueEventListener() {
                @Override
                public void onDataChange(DataSnapshot dataSnapshot) {
                    emitter.onNext(dataSnapshot);
                }

                @Override
                public void onCancelled(final DatabaseError error) {
                    emitter.onError(new RxFirebaseDataException(error));
                }
            };

            final Cancellable detach = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    query.removeEventListener(snapshotForwarder);
                }
            };
            emitter.setCancellable(detach);

            query.addValueEventListener(snapshotForwarder);
        }
    };
    return Flowable.create(source, strategy);
}
/**
 * Makes cancellation of {@code subscriber} unregister the given sensor event listener from
 * the sensor manager.
 *
 * @param subscriber          the emitter whose cancellation triggers the cleanup
 * @param sensorEventListener the listener to unregister
 */
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
                                       final SensorEventListener sensorEventListener) {
    final Cancellable unregister = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            sensorManager.unregisterListener(sensorEventListener);
        }
    };
    subscriber.setCancellable(unregister);
}
/**
 * Makes cancellation of {@code subscriber} unregister the given broadcast receiver from
 * the context.
 *
 * @param subscriber        the emitter whose cancellation triggers the cleanup
 * @param broadcastReceiver the receiver to unregister
 */
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
                                       final BroadcastReceiver broadcastReceiver) {
    final Cancellable unregister = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            context.unregisterReceiver(broadcastReceiver);
        }
    };
    subscriber.setCancellable(unregister);
}
/**
 * Makes cancellation of {@code subscriber} flush pending scan results for the given
 * callback and then stop the scan.
 * <p>
 * The LE scanner is looked up from the Bluetooth adapter when this method is called, not
 * when the cancellation runs.
 *
 * @param subscriber   the emitter whose cancellation triggers the cleanup
 * @param scanCallback the scan callback to flush and stop
 */
@RequiresApi(api = Build.VERSION_CODES.LOLLIPOP)
private void addUnsuscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
                                      final ScanCallback scanCallback) {
    final BluetoothLeScanner leScanner = bluetoothManager.getAdapter().getBluetoothLeScanner();
    final Cancellable stopScanning = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            leScanner.flushPendingScanResults(scanCallback);
            leScanner.stopScan(scanCallback);
        }
    };
    subscriber.setCancellable(stopScanning);
}
/**
 * Makes cancellation of {@code subscriber} unregister the given GNSS measurements callback
 * from the location manager.
 *
 * @param subscriber the emitter whose cancellation triggers the cleanup
 * @param callback   the GNSS measurements callback to unregister
 */
@RequiresApi(Build.VERSION_CODES.N)
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
                                       final GnssMeasurementsEvent.Callback callback) {
    final Cancellable unregister = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            locationManager.unregisterGnssMeasurementsCallback(callback);
        }
    };
    subscriber.setCancellable(unregister);
}
/**
 * Makes cancellation of {@code subscriber} unregister the given GNSS status callback from
 * the location manager.
 *
 * @param subscriber the emitter whose cancellation triggers the cleanup
 * @param callback   the GNSS status callback to unregister
 */
@RequiresApi(Build.VERSION_CODES.N)
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
                                       final GnssStatus.Callback callback) {
    final Cancellable unregister = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            locationManager.unregisterGnssStatusCallback(callback);
        }
    };
    subscriber.setCancellable(unregister);
}
/**
 * Makes cancellation of {@code subscriber} remove the given location listener's updates
 * from the location manager.
 *
 * @param subscriber       the emitter whose cancellation triggers the cleanup
 * @param locationListener the listener whose updates are removed
 */
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
                                       final LocationListener locationListener) {
    final Cancellable removeUpdates = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            locationManager.removeUpdates(locationListener);
        }
    };
    subscriber.setCancellable(removeUpdates);
}
/**
 * Makes cancellation of {@code subscriber} unregister the given GNSS navigation message
 * callback from the location manager.
 *
 * @param subscriber the emitter whose cancellation triggers the cleanup
 * @param callback   the GNSS navigation message callback to unregister
 */
@RequiresApi(Build.VERSION_CODES.N)
private void addUnsubscribeCallbackFor(FlowableEmitter<SensorRecord> subscriber,
                                       final GnssNavigationMessage.Callback callback) {
    final Cancellable unregister = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            locationManager.unregisterGnssNavigationMessageCallback(callback);
        }
    };
    subscriber.setCancellable(unregister);
}
private Observable<String> createButtonClickObservable() { // 2 return Observable.create(new ObservableOnSubscribe<String>() { // 3 @Override public void subscribe(final ObservableEmitter<String> emitter) throws Exception { // 4 onclicklist = new View.OnClickListener() { @Override public void onClick(View view) { // 5 emitter.onNext(mQueryEditText.getText().toString()); } }; mSearchButton.setOnClickListener(onclicklist); // 6 emitter.setCancellable(new Cancellable() { @Override public void cancel() throws Exception { // 7 onclicklist = null; mSearchButton.setOnClickListener(null); } }); } }); }
private Observable<String> createButtonClickObservable() { // 2 return Observable.create(new ObservableOnSubscribe<String>() { // 3 @Override public void subscribe(final ObservableEmitter<String> emitter) throws Exception { // 4 mSearchButton.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View view) { // 5 emitter.onNext(mQueryEditText.getText().toString()); } }); // 6 emitter.setCancellable(new Cancellable() { @Override public void cancel() throws Exception { // 7 mSearchButton.setOnClickListener(null); } }); } }); }
@Test public void verifyCancellation() { final AtomicInteger i = new AtomicInteger(); //noinspection unchecked because Java final ObservableEmitter<Integer>[] emitter = new ObservableEmitter[1]; Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) { e.setCancellable(new Cancellable() { @Override public void cancel() { i.incrementAndGet(); } }); emitter[0] = e; } }); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(); assertThat(i.get()).isEqualTo(0); assertThat(lifecycle.hasObservers()).isTrue(); emitter[0].onNext(1); lifecycle.onSuccess(0); emitter[0].onNext(2); // Verify cancellation was called assertThat(i.get()).isEqualTo(1); assertThat(lifecycle.hasObservers()).isFalse(); }
@Test public void verifyCancellation() { final AtomicInteger i = new AtomicInteger(); //noinspection unchecked because Java final FlowableEmitter<Integer>[] emitter = new FlowableEmitter[1]; Flowable<Integer> source = Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> e) { e.setCancellable(new Cancellable() { @Override public void cancel() { i.incrementAndGet(); } }); emitter[0] = e; } }, BackpressureStrategy.LATEST); MaybeSubject<Integer> lifecycle = MaybeSubject.create(); source.as(AutoDispose.<Integer>autoDisposable(lifecycle)) .subscribe(); assertThat(i.get()).isEqualTo(0); assertThat(lifecycle.hasObservers()).isTrue(); emitter[0].onNext(1); lifecycle.onSuccess(0); emitter[0].onNext(2); // Verify cancellation was called assertThat(i.get()).isEqualTo(1); assertThat(lifecycle.hasObservers()).isFalse(); }
/**
 * Attaches an {@code RxValueListener} (built from the emitter and the marshaller) to the
 * query and removes it again when the subscription is disposed.
 *
 * @param emitter the emitter handed to the value listener
 */
@Override
public void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception {
    // addValueEventListener hands back the listener reference used for later removal.
    final ValueEventListener registered =
            query.addValueEventListener(new RxValueListener<>(emitter, marshaller));

    final Cancellable detach = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            query.removeEventListener(registered);
        }
    };
    emitter.setCancellable(detach);
}
/**
 * Creates an {@link io.reactivex.Observable} that emits the value of one property of the
 * target {@link Observable} whenever that property changes.
 * <p>
 * A property-changed callback is registered on the target. Notifications for other
 * property ids are ignored; for the matching id, {@code getter} is applied to the target and
 * the result is emitted. Anything thrown while doing so is routed to {@code onError}.
 * <p>
 * The created {@link io.reactivex.Observable} never emits an {@code onComplete} notification,
 * so the {@link io.reactivex.Observer} must dispose the connection to avoid a leak; disposal
 * removes the callback from the target.
 *
 * @param target           the {@link Observable} to be observed
 * @param targetPropertyId the property id of the target {@link Observable}
 *                         (e.g. BR.some_property)
 * @param getter           the getter function reading the property from the target
 *                         {@link Observable}
 * @param <T>              the type of {@code target}
 * @param <R>              the type of the property to be observed
 * @return an {@link io.reactivex.Observable} that emits the changed property of the target
 * {@link Observable} when the specified property changes
 */
public static <T extends Observable, R> io.reactivex.Observable<R> propertyOf(
        @NonNull final T target,
        final int targetPropertyId,
        @NonNull final Function<T, R> getter
) {
    return io.reactivex.Observable.create(new ObservableOnSubscribe<R>() {
        @Override
        public void subscribe(final ObservableEmitter<R> emitter) throws Exception {
            final Observable.OnPropertyChangedCallback propertyWatcher =
                    new Observable.OnPropertyChangedCallback() {
                        @Override
                        public void onPropertyChanged(Observable sender, int propertyId) {
                            try {
                                if (propertyId != targetPropertyId) {
                                    return;
                                }
                                emitter.onNext(getter.apply(target));
                            } catch (Throwable failure) {
                                emitter.onError(failure);
                            }
                        }
                    };
            target.addOnPropertyChangedCallback(propertyWatcher);

            final Cancellable detach = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    target.removeOnPropertyChangedCallback(propertyWatcher);
                }
            };
            emitter.setCancellable(detach);
        }
    });
}
/**
 * Creates an {@link io.reactivex.Observable} that emits the target {@link Observable}
 * itself whenever any of its properties changes.
 * <p>
 * A property-changed callback is registered on the target; every notification, regardless
 * of property id, emits {@code target}. Anything thrown while emitting is routed to
 * {@code onError}.
 * <p>
 * The created {@link io.reactivex.Observable} never emits an {@code onComplete} notification,
 * so the {@link io.reactivex.Observer} must dispose the connection to avoid a leak; disposal
 * removes the callback from the target.
 *
 * @param target the {@link Observable} to be observed
 * @param <T>    the type of {@code target}
 * @return an {@link io.reactivex.Observable} that emits the target {@link Observable} when
 * even one of its properties changes
 */
public static <T extends Observable> io.reactivex.Observable<T> allPropertiesOf(
        @NonNull final T target
) {
    return io.reactivex.Observable.create(new ObservableOnSubscribe<T>() {
        @Override
        public void subscribe(final ObservableEmitter<T> emitter) throws Exception {
            final Observable.OnPropertyChangedCallback anyPropertyWatcher =
                    new Observable.OnPropertyChangedCallback() {
                        @Override
                        public void onPropertyChanged(Observable sender, int propertyId) {
                            try {
                                emitter.onNext(target);
                            } catch (Throwable failure) {
                                emitter.onError(failure);
                            }
                        }
                    };
            target.addOnPropertyChangedCallback(anyPropertyWatcher);

            final Cancellable detach = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    target.removeOnPropertyChangedCallback(anyPropertyWatcher);
                }
            };
            emitter.setCancellable(detach);
        }
    });
}
@BindingAdapter("rxCommandOnClick") public static void setOnClick(final View view, final RxCommand<NoParameter> command) { // Set initial state. view.setEnabled(command.canExecute()); // Observe click events. view.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { if (command.canExecute()) { command.execute(NoParameter.INSTANCE); } } }); // Observe enabled changed events. final Observable.OnPropertyChangedCallback callback = new Observable.OnPropertyChangedCallback() { @Override public void onPropertyChanged(Observable observable, int i) { view.setEnabled(command.canExecute()); } }; command.getEnabled().addOnPropertyChangedCallback(callback); command.setCancellable(new Cancellable() { @Override public void cancel() throws Exception { command.getEnabled().removeOnPropertyChangedCallback(callback); } }); }
public static void safeCancel(@Nullable Cancellable cancellable) { if (cancellable == null) { return; } try { cancellable.cancel(); } catch (Exception e) { // Ignore the exception. } }
@Test public void unbindViewWillBeExecutedWhenDisposed() throws Exception { // given Cancellable mockCancellable = Mockito.mock(Cancellable.class); property.setCancellable(mockCancellable); verify(mockCancellable, never()).cancel(); // when property.dispose(); // then verify(mockCancellable).cancel(); }
@Test public void unbindViewWillBeExecutedWhenDisposed() throws Exception { // given RxCommand<NoParameter> command = new RxCommand<>(); Cancellable mockCancellable = Mockito.mock(Cancellable.class); command.setCancellable(mockCancellable); verify(mockCancellable, never()).cancel(); // when command.dispose(); // then verify(mockCancellable).cancel(); }
@Test public void ignoreExceptionByUnbindView() throws Exception { // given RxCommand<NoParameter> command = new RxCommand<>(); Cancellable mockCancellable = Mockito.mock(Cancellable.class); doThrow(new RuntimeException("Error in unbindView")).when(mockCancellable).cancel(); command.setCancellable(mockCancellable); verify(mockCancellable, never()).cancel(); // when command.dispose(); // then verify(mockCancellable).cancel(); }