Java 类io.reactivex.disposables.Disposables 实例源码
项目:GitHub
文件:HandlerScheduler.java
/**
 * Posts {@code run} to this worker's handler after the requested delay.
 *
 * @param run   the task to execute; must not be {@code null}
 * @param delay the delay before execution; negative values are clamped to zero
 * @param unit  the unit of {@code delay}; must not be {@code null}
 * @return the scheduled task as a {@link Disposable}, or an already-disposed
 *         instance when this worker was disposed before or during scheduling
 * @throws NullPointerException if {@code run} or {@code unit} is {@code null}
 */
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) {
        throw new NullPointerException("run == null");
    }
    if (unit == null) {
        throw new NullPointerException("unit == null");
    }
    if (disposed) {
        return Disposables.disposed();
    }

    // Let the plugin hook wrap the task before it is handed to the handler.
    final Runnable hooked = RxJavaPlugins.onSchedule(run);
    final ScheduledRunnable task = new ScheduledRunnable(handler, hooked);

    final Message msg = Message.obtain(handler, task);
    msg.obj = this; // Token used for batch disposal of this worker's runnables.

    final long delayMillis = Math.max(0L, unit.toMillis(delay));
    handler.sendMessageDelayed(msg, delayMillis);

    // dispose() may have raced with the send above: check again and pull the task back out.
    if (!disposed) {
        return task;
    }
    handler.removeCallbacks(task);
    return Disposables.disposed();
}
项目:Rx_java2_soussidev
文件:PreLollipopNetworkObservingStrategy.java
/**
 * Wraps {@code action} in a {@link Disposable} that runs it on the main looper thread.
 * When disposal already happens on the main thread the action runs inline; otherwise
 * it is scheduled on a main-thread worker, which is disposed once the action has run.
 *
 * @param action the action to execute on disposal
 * @return a Disposable that triggers {@code action}
 */
private Disposable disposeInUiThread(final Action action) {
    return Disposables.fromAction(new Action() {
        @Override public void run() throws Exception {
            final boolean onMainThread = Looper.getMainLooper() == Looper.myLooper();
            if (onMainThread) {
                action.run();
                return;
            }

            final Scheduler.Worker worker = AndroidSchedulers.mainThread().createWorker();
            worker.schedule(new Runnable() {
                @Override public void run() {
                    try {
                        action.run();
                    } catch (Exception e) {
                        // Failures on the worker are reported through onError rather than rethrown.
                        onError("Could not unregister receiver in UI Thread", e);
                    }
                    worker.dispose();
                }
            });
        }
    });
}
项目:atlas
文件:HandlerScheduler.java
/**
 * Posts {@code run} to this worker's handler after the requested delay.
 *
 * @param run   the task to execute; must not be {@code null}
 * @param delay the delay before execution; negative values are clamped to zero
 * @param unit  the unit of {@code delay}; must not be {@code null}
 * @return the scheduled task as a {@link Disposable}, or an already-disposed
 *         instance when this worker was disposed before or during scheduling
 * @throws NullPointerException if {@code run} or {@code unit} is {@code null}
 */
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) {
        throw new NullPointerException("run == null");
    }
    if (unit == null) {
        throw new NullPointerException("unit == null");
    }
    if (disposed) {
        return Disposables.disposed();
    }

    // Let the plugin hook wrap the task before it is handed to the handler.
    final Runnable hooked = RxJavaPlugins.onSchedule(run);
    final ScheduledRunnable task = new ScheduledRunnable(handler, hooked);

    final Message msg = Message.obtain(handler, task);
    msg.obj = this; // Token used for batch disposal of this worker's runnables.

    final long delayMillis = Math.max(0L, unit.toMillis(delay));
    handler.sendMessageDelayed(msg, delayMillis);

    // dispose() may have raced with the send above: check again and pull the task back out.
    if (!disposed) {
        return task;
    }
    handler.removeCallbacks(task);
    return Disposables.disposed();
}
项目:RxSWT
文件:EclipseScheduler.java
/**
 * Hands {@code run} to {@code executeRunnable} together with this worker's
 * {@code title} and the requested delay.
 *
 * @param run   the task to execute; must not be {@code null}
 * @param delay the delay before execution
 * @param unit  the unit of {@code delay}; must not be {@code null}
 * @return the scheduled task, or an already-disposed instance when this worker
 *         was disposed before or during scheduling
 * @throws NullPointerException if {@code run} or {@code unit} is {@code null}
 */
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) {
        throw new NullPointerException("run == null");
    }
    if (unit == null) {
        throw new NullPointerException("unit == null");
    }
    if (disposed) {
        return Disposables.disposed();
    }

    final Runnable hooked = RxJavaPlugins.onSchedule(run);
    final ScheduledRunnable task = new ScheduledRunnable(hooked);
    executeRunnable(title, delay, unit, task);

    // dispose() may have raced with the scheduling above, so look at the flag once more.
    // NOTE(review): the task is not cancelled on this path - confirm that
    // dispose() itself takes care of already-submitted work.
    if (!disposed) {
        return task;
    }
    return Disposables.disposed();
}
项目:RxSWT
文件:DisplayScheduler.java
/**
 * Hands {@code run} to {@code executeRunnable} together with this worker's
 * {@code display} and the requested delay.
 *
 * @param run   the task to execute; must not be {@code null}
 * @param delay the delay before execution
 * @param unit  the unit of {@code delay}; must not be {@code null}
 * @return the scheduled task, or an already-disposed instance when this worker
 *         was disposed before or during scheduling
 * @throws NullPointerException if {@code run} or {@code unit} is {@code null}
 */
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) {
        throw new NullPointerException("run == null");
    }
    if (unit == null) {
        throw new NullPointerException("unit == null");
    }
    if (disposed) {
        return Disposables.disposed();
    }

    final Runnable hooked = RxJavaPlugins.onSchedule(run);
    final ScheduledRunnable task = new ScheduledRunnable(hooked);
    executeRunnable(display, delay, unit, task);

    // dispose() may have raced with the scheduling above, so look at the flag once more.
    // NOTE(review): the task is not cancelled on this path - confirm that
    // dispose() itself takes care of already-submitted work.
    if (!disposed) {
        return task;
    }
    return Disposables.disposed();
}
项目:RxFirestore
文件:DocumentSnapshotsOnSubscribe.java
/**
 * Attaches a snapshot listener to {@code documentReference} and forwards every
 * snapshot (or error) to {@code emitter} until the emitter is disposed.
 * Disposal removes the listener registration.
 *
 * @param emitter the downstream emitter receiving snapshots and errors
 */
