@Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); if (disposed) { return Disposables.disposed(); } run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this; // Used as token for batch disposal of this worker's runnables. handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay))); // Re-check disposed state for removing in case we were racing a call to dispose(). if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; }
private Disposable disposeInUiThread(final Action action) { return Disposables.fromAction(new Action() { @Override public void run() throws Exception { if (Looper.getMainLooper() == Looper.myLooper()) { action.run(); } else { final Scheduler.Worker inner = AndroidSchedulers.mainThread().createWorker(); inner.schedule(new Runnable() { @Override public void run() { try { action.run(); } catch (Exception e) { onError("Could not unregister receiver in UI Thread", e); } inner.dispose(); } }); } } }); }
@Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); if (disposed) { return Disposables.disposed(); } run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(run); executeRunnable(title, delay, unit, scheduled); // Re-check disposed state for removing in case we were racing a // call to dispose(). if (disposed) { return Disposables.disposed(); } return scheduled; }
@Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); if (disposed) { return Disposables.disposed(); } run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(run); executeRunnable(display, delay, unit, scheduled); // Re-check disposed state for removing in case we were racing a // call to dispose(). if (disposed) { return Disposables.disposed(); } return scheduled; }
@Override public void subscribe(final ObservableEmitter<DocumentSnapshot> emitter) throws Exception { final EventListener<DocumentSnapshot> listener = new EventListener<DocumentSnapshot>() { @Override public void onEvent(DocumentSnapshot documentSnapshot, FirebaseFirestoreException e) { if (!emitter.isDisposed()) { if (e == null) { emitter.onNext(documentSnapshot); } else { emitter.onError(e); } } } }; registration = documentReference.addSnapshotListener(listener); emitter.setDisposable(Disposables.fromAction(new Action() { @Override public void run() throws Exception { registration.remove(); } })); }
public void openDatabase() { disposable = Observable.create((ObservableOnSubscribe<Realm>) emitter -> { final Realm observableRealm = Realm.getDefaultInstance(); final RealmChangeListener<Realm> listener = realm -> { if(!emitter.isDisposed()) { emitter.onNext(observableRealm); } }; observableRealm.addChangeListener(listener); emitter.setDisposable(Disposables.fromAction(() -> { observableRealm.removeChangeListener(listener); observableRealm.close(); })); emitter.onNext(observableRealm); }).subscribeOn(looperScheduler.getScheduler()).unsubscribeOn(looperScheduler.getScheduler()).subscribe(); }
private Observable<List<Task>> createResults(QuerySelector<DbTask> querySelector) { return Observable.create((ObservableOnSubscribe<List<Task>>) emitter -> { Realm realm = Realm.getDefaultInstance(); final RealmResults<DbTask> dbTasks = querySelector.createQuery(realm); final RealmChangeListener<RealmResults<DbTask>> realmChangeListener = element -> { if(element.isLoaded() && !emitter.isDisposed()) { List<Task> tasks = mapFrom(element); if(!emitter.isDisposed()) { emitter.onNext(tasks); } } }; emitter.setDisposable(Disposables.fromAction(() -> { if(dbTasks.isValid()) { dbTasks.removeChangeListener(realmChangeListener); } realm.close(); })); dbTasks.addChangeListener(realmChangeListener); }).subscribeOn(looperScheduler.getScheduler()).unsubscribeOn(looperScheduler.getScheduler()); }
/** * Disposes an action in UI Thread * * @param dispose action to be executed * @return Disposable object */ private Disposable disposeInUiThread(final Action dispose) { return Disposables.fromAction(new Action() { @Override public void run() throws Exception { if (Looper.getMainLooper() == Looper.myLooper()) { dispose.run(); } else { final Scheduler.Worker inner = AndroidSchedulers.mainThread().createWorker(); inner.schedule(new Runnable() { @Override public void run() { try { dispose.run(); } catch (Exception exception) { onError("Could not unregister receiver in UI Thread", exception); } inner.dispose(); } }); } } }); }
@Override public void subscribe(final ObservableEmitter<FirebaseAuth> emitter) { final FirebaseAuth.AuthStateListener listener = new FirebaseAuth.AuthStateListener() { @Override public void onAuthStateChanged(@NonNull FirebaseAuth firebaseAuth) { if (!emitter.isDisposed()) { emitter.onNext(firebaseAuth); } } }; instance.addAuthStateListener(listener); emitter.setDisposable(Disposables.fromAction(new Action() { @Override public void run() throws Exception { instance.removeAuthStateListener(listener); } })); }
@Override public void subscribe(final ObservableEmitter<DataSnapshot> emitter) { final ValueEventListener listener = new ValueEventListener() { @Override public void onDataChange(DataSnapshot dataSnapshot) { if (!emitter.isDisposed()) { emitter.onNext(dataSnapshot); } } @Override public void onCancelled(DatabaseError databaseError) { if (!emitter.isDisposed()) { emitter.onError(databaseError.toException()); } } }; ref.addValueEventListener(listener); emitter.setDisposable(Disposables.fromAction(new Action() { @Override public void run() throws Exception { ref.removeEventListener(listener); } })); }
@Override public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) { if (innerSubscription.isDisposed()) { return Disposables.empty(); } final ScheduledAction scheduledAction = new ScheduledAction(action, operationQueue); final ScheduledExecutorService executor = IOSScheduledExecutorPool.getInstance(); Future<?> future; if (delayTime <= 0) { future = executor.submit(scheduledAction); } else { future = executor.schedule(scheduledAction, delayTime, unit); } scheduledAction.add(Disposables.fromFuture(future)); scheduledAction.addParent(innerSubscription); return scheduledAction; }
@Test public void foundWithUnconditionalOnCompleteAfter() { new Observable<Integer>() { @Override protected void subscribeActual(Observer<? super Integer> s) { s.onSubscribe(Disposables.empty()); s.onNext(10); s.onComplete(); } } .compose(ObservableTransformers.indexOf(new Predicate<Integer>() { @Override public boolean test(Integer v) throws Exception { return v == 10; } })) .test() .assertResult(0L); }
public SingleResponseReceiver(ClientCall<?, RespT> call) { this.call = call; this.source = new SingleSource<RespT>() { @Override public void subscribe(SingleObserver<? super RespT> observer) { responseObserver = observer; // todo which disposable should be used here observer.onSubscribe(Disposables.disposed()); // start call until response gets subscribed startCall(); if (error != null) { responseObserver.onError(error); error = null; } } }; }
/** * @param emitter */ @Override public void subscribe(final ObservableEmitter<FirebaseAuth> emitter) { final FirebaseAuth.AuthStateListener listener = new FirebaseAuth.AuthStateListener() { @Override public void onAuthStateChanged(@NonNull FirebaseAuth firebaseAuth) { if (!emitter.isDisposed()) { emitter.onNext(firebaseAuth); } } }; instance.addAuthStateListener(listener); emitter.setDisposable(Disposables.fromAction(new Action() { @Override public void run() throws Exception { instance.removeAuthStateListener(listener); } })); }
@Override public void subscribe(ObservableEmitter<Intent> emitter) throws Exception { BroadcastReceiver broadcastReceiver = new BroadcastReceiver() { @Override public void onReceive(Context context, Intent intent) { emitter.onNext(intent); } }; emitter.setDisposable(Disposables.fromRunnable(() -> { // thank you Jake W. try { if (contextWeakReference != null && contextWeakReference.get() != null) { contextWeakReference.get().unregisterReceiver(broadcastReceiver); } } catch (IllegalArgumentException ignored) {} })); if (contextWeakReference != null && contextWeakReference.get() != null) { contextWeakReference.get().registerReceiver(broadcastReceiver, intentFilter); } }
@Override public <T> Disposable subscribeDisposable(ListenableFuture<? extends T> future, RxListener<T> untracedListener) { requireNonNull(untracedListener); RxListener<T> listener = Rx.getTracingPolicy().hook(future, untracedListener); // when we're unsubscribed, set the flag to false Disposable sub = Disposables.empty(); // add a callback that guards on whether it is still subscribed future.addListener(() -> { try { T value = future.get(); if (!sub.isDisposed()) { listener.onSuccess(value); } } catch (Throwable error) { if (!sub.isDisposed()) { listener.onFailure(error); } } }, executor); // return the subscription return sub; }
@Override public <T> Disposable subscribeDisposable(CompletionStage<? extends T> future, RxListener<T> untracedListener) { requireNonNull(untracedListener); RxListener<T> listener = Rx.getTracingPolicy().hook(future, untracedListener); // when we're unsubscribed, set the flag to false Disposable sub = Disposables.empty(); future.whenCompleteAsync((value, exception) -> { if (!sub.isDisposed()) { if (exception == null) { listener.onSuccess(value); } else { listener.onFailure(exception); } } }, executor); // return the subscription return sub; }
@SuppressLint("CommitPrefEdits") public PrefInternal(final SharedPreferences pref) { this.pref = pref; editor = pref.edit(); mObservable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(final ObservableEmitter<String> emitter) throws Exception { final SharedPreferences.OnSharedPreferenceChangeListener listener = new SharedPreferences.OnSharedPreferenceChangeListener() { @Override public void onSharedPreferenceChanged(SharedPreferences sharedPreferences, String key) { emitter.onNext(key); } }; pref.registerOnSharedPreferenceChangeListener(listener); emitter.setDisposable(Disposables.fromAction(new Action() { @Override public void run() throws Exception { log.d("Un-registering PrefCompat"); pref.unregisterOnSharedPreferenceChangeListener(listener); } })); } }); }
@Override public Disposable schedule(Runnable action) { if (unsubscribed) { return Disposables.disposed(); } SwtScheduledAction a = new SwtScheduledAction(action, this); synchronized (this) { if (unsubscribed) { return Disposables.disposed(); } tasks.add(a); } exec.execute(a); if (unsubscribed) { a.cancel(); return Disposables.disposed(); } return a; }
public static <T extends RealmModel> Flowable<RealmResults<T>> getRealmItems(Class clazz, HashMap<String, String> map) { return Flowable.create(new FlowableOnSubscribe<RealmResults<T>>() { @Override public void subscribe(FlowableEmitter<RealmResults<T>> emitter) throws Exception { Realm realm = Realm.getDefaultInstance(); RealmQuery<T> query = realm.where(clazz); if (map != null) { for (Map.Entry<String, String> entry : map.entrySet()) { query.equalTo(entry.getKey(), entry.getValue()); } } RealmResults<T> results = query.findAll(); final RealmChangeListener<RealmResults<T>> listener = _realm -> { if (!emitter.isCancelled()) { emitter.onNext(results); } }; emitter.setDisposable(Disposables.fromRunnable(() -> { results.removeChangeListener(listener); realm.close(); })); results.addChangeListener(listener); emitter.onNext(results); } }, BackpressureStrategy.LATEST); }
@Override public <E> Observable<CollectionChange<RealmResults<E>>> changesetsFrom(Realm realm, final RealmResults<E> results) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmResults<E>>>() { @Override public void subscribe(final ObservableEmitter<CollectionChange<RealmResults<E>>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); resultsRefs.get().acquireReference(results); final OrderedRealmCollectionChangeListener<RealmResults<E>> listener = new OrderedRealmCollectionChangeListener<RealmResults<E>>() { @Override public void onChange(RealmResults<E> e, OrderedCollectionChangeSet changeSet) { if (!emitter.isDisposed()) { emitter.onNext(new CollectionChange<RealmResults<E>>(results, changeSet)); } } }; results.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { results.removeChangeListener(listener); observableRealm.close(); resultsRefs.get().releaseReference(results); } })); // Emit current value immediately emitter.onNext(new CollectionChange<>(results, null)); } }); }
@Override public <E> Flowable<RealmList<E>> from(Realm realm, final RealmList<E> list) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Flowable.create(new FlowableOnSubscribe<RealmList<E>>() { @Override public void subscribe(final FlowableEmitter<RealmList<E>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); listRefs.get().acquireReference(list); final RealmChangeListener<RealmList<E>> listener = new RealmChangeListener<RealmList<E>>() { @Override public void onChange(RealmList<E> results) { if (!emitter.isCancelled()) { emitter.onNext(list); } } }; list.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { list.removeChangeListener(listener); observableRealm.close(); listRefs.get().releaseReference(list); } })); // Emit current value immediately emitter.onNext(list); } }, BACK_PRESSURE_STRATEGY); }
@Override public <E> Observable<CollectionChange<RealmList<E>>> changesetsFrom(Realm realm, final RealmList<E> list) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmList<E>>>() { @Override public void subscribe(final ObservableEmitter<CollectionChange<RealmList<E>>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); listRefs.get().acquireReference(list); final OrderedRealmCollectionChangeListener<RealmList<E>> listener = new OrderedRealmCollectionChangeListener<RealmList<E>>() { @Override public void onChange(RealmList<E> results, OrderedCollectionChangeSet changeSet) { if (!emitter.isDisposed()) { emitter.onNext(new CollectionChange<>(results, changeSet)); } } }; list.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { list.removeChangeListener(listener); observableRealm.close(); listRefs.get().releaseReference(list); } })); // Emit current value immediately emitter.onNext(new CollectionChange<>(list, null)); } }); }
@Override public <E> Flowable<RealmList<E>> from(DynamicRealm realm, final RealmList<E> list) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Flowable.create(new FlowableOnSubscribe<RealmList<E>>() { @Override public void subscribe(final FlowableEmitter<RealmList<E>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig); listRefs.get().acquireReference(list); final RealmChangeListener<RealmList<E>> listener = new RealmChangeListener<RealmList<E>>() { @Override public void onChange(RealmList<E> results) { if (!emitter.isCancelled()) { emitter.onNext(list); } } }; list.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { list.removeChangeListener(listener); observableRealm.close(); listRefs.get().releaseReference(list); } })); // Emit current value immediately emitter.onNext(list); } }, BACK_PRESSURE_STRATEGY); }
@Override public <E> Observable<CollectionChange<RealmList<E>>> changesetsFrom(DynamicRealm realm, final RealmList<E> list) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmList<E>>>() { @Override public void subscribe(final ObservableEmitter<CollectionChange<RealmList<E>>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig); listRefs.get().acquireReference(list); final OrderedRealmCollectionChangeListener<RealmList<E>> listener = new OrderedRealmCollectionChangeListener<RealmList<E>>() { @Override public void onChange(RealmList<E> results, OrderedCollectionChangeSet changeSet) { if (!emitter.isDisposed()) { emitter.onNext(new CollectionChange<>(results, changeSet)); } } }; list.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { list.removeChangeListener(listener); observableRealm.close(); listRefs.get().releaseReference(list); } })); // Emit current value immediately emitter.onNext(new CollectionChange<>(list, null)); } }); }
@Override public <E extends RealmModel> Flowable<E> from(final Realm realm, final E object) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Flowable.create(new FlowableOnSubscribe<E>() { @Override public void subscribe(final FlowableEmitter<E> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); objectRefs.get().acquireReference(object); final RealmChangeListener<E> listener = new RealmChangeListener<E>() { @Override public void onChange(E obj) { if (!emitter.isCancelled()) { emitter.onNext(obj); } } }; RealmObject.addChangeListener(object, listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { RealmObject.removeChangeListener(object, listener); observableRealm.close(); objectRefs.get().releaseReference(object); } })); // Emit current value immediately emitter.onNext(object); } }, BACK_PRESSURE_STRATEGY); }
@Override public <E extends RealmModel> Observable<ObjectChange<E>> changesetsFrom(Realm realm, final E object) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Observable.create(new ObservableOnSubscribe<ObjectChange<E>>() { @Override public void subscribe(final ObservableEmitter<ObjectChange<E>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); objectRefs.get().acquireReference(object); final RealmObjectChangeListener<E> listener = new RealmObjectChangeListener<E>() { @Override public void onChange(E obj, ObjectChangeSet changeSet) { if (!emitter.isDisposed()) { emitter.onNext(new ObjectChange<>(obj, changeSet)); } } }; RealmObject.addChangeListener(object, listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { RealmObject.removeChangeListener(object, listener); observableRealm.close(); objectRefs.get().releaseReference(object); } })); // Emit current value immediately emitter.onNext(new ObjectChange<>(object, null)); } }); }
@Override public Flowable<DynamicRealmObject> from(DynamicRealm realm, final DynamicRealmObject object) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Flowable.create(new FlowableOnSubscribe<DynamicRealmObject>() { @Override public void subscribe(final FlowableEmitter<DynamicRealmObject> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig); objectRefs.get().acquireReference(object); final RealmChangeListener<DynamicRealmObject> listener = new RealmChangeListener<DynamicRealmObject>() { @Override public void onChange(DynamicRealmObject obj) { if (!emitter.isCancelled()) { emitter.onNext(obj); } } }; RealmObject.addChangeListener(object, listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { RealmObject.removeChangeListener(object, listener); observableRealm.close(); objectRefs.get().releaseReference(object); } })); // Emit current value immediately emitter.onNext(object); } }, BACK_PRESSURE_STRATEGY); }
@Override public Observable<ObjectChange<DynamicRealmObject>> changesetsFrom(DynamicRealm realm, final DynamicRealmObject object) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Observable.create(new ObservableOnSubscribe<ObjectChange<DynamicRealmObject>>() { @Override public void subscribe(final ObservableEmitter<ObjectChange<DynamicRealmObject>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig); objectRefs.get().acquireReference(object); final RealmObjectChangeListener<DynamicRealmObject> listener = new RealmObjectChangeListener<DynamicRealmObject>() { @Override public void onChange(DynamicRealmObject obj, ObjectChangeSet changeSet) { if (!emitter.isDisposed()) { emitter.onNext(new ObjectChange<>(obj, changeSet)); } } }; object.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { object.removeChangeListener(listener); observableRealm.close(); objectRefs.get().releaseReference(object); } })); // Emit current value immediately emitter.onNext(new ObjectChange<>(object, null)); } }); }
private void setDisposable(ObservableEmitter<T> emitter) { emitter.setDisposable(Disposables.fromAction(() -> { synchronized (this) { emitters.remove(emitter); } })); }
static boolean checkMainThread(Observer<?> observer) { if (Looper.myLooper() != Looper.getMainLooper()) { observer.onSubscribe(Disposables.empty()); observer.onError(new IllegalStateException( "Expected to be called on the main thread but was " + Thread.currentThread().getName())); return false; } return true; }
@Override public void subscribe(final ObservableEmitter<QuerySnapshot> emitter) throws Exception { final EventListener<QuerySnapshot> listener = new EventListener<QuerySnapshot>() { @Override public void onEvent(QuerySnapshot querySnapshot, FirebaseFirestoreException e) { if (!emitter.isDisposed()) { if (e == null) { emitter.onNext(querySnapshot); } else { emitter.onError(e); } } } }; registration = query.addSnapshotListener(listener); emitter.setDisposable(Disposables.fromAction(new Action() { @Override public void run() throws Exception { registration.remove(); } })); }
@Override public void subscribe(final ObservableEmitter<DocumentChange> emitter) throws Exception { final EventListener<QuerySnapshot> listener = new EventListener<QuerySnapshot>() { @Override public void onEvent(QuerySnapshot querySnapshot, FirebaseFirestoreException e) { if (!emitter.isDisposed()) { if (e == null) { for (DocumentChange change : querySnapshot.getDocumentChanges()) { emitter.onNext(change); } } else { emitter.onError(e); } } } }; registration = query.addSnapshotListener(listener); emitter.setDisposable(Disposables.fromAction(new Action() { @Override public void run() throws Exception { registration.remove(); } })); }
@Override public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { if (delay <= 0) { if (canRunImmediately()) { run.run(); return Disposables.disposed(); } return new SingleDisposableTask(run); } return getGroup(delay, unit).add(new QueuedWork(run)); }
public static boolean checkMainThread(Observer<?> observer) { if (Looper.myLooper() != Looper.getMainLooper()) { observer.onSubscribe(Disposables.empty()); observer.onError(new IllegalStateException( "Expected to be called on the main thread but was " + Thread.currentThread() .getName())); return false; } return true; }
public static boolean checkLooperThread(final Observer observer) { if (Looper.myLooper() == null) { observer.onSubscribe(Disposables.empty()); observer.onError(new IllegalStateException("Calling thread is not associated with Looper")); return false; } else { return true; } }
@Override public void cancel() { if (disposable.compareAndSet(null, Disposables.disposed())) { return; } else { disposable.get().dispose(); // clear for GC disposable.set(Disposables.disposed()); } }