public void onChange(T observer, @Nullable OrderedCollectionChangeSet changes) { if (listener instanceof OrderedRealmCollectionChangeListener) { //noinspection unchecked ((OrderedRealmCollectionChangeListener<T>) listener).onChange(observer, changes); } else if (listener instanceof RealmChangeListener) { //noinspection unchecked ((RealmChangeListener<T>) listener).onChange(observer); } else { throw new RuntimeException("Unsupported listener type: " + listener); } }
@Override public <E> Observable<CollectionChange<RealmResults<E>>> changesetsFrom(Realm realm, final RealmResults<E> results) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmResults<E>>>() { @Override public void subscribe(final ObservableEmitter<CollectionChange<RealmResults<E>>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); resultsRefs.get().acquireReference(results); final OrderedRealmCollectionChangeListener<RealmResults<E>> listener = new OrderedRealmCollectionChangeListener<RealmResults<E>>() { @Override public void onChange(RealmResults<E> e, OrderedCollectionChangeSet changeSet) { if (!emitter.isDisposed()) { emitter.onNext(new CollectionChange<RealmResults<E>>(results, changeSet)); } } }; results.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { results.removeChangeListener(listener); observableRealm.close(); resultsRefs.get().releaseReference(results); } })); // Emit current value immediately emitter.onNext(new CollectionChange<>(results, null)); } }); }
@Override public <E> Observable<CollectionChange<RealmList<E>>> changesetsFrom(Realm realm, final RealmList<E> list) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmList<E>>>() { @Override public void subscribe(final ObservableEmitter<CollectionChange<RealmList<E>>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); listRefs.get().acquireReference(list); final OrderedRealmCollectionChangeListener<RealmList<E>> listener = new OrderedRealmCollectionChangeListener<RealmList<E>>() { @Override public void onChange(RealmList<E> results, OrderedCollectionChangeSet changeSet) { if (!emitter.isDisposed()) { emitter.onNext(new CollectionChange<>(results, changeSet)); } } }; list.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { list.removeChangeListener(listener); observableRealm.close(); listRefs.get().releaseReference(list); } })); // Emit current value immediately emitter.onNext(new CollectionChange<>(list, null)); } }); }
@Override public <E> Observable<CollectionChange<RealmList<E>>> changesetsFrom(DynamicRealm realm, final RealmList<E> list) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmList<E>>>() { @Override public void subscribe(final ObservableEmitter<CollectionChange<RealmList<E>>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig); listRefs.get().acquireReference(list); final OrderedRealmCollectionChangeListener<RealmList<E>> listener = new OrderedRealmCollectionChangeListener<RealmList<E>>() { @Override public void onChange(RealmList<E> results, OrderedCollectionChangeSet changeSet) { if (!emitter.isDisposed()) { emitter.onNext(new CollectionChange<>(results, changeSet)); } } }; list.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { list.removeChangeListener(listener); observableRealm.close(); listRefs.get().releaseReference(list); } })); // Emit current value immediately emitter.onNext(new CollectionChange<>(list, null)); } }); }
@Override public void onChange(T collection, @Nullable OrderedCollectionChangeSet changes) { listener.onChange(collection); }
Callback(@Nullable OrderedCollectionChangeSet changeSet) { this.changeSet = changeSet; }
ManagedChangeSet(RealmResults<T> realmResults, OrderedCollectionChangeSet orderedCollectionChangeSet) { this.realmResults = realmResults; this.orderedCollectionChangeSet = orderedCollectionChangeSet; }
@Override public void onChange(@NonNull RealmResults<T> realmResults, @Nullable OrderedCollectionChangeSet changeSet) { postValue(new Monarchy.ManagedChangeSet<>(realmResults, changeSet)); }
/** * Constructor for a CollectionChange. * * @param collection the collection that changed. * @param changeset the changeset describing the change. */ public CollectionChange(E collection, @Nullable OrderedCollectionChangeSet changeset) { this.collection = collection; this.changeset = changeset; }
/** * Returns the changeset describing the update. * <p> * This will be {@code null} the first time the stream emits the collection as well as when a asynchronous query * is loaded for the first time. * <p> * <pre> * {@code * // Example * realm.where(Person.class).findAllAsync().asChangesetObservable() * .subscribe(new Consumer<CollectionChange>() { * \@Override * public void accept(CollectionChange item) throws Exception { * item.getChangeset(); // Will return null the first two times * } * }); * } * </pre> * * @return the changeset describing how the collection was updated. */ @Nullable public OrderedCollectionChangeSet getChangeset() { return changeset; }
/** * Gets the ordered collection change set. * * @return the change set */ @Nullable public OrderedCollectionChangeSet getOrderedCollectionChangeSet() { return orderedCollectionChangeSet; }