@Override
public void subscribe(final ObservableEmitter<DocumentSnapshot> emitter) throws Exception {
    final EventListener<DocumentSnapshot> snapshotListener = new EventListener<DocumentSnapshot>() {
        @Override
        public void onEvent(DocumentSnapshot documentSnapshot, FirebaseFirestoreException e) {
            if (emitter.isDisposed()) {
                return;
            }
            if (e != null) {
                emitter.onError(e);
                return;
            }
            emitter.onNext(documentSnapshot);
        }
    };

    // NOTE(review): 'registration' is not declared in this method (presumably a field),
    // so repeated subscriptions would overwrite it - confirm against the class.
    registration = documentReference.addSnapshotListener(snapshotListener);

    final Action removeRegistration = new Action() {
        @Override
        public void run() throws Exception {
            registration.remove();
        }
    };
    emitter.setDisposable(Disposables.fromAction(removeRegistration));
}
项目:simple-stack
文件:DatabaseManager.java
/**
 * Opens the default Realm on the looper scheduler and keeps it open while the
 * resulting subscription (stored in {@code disposable}) is alive. The Realm is
 * emitted once immediately and again on every change; disposal removes the
 * listener and closes the Realm.
 */
public void openDatabase() {
    final Observable<Realm> realmStream = Observable.create((ObservableOnSubscribe<Realm>) emitter -> {
        final Realm openRealm = Realm.getDefaultInstance();

        final RealmChangeListener<Realm> onChange = changed -> {
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onNext(openRealm);
        };
        openRealm.addChangeListener(onChange);

        emitter.setDisposable(Disposables.fromAction(() -> {
            openRealm.removeChangeListener(onChange);
            openRealm.close();
        }));

        // Initial emission.
        emitter.onNext(openRealm);
    });

    disposable = realmStream
            .subscribeOn(looperScheduler.getScheduler())
            .unsubscribeOn(looperScheduler.getScheduler())
            .subscribe();
}
项目:simple-stack
文件:TaskRepository.java
/**
 * Builds an Observable that runs the query chosen by {@code querySelector} on a
 * freshly opened default Realm and emits the mapped task list whenever the
 * loaded results change. Subscription and disposal both run on the looper scheduler.
 *
 * @param querySelector creates the query against the opened Realm
 * @return an Observable of mapped task lists
 */
private Observable<List<Task>> createResults(QuerySelector<DbTask> querySelector) {
    final ObservableOnSubscribe<List<Task>> source = emitter -> {
        Realm realm = Realm.getDefaultInstance();
        final RealmResults<DbTask> results = querySelector.createQuery(realm);

        final RealmChangeListener<RealmResults<DbTask>> listener = element -> {
            if (!element.isLoaded() || emitter.isDisposed()) {
                return;
            }
            List<Task> tasks = mapFrom(element);
            // Mapping may take a while; check again before emitting.
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onNext(tasks);
        };

        // The cleanup is installed before the listener is attached.
        emitter.setDisposable(Disposables.fromAction(() -> {
            if (results.isValid()) {
                results.removeChangeListener(listener);
            }
            realm.close();
        }));
        results.addChangeListener(listener);
    };

    return Observable.create(source)
            .subscribeOn(looperScheduler.getScheduler())
            .unsubscribeOn(looperScheduler.getScheduler());
}
项目:simple-stack
文件:DatabaseManager.java
/**
 * Opens the default Realm on the looper scheduler and keeps it open while the
 * resulting subscription (stored in {@code disposable}) is alive. The Realm is
 * emitted once immediately and again on every change; disposal removes the
 * listener and closes the Realm.
 */
public void openDatabase() {
    final Observable<Realm> realmStream = Observable.create((ObservableOnSubscribe<Realm>) emitter -> {
        final Realm openRealm = Realm.getDefaultInstance();

        final RealmChangeListener<Realm> onChange = changed -> {
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onNext(openRealm);
        };
        openRealm.addChangeListener(onChange);

        emitter.setDisposable(Disposables.fromAction(() -> {
            openRealm.removeChangeListener(onChange);
            openRealm.close();
        }));

        // Initial emission.
        emitter.onNext(openRealm);
    });

    disposable = realmStream
            .subscribeOn(looperScheduler.getScheduler())
            .unsubscribeOn(looperScheduler.getScheduler())
            .subscribe();
}
项目:simple-stack
文件:TaskRepository.java
/**
 * Builds an Observable that runs the query chosen by {@code querySelector} on a
 * freshly opened default Realm and emits the mapped task list whenever the
 * loaded results change. Subscription and disposal both run on the looper scheduler.
 *
 * @param querySelector creates the query against the opened Realm
 * @return an Observable of mapped task lists
 */
private Observable<List<Task>> createResults(QuerySelector<DbTask> querySelector) {
    final ObservableOnSubscribe<List<Task>> source = emitter -> {
        Realm realm = Realm.getDefaultInstance();
        final RealmResults<DbTask> results = querySelector.createQuery(realm);

        final RealmChangeListener<RealmResults<DbTask>> listener = element -> {
            if (!element.isLoaded() || emitter.isDisposed()) {
                return;
            }
            List<Task> tasks = mapFrom(element);
            // Mapping may take a while; check again before emitting.
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onNext(tasks);
        };

        // The cleanup is installed before the listener is attached.
        emitter.setDisposable(Disposables.fromAction(() -> {
            if (results.isValid()) {
                results.removeChangeListener(listener);
            }
            realm.close();
        }));
        results.addChangeListener(listener);
    };

    return Observable.create(source)
            .subscribeOn(looperScheduler.getScheduler())
            .unsubscribeOn(looperScheduler.getScheduler());
}
项目:ReactiveAirplaneMode
文件:ReactiveAirplaneMode.java
/**
 * Wraps an action in a Disposable that always executes it on the main looper thread.
 * If disposal happens on the main thread the action runs inline; otherwise it is
 * scheduled on a main-thread worker that is disposed after the action has run.
 *
 * @param dispose action to be executed
 * @return Disposable object that triggers the action
 */
private Disposable disposeInUiThread(final Action dispose) {
    return Disposables.fromAction(new Action() {
        @Override public void run() throws Exception {
            final boolean onMainThread = Looper.getMainLooper() == Looper.myLooper();
            if (onMainThread) {
                dispose.run();
                return;
            }

            final Scheduler.Worker worker = AndroidSchedulers.mainThread().createWorker();
            worker.schedule(new Runnable() {
                @Override public void run() {
                    try {
                        dispose.run();
                    } catch (Exception exception) {
                        // Failures on the worker are reported through onError rather than rethrown.
                        onError("Could not unregister receiver in UI Thread", exception);
                    }
                    worker.dispose();
                }
            });
        }
    });
}
项目:RxFirebase2
文件:AuthStateChangesOnSubscribe.java
/**
 * Registers an auth-state listener on {@code instance} and forwards each callback
 * to {@code emitter} while it is not disposed. Disposal removes the listener.
 *
 * @param emitter the downstream emitter
 */
@Override public void subscribe(final ObservableEmitter<FirebaseAuth> emitter) {
    final FirebaseAuth.AuthStateListener authListener = new FirebaseAuth.AuthStateListener() {
        @Override public void onAuthStateChanged(@NonNull FirebaseAuth firebaseAuth) {
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onNext(firebaseAuth);
        }
    };
    instance.addAuthStateListener(authListener);

    final Action detach = new Action() {
        @Override public void run() throws Exception {
            instance.removeAuthStateListener(authListener);
        }
    };
    emitter.setDisposable(Disposables.fromAction(detach));
}
项目:RxFirebase2
文件:DataChangesOnSubscribe.java
/**
 * Attaches a value listener to {@code ref}: data changes are emitted as items and
 * cancellation is emitted as an error, both only while {@code emitter} is not
 * disposed. Disposal removes the listener from {@code ref}.
 *
 * @param emitter the downstream emitter
 */
@Override public void subscribe(final ObservableEmitter<DataSnapshot> emitter) {
    final ValueEventListener valueListener = new ValueEventListener() {
        @Override public void onDataChange(DataSnapshot dataSnapshot) {
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onNext(dataSnapshot);
        }

        @Override public void onCancelled(DatabaseError databaseError) {
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onError(databaseError.toException());
        }
    };
    ref.addValueEventListener(valueListener);

    final Action detach = new Action() {
        @Override public void run() throws Exception {
            ref.removeEventListener(valueListener);
        }
    };
    emitter.setDisposable(Disposables.fromAction(detach));
}
项目:RxiOSMOE
文件:HandlerThreadScheduler.java
/**
 * Submits {@code action} to the shared scheduled executor, wrapped in a
 * {@code ScheduledAction} bound to {@code operationQueue}.
 *
 * @param action    the task to run
 * @param delayTime the delay before execution; values {@code <= 0} submit immediately
 * @param unit      the unit of {@code delayTime}
 * @return the scheduled action, or an already-disposed Disposable when this
 *         worker has been disposed
 */
@Override
public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) {
    if (innerSubscription.isDisposed()) {
        // The task is rejected, so hand back a handle that reports isDisposed() == true.
        // Disposables.empty() would return a fresh, non-disposed instance and make the
        // rejected task look live to callers.
        return Disposables.disposed();
    }

    final ScheduledAction scheduledAction = new ScheduledAction(action, operationQueue);
    final ScheduledExecutorService executor = IOSScheduledExecutorPool.getInstance();

    final Future<?> future;
    if (delayTime <= 0) {
        future = executor.submit(scheduledAction);
    } else {
        future = executor.schedule(scheduledAction, delayTime, unit);
    }

    // Disposing the action cancels the future; the action is also linked to the worker's subscription.
    scheduledAction.add(Disposables.fromFuture(future));
    scheduledAction.addParent(innerSubscription);
    return scheduledAction;
}
项目:RxJava2Extensions
文件:ObservableIndexOfTest.java
/**
 * The source emits the matching value and then completes unconditionally;
 * {@code indexOf} must report index 0 as its single result.
 */
@Test
public void foundWithUnconditionalOnCompleteAfter() {
    final Observable<Integer> source = new Observable<Integer>() {
        @Override
        protected void subscribeActual(Observer<? super Integer> observer) {
            observer.onSubscribe(Disposables.empty());
            observer.onNext(10);
            observer.onComplete();
        }
    };

    source
            .compose(ObservableTransformers.indexOf(new Predicate<Integer>() {
                @Override
                public boolean test(Integer value) throws Exception {
                    return value == 10;
                }
            }))
            .test()
            .assertResult(0L);
}
项目:grpc-rx
文件:ClientCallsRx.java
/**
 * Creates a receiver for {@code call} and prepares a {@code SingleSource} whose
 * subscription starts the call.
 *
 * @param call the client call this receiver is bound to
 */
public SingleResponseReceiver(ClientCall<?, RespT> call) {
this.call = call;
this.source = new SingleSource<RespT>() {
@Override
public void subscribe(SingleObserver<? super RespT> observer) {
// Remember the observer so that later callbacks can reach it.
responseObserver = observer;
// todo which disposable should be used here
// NOTE(review): the observer receives an already-disposed Disposable here.
observer.onSubscribe(Disposables.disposed());
// start call until response gets subscribed
startCall();
// Deliver a stored error to the observer and clear it afterwards.
if (error != null) {
responseObserver.onError(error);
error = null;
}
}
};
}
项目:rxfirebase
文件:AuthStateChangesOnSubscribe.java
/**
 * Registers an auth-state listener on {@code instance} and forwards each callback
 * to the emitter while it is not disposed. Disposal removes the listener.
 *
 * @param emitter the downstream emitter that receives the {@code FirebaseAuth} instance
 */
@Override
public void subscribe(final ObservableEmitter<FirebaseAuth> emitter) {
    final FirebaseAuth.AuthStateListener authListener = new FirebaseAuth.AuthStateListener() {
        @Override
        public void onAuthStateChanged(@NonNull FirebaseAuth firebaseAuth) {
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onNext(firebaseAuth);
        }
    };
    instance.addAuthStateListener(authListener);

    final Action detach = new Action() {
        @Override
        public void run() throws Exception {
            instance.removeAuthStateListener(authListener);
        }
    };
    emitter.setDisposable(Disposables.fromAction(detach));
}
项目:buseta
文件:RxBroadcastReceiver.java
/**
 * Registers a {@code BroadcastReceiver} for {@code intentFilter} on the weakly
 * referenced context and emits every received intent. Disposal unregisters the
 * receiver if the context is still reachable.
 *
 * @param emitter the downstream emitter receiving intents
 */
@Override
public void subscribe(ObservableEmitter<Intent> emitter) throws Exception {
    BroadcastReceiver broadcastReceiver = new BroadcastReceiver() {
        @Override
        public void onReceive(Context context, Intent intent) {
            emitter.onNext(intent);
        }
    };

    emitter.setDisposable(Disposables.fromRunnable(() -> { // thank you Jake W.
        // Read the weak reference once: the referent can be collected between a
        // null-check and a second get(), which would otherwise cause an NPE.
        Context unregisterTarget = contextWeakReference != null ? contextWeakReference.get() : null;
        if (unregisterTarget == null) {
            return;
        }
        try {
            unregisterTarget.unregisterReceiver(broadcastReceiver);
        } catch (IllegalArgumentException ignored) {
            // Deliberately ignored, as in the original best-effort cleanup.
        }
    }));

    // Same single-read rule for registration.
    Context registerTarget = contextWeakReference != null ? contextWeakReference.get() : null;
    if (registerTarget != null) {
        registerTarget.registerReceiver(broadcastReceiver, intentFilter);
    }
}
项目:durian-rx
文件:RxExecutor.java
/**
 * Subscribes {@code untracedListener} (after passing it through the tracing policy)
 * to the outcome of {@code future}. The callback runs on {@code executor} and is
 * suppressed once the returned Disposable has been disposed.
 *
 * @param future           the future to observe
 * @param untracedListener the listener to notify; must not be {@code null}
 * @return a Disposable that, once disposed, silences the listener
 */
@Override
public <T> Disposable subscribeDisposable(ListenableFuture<? extends T> future, RxListener<T> untracedListener) {
    requireNonNull(untracedListener);
    final RxListener<T> listener = Rx.getTracingPolicy().hook(future, untracedListener);

    // Acts purely as a flag: the callback checks it before notifying the listener.
    final Disposable handle = Disposables.empty();

    final Runnable deliver = () -> {
        try {
            T result = future.get();
            if (!handle.isDisposed()) {
                listener.onSuccess(result);
            }
        } catch (Throwable failure) {
            if (!handle.isDisposed()) {
                listener.onFailure(failure);
            }
        }
    };
    future.addListener(deliver, executor);

    return handle;
}
项目:durian-rx
文件:RxExecutor.java
/**
 * Subscribes {@code untracedListener} (after passing it through the tracing policy)
 * to the completion of {@code future}. The callback runs on {@code executor} and is
 * suppressed once the returned Disposable has been disposed.
 *
 * @param future           the stage to observe
 * @param untracedListener the listener to notify; must not be {@code null}
 * @return a Disposable that, once disposed, silences the listener
 */
@Override
public <T> Disposable subscribeDisposable(CompletionStage<? extends T> future, RxListener<T> untracedListener) {
    requireNonNull(untracedListener);
    final RxListener<T> listener = Rx.getTracingPolicy().hook(future, untracedListener);

    // Acts purely as a flag: the callback checks it before notifying the listener.
    final Disposable handle = Disposables.empty();

    future.whenCompleteAsync((value, exception) -> {
        if (handle.isDisposed()) {
            return;
        }
        if (exception != null) {
            listener.onFailure(exception);
        } else {
            listener.onSuccess(value);
        }
    }, executor);

    return handle;
}
项目:PrefCompat
文件:PrefInternal.java
/**
 * Stores {@code pref}, opens an editor on it, and builds an Observable that emits
 * the key of every changed preference. Each subscription registers its own change
 * listener, which is unregistered again when the subscription is disposed.
 *
 * @param pref the preferences to wrap
 */
@SuppressLint("CommitPrefEdits") public PrefInternal(final SharedPreferences pref) {
    this.pref = pref;
    editor = pref.edit();
    mObservable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
            final SharedPreferences.OnSharedPreferenceChangeListener keyListener =
                    new SharedPreferences.OnSharedPreferenceChangeListener() {
                        @Override public void onSharedPreferenceChanged(SharedPreferences sharedPreferences,
                                String key) {
                            emitter.onNext(key);
                        }
                    };
            pref.registerOnSharedPreferenceChangeListener(keyListener);

            final Action unregister = new Action() {
                @Override public void run() throws Exception {
                    log.d("Un-registering PrefCompat");
                    pref.unregisterOnSharedPreferenceChangeListener(keyListener);
                }
            };
            emitter.setDisposable(Disposables.fromAction(unregister));
        }
    });
}
项目:durian-swt
文件:SwtExec.java
/**
 * Wraps {@code action} in a {@code SwtScheduledAction}, records it in {@code tasks}
 * and passes it to {@code exec}.
 *
 * @param action the task to run
 * @return the scheduled action, or an already-disposed Disposable when
 *         {@code unsubscribed} is observed at any of the three checks below
 */
@Override
public Disposable schedule(Runnable action) {
// First check, made without taking the lock.
if (unsubscribed) {
return Disposables.disposed();
}
SwtScheduledAction a = new SwtScheduledAction(action, this);
synchronized (this) {
// Second check under the lock, so the task is only added while still subscribed.
if (unsubscribed) {
return Disposables.disposed();
}
tasks.add(a);
}
// Execution is requested outside the synchronized section.
exec.execute(a);
// Third check: if unsubscription happened meanwhile, cancel the task just submitted.
if (unsubscribed) {
a.cancel();
return Disposables.disposed();
}
return a;
}
项目:GitHub
文件:RealmHelper.java
/**
 * Creates a Flowable that queries the default Realm for {@code clazz}, optionally
 * filtered by equality on every entry of {@code map}, emits the results once and
 * then again on every change. Cancellation removes the listener and closes the Realm.
 *
 * @param clazz the model class to query
 * @param map   field-name to value filters; may be {@code null} for no filtering
 * @return a Flowable of the live results using the LATEST backpressure strategy
 */
public static <T extends RealmModel> Flowable<RealmResults<T>> getRealmItems(Class clazz, HashMap<String, String> map) {
    return Flowable.create(new FlowableOnSubscribe<RealmResults<T>>() {
        @Override
        public void subscribe(FlowableEmitter<RealmResults<T>> emitter)
                throws Exception {
            Realm realm = Realm.getDefaultInstance();

            RealmQuery<T> query = realm.where(clazz);
            if (map != null) {
                for (Map.Entry<String, String> filter : map.entrySet()) {
                    query.equalTo(filter.getKey(), filter.getValue());
                }
            }
            RealmResults<T> items = query.findAll();

            final RealmChangeListener<RealmResults<T>> onChange = changed -> {
                if (emitter.isCancelled()) {
                    return;
                }
                emitter.onNext(items);
            };

            // The cleanup is installed before the listener is attached.
            emitter.setDisposable(Disposables.fromRunnable(() -> {
                items.removeChangeListener(onChange);
                realm.close();
            }));
            items.addChangeListener(onChange);

            // Initial emission.
            emitter.onNext(items);
        }
    }, BackpressureStrategy.LATEST);
}
项目:GitHub
文件:RealmObservableFactory.java
/**
 * Creates an Observable that emits {@code results} together with the change set of
 * every update. The first emission carries a {@code null} change set.
 *
 * @param realm   the Realm whose configuration is used to open a per-subscription instance
 * @param results the results to observe
 * @return an Observable of collection changes
 */
@Override
public <E> Observable<CollectionChange<RealmResults<E>>> changesetsFrom(Realm realm, final RealmResults<E> results) {
    final RealmConfiguration config = realm.getConfiguration();
    return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmResults<E>>>() {
        @Override
        public void subscribe(final ObservableEmitter<CollectionChange<RealmResults<E>>> emitter) throws Exception {
            // Hold an instance so the Realm stays open for as long as the stream is subscribed.
            final Realm heldRealm = Realm.getInstance(config);
            resultsRefs.get().acquireReference(results);

            final OrderedRealmCollectionChangeListener<RealmResults<E>> changeListener = new OrderedRealmCollectionChangeListener<RealmResults<E>>() {
                @Override
                public void onChange(RealmResults<E> updated, OrderedCollectionChangeSet changeSet) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(new CollectionChange<RealmResults<E>>(results, changeSet));
                }
            };
            results.addChangeListener(changeListener);

            // Runs when the stream is disposed.
            final Runnable cleanup = new Runnable() {
                @Override
                public void run() {
                    results.removeChangeListener(changeListener);
                    heldRealm.close();
                    resultsRefs.get().releaseReference(results);
                }
            };
            emitter.setDisposable(Disposables.fromRunnable(cleanup));

            // Current value goes out right away.
            emitter.onNext(new CollectionChange<>(results, null));
        }
    });
}
项目:GitHub
文件:RealmObservableFactory.java
/**
 * Creates a Flowable that emits {@code list} immediately and again on every change.
 *
 * @param realm the Realm whose configuration is used to open a per-subscription instance
 * @param list  the list to observe
 * @return a Flowable of the list using {@code BACK_PRESSURE_STRATEGY}
 */
@Override
public <E> Flowable<RealmList<E>> from(Realm realm, final RealmList<E> list) {
    final RealmConfiguration config = realm.getConfiguration();
    return Flowable.create(new FlowableOnSubscribe<RealmList<E>>() {
        @Override
        public void subscribe(final FlowableEmitter<RealmList<E>> emitter) throws Exception {
            // Hold an instance so the Realm stays open for as long as the stream is subscribed.
            final Realm heldRealm = Realm.getInstance(config);
            listRefs.get().acquireReference(list);

            final RealmChangeListener<RealmList<E>> changeListener = new RealmChangeListener<RealmList<E>>() {
                @Override
                public void onChange(RealmList<E> updated) {
                    if (emitter.isCancelled()) {
                        return;
                    }
                    emitter.onNext(list);
                }
            };
            list.addChangeListener(changeListener);

            // Runs when the stream is disposed.
            final Runnable cleanup = new Runnable() {
                @Override
                public void run() {
                    list.removeChangeListener(changeListener);
                    heldRealm.close();
                    listRefs.get().releaseReference(list);
                }
            };
            emitter.setDisposable(Disposables.fromRunnable(cleanup));

            // Current value goes out right away.
            emitter.onNext(list);
        }
    }, BACK_PRESSURE_STRATEGY);
}
项目:GitHub
文件:RealmObservableFactory.java
/**
 * Creates an Observable that emits the list together with the change set of every
 * update. The first emission carries {@code list} and a {@code null} change set.
 *
 * @param realm the Realm whose configuration is used to open a per-subscription instance
 * @param list  the list to observe
 * @return an Observable of collection changes
 */
@Override
public <E> Observable<CollectionChange<RealmList<E>>> changesetsFrom(Realm realm, final RealmList<E> list) {
    final RealmConfiguration config = realm.getConfiguration();
    return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmList<E>>>() {
        @Override
        public void subscribe(final ObservableEmitter<CollectionChange<RealmList<E>>> emitter) throws Exception {
            // Hold an instance so the Realm stays open for as long as the stream is subscribed.
            final Realm heldRealm = Realm.getInstance(config);
            listRefs.get().acquireReference(list);

            final OrderedRealmCollectionChangeListener<RealmList<E>> changeListener = new OrderedRealmCollectionChangeListener<RealmList<E>>() {
                @Override
                public void onChange(RealmList<E> updated, OrderedCollectionChangeSet changeSet) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(new CollectionChange<>(updated, changeSet));
                }
            };
            list.addChangeListener(changeListener);

            // Runs when the stream is disposed.
            final Runnable cleanup = new Runnable() {
                @Override
                public void run() {
                    list.removeChangeListener(changeListener);
                    heldRealm.close();
                    listRefs.get().releaseReference(list);
                }
            };
            emitter.setDisposable(Disposables.fromRunnable(cleanup));

            // Current value goes out right away.
            emitter.onNext(new CollectionChange<>(list, null));
        }
    });
}
项目:GitHub
文件:RealmObservableFactory.java
/**
 * Creates a Flowable that emits {@code list} immediately and again on every change,
 * backed by a {@code DynamicRealm} instance.
 *
 * @param realm the DynamicRealm whose configuration is used to open a per-subscription instance
 * @param list  the list to observe
 * @return a Flowable of the list using {@code BACK_PRESSURE_STRATEGY}
 */
@Override
public <E> Flowable<RealmList<E>> from(DynamicRealm realm, final RealmList<E> list) {
    final RealmConfiguration config = realm.getConfiguration();
    return Flowable.create(new FlowableOnSubscribe<RealmList<E>>() {
        @Override
        public void subscribe(final FlowableEmitter<RealmList<E>> emitter) throws Exception {
            // Hold an instance so the Realm stays open for as long as the stream is subscribed.
            final DynamicRealm heldRealm = DynamicRealm.getInstance(config);
            listRefs.get().acquireReference(list);

            final RealmChangeListener<RealmList<E>> changeListener = new RealmChangeListener<RealmList<E>>() {
                @Override
                public void onChange(RealmList<E> updated) {
                    if (emitter.isCancelled()) {
                        return;
                    }
                    emitter.onNext(list);
                }
            };
            list.addChangeListener(changeListener);

            // Runs when the stream is disposed.
            final Runnable cleanup = new Runnable() {
                @Override
                public void run() {
                    list.removeChangeListener(changeListener);
                    heldRealm.close();
                    listRefs.get().releaseReference(list);
                }
            };
            emitter.setDisposable(Disposables.fromRunnable(cleanup));

            // Current value goes out right away.
            emitter.onNext(list);
        }
    }, BACK_PRESSURE_STRATEGY);
}
项目:GitHub
文件:RealmObservableFactory.java
/**
 * Creates an Observable that emits the list together with the change set of every
 * update, backed by a {@code DynamicRealm} instance. The first emission carries
 * {@code list} and a {@code null} change set.
 *
 * @param realm the DynamicRealm whose configuration is used to open a per-subscription instance
 * @param list  the list to observe
 * @return an Observable of collection changes
 */
@Override
public <E> Observable<CollectionChange<RealmList<E>>> changesetsFrom(DynamicRealm realm, final RealmList<E> list) {
    final RealmConfiguration config = realm.getConfiguration();
    return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmList<E>>>() {
        @Override
        public void subscribe(final ObservableEmitter<CollectionChange<RealmList<E>>> emitter) throws Exception {
            // Hold an instance so the Realm stays open for as long as the stream is subscribed.
            final DynamicRealm heldRealm = DynamicRealm.getInstance(config);
            listRefs.get().acquireReference(list);

            final OrderedRealmCollectionChangeListener<RealmList<E>> changeListener = new OrderedRealmCollectionChangeListener<RealmList<E>>() {
                @Override
                public void onChange(RealmList<E> updated, OrderedCollectionChangeSet changeSet) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(new CollectionChange<>(updated, changeSet));
                }
            };
            list.addChangeListener(changeListener);

            // Runs when the stream is disposed.
            final Runnable cleanup = new Runnable() {
                @Override
                public void run() {
                    list.removeChangeListener(changeListener);
                    heldRealm.close();
                    listRefs.get().releaseReference(list);
                }
            };
            emitter.setDisposable(Disposables.fromRunnable(cleanup));

            // Current value goes out right away.
            emitter.onNext(new CollectionChange<>(list, null));
        }
    });
}
项目:GitHub
文件:RealmObservableFactory.java
/**
 * Creates a Flowable that emits {@code object} immediately and then the changed
 * object on every update.
 *
 * @param realm  the Realm whose configuration is used to open a per-subscription instance
 * @param object the model object to observe
 * @return a Flowable of the object using {@code BACK_PRESSURE_STRATEGY}
 */
@Override
public <E extends RealmModel> Flowable<E> from(final Realm realm, final E object) {
    final RealmConfiguration config = realm.getConfiguration();
    return Flowable.create(new FlowableOnSubscribe<E>() {
        @Override
        public void subscribe(final FlowableEmitter<E> emitter) throws Exception {
            // Hold an instance so the Realm stays open for as long as the stream is subscribed.
            final Realm heldRealm = Realm.getInstance(config);
            objectRefs.get().acquireReference(object);

            final RealmChangeListener<E> changeListener = new RealmChangeListener<E>() {
                @Override
                public void onChange(E updated) {
                    if (emitter.isCancelled()) {
                        return;
                    }
                    emitter.onNext(updated);
                }
            };
            RealmObject.addChangeListener(object, changeListener);

            // Runs when the stream is disposed.
            final Runnable cleanup = new Runnable() {
                @Override
                public void run() {
                    RealmObject.removeChangeListener(object, changeListener);
                    heldRealm.close();
                    objectRefs.get().releaseReference(object);
                }
            };
            emitter.setDisposable(Disposables.fromRunnable(cleanup));

            // Current value goes out right away.
            emitter.onNext(object);
        }
    }, BACK_PRESSURE_STRATEGY);
}
项目:GitHub
文件:RealmObservableFactory.java
/**
 * Creates an Observable that emits the object together with the change set of every
 * update. The first emission carries {@code object} and a {@code null} change set.
 *
 * @param realm  the Realm whose configuration is used to open a per-subscription instance
 * @param object the model object to observe
 * @return an Observable of object changes
 */
@Override
public <E extends RealmModel> Observable<ObjectChange<E>> changesetsFrom(Realm realm, final E object) {
    final RealmConfiguration config = realm.getConfiguration();
    return Observable.create(new ObservableOnSubscribe<ObjectChange<E>>() {
        @Override
        public void subscribe(final ObservableEmitter<ObjectChange<E>> emitter) throws Exception {
            // Hold an instance so the Realm stays open for as long as the stream is subscribed.
            final Realm heldRealm = Realm.getInstance(config);
            objectRefs.get().acquireReference(object);

            final RealmObjectChangeListener<E> changeListener = new RealmObjectChangeListener<E>() {
                @Override
                public void onChange(E updated, ObjectChangeSet changeSet) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(new ObjectChange<>(updated, changeSet));
                }
            };
            RealmObject.addChangeListener(object, changeListener);

            // Runs when the stream is disposed.
            final Runnable cleanup = new Runnable() {
                @Override
                public void run() {
                    RealmObject.removeChangeListener(object, changeListener);
                    heldRealm.close();
                    objectRefs.get().releaseReference(object);
                }
            };
            emitter.setDisposable(Disposables.fromRunnable(cleanup));

            // Current value goes out right away.
            emitter.onNext(new ObjectChange<>(object, null));
        }
    });
}
项目:GitHub
文件:RealmObservableFactory.java
/**
 * Creates a Flowable that emits {@code object} immediately and then the changed
 * object on every update, backed by a {@code DynamicRealm} instance.
 *
 * @param realm  the DynamicRealm whose configuration is used to open a per-subscription instance
 * @param object the dynamic object to observe
 * @return a Flowable of the object using {@code BACK_PRESSURE_STRATEGY}
 */
@Override
public Flowable<DynamicRealmObject> from(DynamicRealm realm, final DynamicRealmObject object) {
    final RealmConfiguration config = realm.getConfiguration();
    return Flowable.create(new FlowableOnSubscribe<DynamicRealmObject>() {
        @Override
        public void subscribe(final FlowableEmitter<DynamicRealmObject> emitter) throws Exception {
            // Hold an instance so the Realm stays open for as long as the stream is subscribed.
            final DynamicRealm heldRealm = DynamicRealm.getInstance(config);
            objectRefs.get().acquireReference(object);

            final RealmChangeListener<DynamicRealmObject> changeListener = new RealmChangeListener<DynamicRealmObject>() {
                @Override
                public void onChange(DynamicRealmObject updated) {
                    if (emitter.isCancelled()) {
                        return;
                    }
                    emitter.onNext(updated);
                }
            };
            RealmObject.addChangeListener(object, changeListener);

            // Runs when the stream is disposed.
            final Runnable cleanup = new Runnable() {
                @Override
                public void run() {
                    RealmObject.removeChangeListener(object, changeListener);
                    heldRealm.close();
                    objectRefs.get().releaseReference(object);
                }
            };
            emitter.setDisposable(Disposables.fromRunnable(cleanup));

            // Current value goes out right away.
            emitter.onNext(object);
        }
    }, BACK_PRESSURE_STRATEGY);
}
项目:GitHub
文件:RealmObservableFactory.java
/**
 * Creates an Observable that emits the dynamic object together with the change set
 * of every update. The first emission carries {@code object} and a {@code null}
 * change set.
 *
 * @param realm  the DynamicRealm whose configuration is used to open a per-subscription instance
 * @param object the dynamic object to observe
 * @return an Observable of object changes
 */
@Override
public Observable<ObjectChange<DynamicRealmObject>> changesetsFrom(DynamicRealm realm, final DynamicRealmObject object) {
    final RealmConfiguration config = realm.getConfiguration();
    return Observable.create(new ObservableOnSubscribe<ObjectChange<DynamicRealmObject>>() {
        @Override
        public void subscribe(final ObservableEmitter<ObjectChange<DynamicRealmObject>> emitter) throws Exception {
            // Hold an instance so the Realm stays open for as long as the stream is subscribed.
            final DynamicRealm heldRealm = DynamicRealm.getInstance(config);
            objectRefs.get().acquireReference(object);

            final RealmObjectChangeListener<DynamicRealmObject> changeListener = new RealmObjectChangeListener<DynamicRealmObject>() {
                @Override
                public void onChange(DynamicRealmObject updated, ObjectChangeSet changeSet) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(new ObjectChange<>(updated, changeSet));
                }
            };
            object.addChangeListener(changeListener);

            // Runs when the stream is disposed.
            final Runnable cleanup = new Runnable() {
                @Override
                public void run() {
                    object.removeChangeListener(changeListener);
                    heldRealm.close();
                    objectRefs.get().releaseReference(object);
                }
            };
            emitter.setDisposable(Disposables.fromRunnable(cleanup));

            // Current value goes out right away.
            emitter.onNext(new ObjectChange<>(object, null));
        }
    });
}
项目:rxstate
文件:RxState.java
/**
 * Installs a disposal action on {@code emitter} that removes it from
 * {@code emitters}.
 *
 * @param emitter the emitter to deregister on disposal
 */
private void setDisposable(ObservableEmitter<T> emitter) {
emitter.setDisposable(Disposables.fromAction(() -> {
// Inside the lambda 'this' is the enclosing instance, so the removal is
// guarded by that instance's monitor.
synchronized (this) {
emitters.remove(emitter);
}
}));
}
项目:CameraButton
文件:Preconditions.java
/**
 * Verifies that the caller is on the main looper thread. When it is not, the
 * observer is given an empty Disposable followed by an {@link IllegalStateException}.
 *
 * @param observer the observer to notify on failure
 * @return {@code true} when called on the main thread, {@code false} otherwise
 */
static boolean checkMainThread(Observer<?> observer) {
    final boolean onMainThread = Looper.myLooper() == Looper.getMainLooper();
    if (onMainThread) {
        return true;
    }

    observer.onSubscribe(Disposables.empty());
    final String threadName = Thread.currentThread().getName();
    observer.onError(new IllegalStateException(
            "Expected to be called on the main thread but was " + threadName));
    return false;
}
项目:RxFirestore
文件:QuerySnapshotsOnSubscribe.java
/**
 * Attaches a snapshot listener to {@code query} and forwards every query snapshot
 * (or error) to {@code emitter} until the emitter is disposed. Disposal removes
 * the listener registration.
 *
 * @param emitter the downstream emitter receiving snapshots and errors
 */
@Override
public void subscribe(final ObservableEmitter<QuerySnapshot> emitter) throws Exception {
    final EventListener<QuerySnapshot> snapshotListener = new EventListener<QuerySnapshot>() {
        @Override
        public void onEvent(QuerySnapshot querySnapshot, FirebaseFirestoreException e) {
            if (emitter.isDisposed()) {
                return;
            }
            if (e != null) {
                emitter.onError(e);
                return;
            }
            emitter.onNext(querySnapshot);
        }
    };

    // NOTE(review): 'registration' is not declared in this method (presumably a field),
    // so repeated subscriptions would overwrite it - confirm against the class.
    registration = query.addSnapshotListener(snapshotListener);

    final Action removeRegistration = new Action() {
        @Override
        public void run() throws Exception {
            registration.remove();
        }
    };
    emitter.setDisposable(Disposables.fromAction(removeRegistration));
}
项目:RxFirestore
文件:DocumentChangesOnSubscribe.java
/**
 * Attaches a snapshot listener to {@code query} and emits each document change of
 * every snapshot individually; listener errors are forwarded as stream errors.
 * Nothing is delivered once the emitter is disposed, and disposal removes the
 * listener registration.
 *
 * @param emitter the downstream emitter receiving document changes and errors
 */
@Override
public void subscribe(final ObservableEmitter<DocumentChange> emitter) throws Exception {
    final EventListener<QuerySnapshot> snapshotListener = new EventListener<QuerySnapshot>() {
        @Override
        public void onEvent(QuerySnapshot querySnapshot, FirebaseFirestoreException e) {
            if (emitter.isDisposed()) {
                return;
            }
            if (e != null) {
                emitter.onError(e);
                return;
            }
            for (DocumentChange change : querySnapshot.getDocumentChanges()) {
                emitter.onNext(change);
            }
        }
    };

    // NOTE(review): 'registration' is not declared in this method (presumably a field),
    // so repeated subscriptions would overwrite it - confirm against the class.
    registration = query.addSnapshotListener(snapshotListener);

    final Action removeRegistration = new Action() {
        @Override
        public void run() throws Exception {
            registration.remove();
        }
    };
    emitter.setDisposable(Disposables.fromAction(removeRegistration));
}
项目:pl
文件:BukkitWorker.java
/**
 * Schedules {@code run}. Delayed work is queued in the group for the given delay.
 * Work without delay runs inline when {@code canRunImmediately()} allows it, and is
 * otherwise wrapped in a {@code SingleDisposableTask}.
 *
 * @param run   the task to execute
 * @param delay the delay before execution; values {@code <= 0} mean "no delay"
 * @param unit  the unit of {@code delay}
 * @return a Disposable for the scheduled work; already disposed when the task ran inline
 */
@Override
public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    if (delay > 0) {
        return getGroup(delay, unit).add(new QueuedWork(run));
    }
    if (!canRunImmediately()) {
        return new SingleDisposableTask(run);
    }
    // Inline execution: nothing is left to cancel afterwards.
    run.run();
    return Disposables.disposed();
}
项目:RxTask
文件:Preconditions.java
/**
 * Verifies that the caller is on the main looper thread. When it is not, the
 * observer is given an empty Disposable followed by an {@link IllegalStateException}.
 *
 * @param observer the observer to notify on failure
 * @return {@code true} when called on the main thread, {@code false} otherwise
 */
public static boolean checkMainThread(Observer<?> observer) {
    final boolean onMainThread = Looper.myLooper() == Looper.getMainLooper();
    if (onMainThread) {
        return true;
    }

    observer.onSubscribe(Disposables.empty());
    final String threadName = Thread.currentThread().getName();
    observer.onError(new IllegalStateException(
            "Expected to be called on the main thread but was " + threadName));
    return false;
}
项目:RxBroadcastReceiver
文件:Preconditions.java
/**
 * Verifies that the calling thread has a Looper. When it has none, the observer is
 * given an empty Disposable followed by an {@link IllegalStateException}.
 *
 * @param observer the observer to notify on failure
 * @return {@code true} when the calling thread has a Looper, {@code false} otherwise
 */
public static boolean checkLooperThread(final Observer observer) {
    if (Looper.myLooper() != null) {
        return true;
    }
    observer.onSubscribe(Disposables.empty());
    observer.onError(new IllegalStateException("Calling thread is not associated with Looper"));
    return false;
}
项目:rxjava2-jdbc
文件:FlowableSingleDeferUntilRequest.java
/**
 * Cancels this subscription. If no disposable has been stored yet, a disposed
 * marker is installed; otherwise the stored disposable is disposed and then
 * replaced by the disposed marker.
 */
@Override
public void cancel() {
    final boolean wasUnset = disposable.compareAndSet(null, Disposables.disposed());
    if (wasUnset) {
        return;
    }
    disposable.get().dispose();
    // clear for GC
    disposable.set(Disposables.disposed());
}