private void emmit(FlowableEmitter<Message> emitter, String roomId) throws Exception { SSLContext sslCtx = SSLContext.getDefault(); SSLEngine sslEngine = sslCtx.createSSLEngine("stream.gitter.im", 443); sslEngine.setUseClientMode(true); HttpClient .newClient("stream.gitter.im", 443) .secure(sslEngine) .createGet("/v1/rooms/" + roomId + "/chatMessages") .addHeader("Authorization", "Bearer 3cd4820adf59b6a7116f99d92f68a1b786895ce7") .flatMap(HttpClientResponse::getContent) .filter(bb -> bb.capacity() > 2) .map(MessageEncoder::mapToMessage) .doOnNext(m -> System.out.println("Log Emit: " + m)) .subscribe(emitter::onNext, emitter::onError, emitter::onComplete); }
/** * @param path the root directory containing files to be emitted * @param depth offset, where 1 == data directory, 2 == zoom directory, 3 == row, 4 == column * @return a stream of file references to data structured as the Google tiling scheme. */ public static Observable<File> getTiles(String path, int depth) { // programmer's note: // https://medium.com/we-are-yammer/converting-callback-async-calls-to-rxjava-ebc68bde5831 // http://vlkan.com/blog/post/2016/07/20/rxjava-backpressure/ // Essentially lots of way to skin this cat - e.g. onBackpressureBlock / // reactive pull return Flowable.create(new FlowableOnSubscribe<File>() { @Override public void subscribe(FlowableEmitter<File> subscriber) throws Exception { try { // warning: depth 4 is not really walking (semantics) walk(path, depth, subscriber); if (!subscriber.isCancelled()) { subscriber.onComplete(); } } catch (final Exception ex) { if (!subscriber.isCancelled()) { subscriber.onError(ex); } } } }, BackpressureStrategy.BUFFER).toObservable(); }
static <T> void createListItemEmitter(final Query<T> query, final FlowableEmitter<T> emitter) { final DataSubscription dataSubscription = query.subscribe().observer(new DataObserver<List<T>>() { @Override public void onData(List<T> data) { for (T datum : data) { if (emitter.isCancelled()) { return; } else { emitter.onNext(datum); } } if (!emitter.isCancelled()) { emitter.onComplete(); } } }); emitter.setCancellable(new Cancellable() { @Override public void cancel() throws Exception { dataSubscription.cancel(); } }); }
@Override public final void subscribe(FlowableEmitter<T> emitter) throws Exception { final GoogleApiClient apiClient = createApiClient(new ApiClientConnectionCallbacks(emitter)); try { apiClient.connect(); } catch (Throwable ex) { emitter.onError(ex); } emitter.setCancellable(new Cancellable() { @Override public void cancel() throws Exception { if (apiClient.isConnected()) { RxLocationFlowableOnSubscribe.this.onUnsubscribed(apiClient); } apiClient.disconnect(); } }); }
public static Observable<File> getTiles(String path) { // programmer's note: // https://medium.com/we-are-yammer/converting-callback-async-calls-to-rxjava-ebc68bde5831 // http://vlkan.com/blog/post/2016/07/20/rxjava-backpressure/ // Essentially lots of way to skin this cat - e.g. onBackpressureBlock / // reactive pull return Flowable.create(new FlowableOnSubscribe<File>() { @Override public void subscribe(FlowableEmitter<File> subscriber) throws Exception { try { walk(path, 1, subscriber); if (!subscriber.isCancelled()) { subscriber.onComplete(); } } catch (final Exception ex) { if (!subscriber.isCancelled()) { subscriber.onError(ex); } } } }, BackpressureStrategy.BUFFER).toObservable(); }
/** * 插入到系统相册, 并刷新系统相册 * @param imageUrl */ private static void insertSystemAlbumAndRefresh(final String imageUrl) { Flowable.create(new FlowableOnSubscribe<Object>() { @Override public void subscribe(FlowableEmitter<Object> e) throws Exception { File file = FileUtils.createFileFrom(imageUrl); String imageUri = MediaStore.Images.Media.insertImage(ApplicationProvider.IMPL.getApp().getContentResolver(), file.getAbsolutePath(), file.getName(), "图片: " + file.getName()); Log.d("_stone_", "insertSystemAlbumAndRefresh-subscribe: imageUri=" + imageUri); //讲数据插入到系统图库, 在系统相册APP中就可以看到保存的图片了. //为了保险起见, 再同步一下系统图库 syncAlbum(imageUrl); } }, BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(); }
public SimpleRemoteSourceMapper(FlowableEmitter<Resource<T>> emitter) { emitter.onNext(Resource.loading(null)); // since realm instance was created on Main Thread, so if we need to touch on realm database after calling // api (such as save response data to local database, we must make request on main thread // by setting shouldUpdateUi params = true Disposable disposable = RestHelper.makeRequest(getRemote(), true, response -> { Log.d(TAG, "SimpleRemoteSourceMapper: call API success!"); saveCallResult(response); emitter.onNext(Resource.success(response)); }, errorEntity -> { Log.d(TAG, "SimpleRemoteSourceMapper: call API error: " + errorEntity.getMessage()); emitter.onNext(Resource.error(errorEntity.getMessage(), null)); }); // set emitter disposable to ensure that when it is going to be disposed, our api request should be disposed as well emitter.setDisposable(disposable); }
public ListDataNetworkBounceResource(FlowableEmitter<Resource<LocalType>> emitter) { emitter.onNext(Resource.success(getLocal())); emitter.onNext(Resource.loading(null)); // since realm instance was created on Main Thread, so if we need to touch on realm database after calling // api (such as save response data to local database, we must make request on main thread // by setting shouldUpdateUi params = true Disposable disposable = RestHelper.makeRequest(getRemote(), true, response -> { Single.just(response) .map(mapper()) .subscribe(localData -> { Log.d(TAG, "SimpleRemoteSourceMapper: call API success!"); saveCallResult(localData); emitter.onNext(Resource.success(localData)); }); }, errorEntity -> { Log.d(TAG, "SimpleRemoteSourceMapper: call API error: " + errorEntity.getMessage()); emitter.onNext(Resource.error(errorEntity.getMessage(), null)); }); // set emitter disposable to ensure that when it is going to be disposed, our api request should be disposed as well emitter.setDisposable(disposable); }
private BroadcastReceiver defineBroadcastReceiverFor(final FlowableEmitter<SensorRecord> subscriber){ return new BroadcastReceiver() { Timer timer = new Timer(); long prevCallTime = new Date().getTime(); @Override public void onReceive(Context context, Intent intent) { long actualTime = new Date().getTime(); long delay = calculateDiffDelay(prevCallTime, actualTime); prevCallTime = actualTime; subscriber.onNext(new WifiMeasurementsRecord(wifiManager.getScanResults())); if (delay > 0) timer.schedule(createScanTask(), delay); else createScanTask().run(); } }; }
@Override public void subscribe(FlowableEmitter<T> e) throws Exception { try { Response response = call.execute(); if (!e.isCancelled()) { if (response.isSuccessful()) { e.onNext(rxEasyConverter.convert(response.body().string())); } else { e.onError(new Throwable("response is unsuccessful")); } } } catch (Throwable t) { if (!e.isCancelled()) { e.onError(t); } } finally { e.onComplete(); } }
private void asyncMethod(final ProceedingJoinPoint joinPoint) throws Throwable { Flowable.create(new FlowableOnSubscribe<Object>() { @Override public void subscribe(FlowableEmitter<Object> e) throws Exception { Looper.prepare(); try { joinPoint.proceed(); } catch (Throwable throwable) { throwable.printStackTrace(); } Looper.loop(); } } , BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(); }
@Override public Flowable doPullStreamPost(final IRequestEntity requestEntity, final File file) { Flowable flowable = DefaultFlowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter emitter) throws Exception { final MediaType JSON = MediaType.parse("application/json; charset=utf-8"); Gson gson = new Gson(); String json = gson.toJson(requestEntity); RequestBody body = RequestBody.create(JSON, json); Request request = new Request.Builder() .url(mUrl) .post(body) .build(); Response response = mClient.newCall(request).execute(); if (response == null) { emitter.onError(new EmptyException()); return; } recvFile(emitter, response, file); emitter.onComplete(); } }); return flowable; }
@Test public void createYourOwnTibco() throws Exception { Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> e) throws Exception { while (!e.isCancelled()) { long numberRecords = e.requested(); System.out.println(numberRecords); if (numberRecords > 0) { } } } }, BackpressureStrategy.BUFFER); flowable.map(x -> x + "Yay!").subscribe(System.out::println); }
@Override public final void subscribe(FlowableEmitter<T> emitter) throws Exception { final GoogleApiClient apiClient = createApiClient(new ApiClientConnectionCallbacks(emitter)); try { apiClient.connect(); } catch (Throwable ex) { emitter.onError(ex); } emitter.setCancellable(() -> { if (apiClient.isConnected()) { onUnsubscribed(apiClient); } apiClient.disconnect(); }); }
public static <T extends RealmModel> Flowable<RealmResults<T>> getRealmItems(Class clazz, HashMap<String, String> map) { return Flowable.create(new FlowableOnSubscribe<RealmResults<T>>() { @Override public void subscribe(FlowableEmitter<RealmResults<T>> emitter) throws Exception { Realm realm = Realm.getDefaultInstance(); RealmQuery<T> query = realm.where(clazz); if (map != null) { for (Map.Entry<String, String> entry : map.entrySet()) { query.equalTo(entry.getKey(), entry.getValue()); } } RealmResults<T> results = query.findAll(); final RealmChangeListener<RealmResults<T>> listener = _realm -> { if (!emitter.isCancelled()) { emitter.onNext(results); } }; emitter.setDisposable(Disposables.fromRunnable(() -> { results.removeChangeListener(listener); realm.close(); })); results.addChangeListener(listener); emitter.onNext(results); } }, BackpressureStrategy.LATEST); }
/** * 生成Flowable * @param <T> * @return */ public static <T> Flowable<T> createData(final T t) { return Flowable.create(new FlowableOnSubscribe<T>() { @Override public void subscribe(FlowableEmitter<T> emitter) throws Exception { try { emitter.onNext(t); emitter.onComplete(); } catch (Exception e) { emitter.onError(e); } } }, BackpressureStrategy.BUFFER); }
private Publisher<DownloadStatus> save(final Response<ResponseBody> response) { return Flowable.create(new FlowableOnSubscribe<DownloadStatus>() { @Override public void subscribe(FlowableEmitter<DownloadStatus> e) throws Exception { record.save(e, response); } }, BackpressureStrategy.LATEST); }
/** * 保存断点下载的文件,以及下载进度 * * @param index 下载编号 * @param response 响应值 * @return Flowable */ private Publisher<DownloadStatus> save(final int index, final ResponseBody response) { Flowable<DownloadStatus> flowable = Flowable.create(new FlowableOnSubscribe<DownloadStatus>() { @Override public void subscribe(FlowableEmitter<DownloadStatus> emitter) throws Exception { record.save(emitter, index, response); } }, BackpressureStrategy.LATEST) .replay(1) .autoConnect(); return flowable.throttleFirst(100, TimeUnit.MILLISECONDS).mergeWith(flowable.takeLast(1)) .subscribeOn(Schedulers.newThread()); }
@Override public <E> Flowable<RealmList<E>> from(Realm realm, final RealmList<E> list) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Flowable.create(new FlowableOnSubscribe<RealmList<E>>() { @Override public void subscribe(final FlowableEmitter<RealmList<E>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); listRefs.get().acquireReference(list); final RealmChangeListener<RealmList<E>> listener = new RealmChangeListener<RealmList<E>>() { @Override public void onChange(RealmList<E> results) { if (!emitter.isCancelled()) { emitter.onNext(list); } } }; list.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { list.removeChangeListener(listener); observableRealm.close(); listRefs.get().releaseReference(list); } })); // Emit current value immediately emitter.onNext(list); } }, BACK_PRESSURE_STRATEGY); }
@Override public <E> Flowable<RealmList<E>> from(DynamicRealm realm, final RealmList<E> list) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Flowable.create(new FlowableOnSubscribe<RealmList<E>>() { @Override public void subscribe(final FlowableEmitter<RealmList<E>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig); listRefs.get().acquireReference(list); final RealmChangeListener<RealmList<E>> listener = new RealmChangeListener<RealmList<E>>() { @Override public void onChange(RealmList<E> results) { if (!emitter.isCancelled()) { emitter.onNext(list); } } }; list.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { list.removeChangeListener(listener); observableRealm.close(); listRefs.get().releaseReference(list); } })); // Emit current value immediately emitter.onNext(list); } }, BACK_PRESSURE_STRATEGY); }
@Override public <E extends RealmModel> Flowable<E> from(final Realm realm, final E object) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Flowable.create(new FlowableOnSubscribe<E>() { @Override public void subscribe(final FlowableEmitter<E> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); objectRefs.get().acquireReference(object); final RealmChangeListener<E> listener = new RealmChangeListener<E>() { @Override public void onChange(E obj) { if (!emitter.isCancelled()) { emitter.onNext(obj); } } }; RealmObject.addChangeListener(object, listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { RealmObject.removeChangeListener(object, listener); observableRealm.close(); objectRefs.get().releaseReference(object); } })); // Emit current value immediately emitter.onNext(object); } }, BACK_PRESSURE_STRATEGY); }
@Override public Flowable<DynamicRealmObject> from(DynamicRealm realm, final DynamicRealmObject object) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Flowable.create(new FlowableOnSubscribe<DynamicRealmObject>() { @Override public void subscribe(final FlowableEmitter<DynamicRealmObject> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig); objectRefs.get().acquireReference(object); final RealmChangeListener<DynamicRealmObject> listener = new RealmChangeListener<DynamicRealmObject>() { @Override public void onChange(DynamicRealmObject obj) { if (!emitter.isCancelled()) { emitter.onNext(obj); } } }; RealmObject.addChangeListener(object, listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { RealmObject.removeChangeListener(object, listener); observableRealm.close(); objectRefs.get().releaseReference(object); } })); // Emit current value immediately emitter.onNext(object); } }, BACK_PRESSURE_STRATEGY); }
/** * The returned Flowable emits Query results one by one. Once all results have been processed, onComplete is called. * Uses given BackpressureStrategy. */ public static <T> Flowable<T> flowableOneByOne(final Query<T> query, BackpressureStrategy strategy) { return Flowable.create(new FlowableOnSubscribe<T>() { @Override public void subscribe(final FlowableEmitter<T> emitter) throws Exception { createListItemEmitter(query, emitter); } }, strategy); }
public void calcIconTotal() { Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> flowableEmitter) throws Exception { XmlResourceParser xml = mView.getResources().getXml(R.xml.drawable); int total = 0; while (xml.getEventType() != XmlResourceParser.END_DOCUMENT) { if (xml.getEventType() == XmlPullParser.START_TAG) { if (xml.getName().startsWith("item")) { total++; } } xml.next(); } flowableEmitter.onNext(total); } }, BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { mView.setIconTotal(integer); } }); }
private void doRxJavaWork() { Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> e) throws Exception { e.onNext("事件1"); e.onNext("事件2"); e.onNext("事件3"); e.onNext("事件4"); e.onComplete(); } }, BackpressureStrategy.BUFFER); Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe: "); s.request(Long.MAX_VALUE); } @Override public void onNext(String string) { Log.d(TAG, "onNext: " + string); } @Override public void onError(Throwable t) { Log.d(TAG, "onError: " + t.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } }; flowable.subscribe(subscriber); }
/** * 获取内存缓存请求Flowable对象 * * @return 内存缓存查询Flowable对象 */ private Flowable<CCBaseResponse<T>> getMemoryCacheQueryFlowable() { //内存缓存数据获取 return Flowable.create(new FlowableOnSubscribe<CCBaseResponse<T>>() { @Override public void subscribe(@NonNull FlowableEmitter<CCBaseResponse<T>> e) throws Exception { T response = null; try { if (ccCacheQueryCallback != null) { response = ccCacheQueryCallback.<T>onQueryFromMemory(cacheKey); } CCBaseResponse<T> tccBaseResponse = new CCBaseResponse<T>(response, true, true, false); e.onNext(tccBaseResponse); e.onComplete(); } catch (Exception exception) { switch (cacheQueryMode) { case CCCacheMode.QueryMode.MODE_ONLY_MEMORY: e.onError(new CCDiskCacheQueryException(exception)); break; default: e.onComplete(); break; } } } }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()); }
/** * 获取磁盘缓存请求Flowable对象 * * @return 磁盘缓存查询Flowable对象 */ private Flowable<CCBaseResponse<T>> getDiskCacheQueryFlowable() { //磁盘缓存获取,包括任何形式的磁盘缓存 return Flowable.create(new FlowableOnSubscribe<CCBaseResponse<T>>() { @Override public void subscribe(@NonNull FlowableEmitter<CCBaseResponse<T>> e) throws Exception { T response = null; try { if (ccCacheQueryCallback != null) { response = ccCacheQueryCallback.<T>onQueryFromDisk(cacheKey); } CCBaseResponse<T> tccBaseResponse = new CCBaseResponse<T>(response, true, false, true); e.onNext(tccBaseResponse); e.onComplete(); } catch (Exception exception) { switch (cacheQueryMode) { case CCCacheMode.QueryMode.MODE_ONLY_DISK: case CCCacheMode.QueryMode.MODE_MEMORY_THEN_DISK: e.onError(new CCDiskCacheQueryException(exception)); break; default: e.onComplete(); break; } } } }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()); }
@Override public Flowable<List<Alarm>> getAlarms() { return Flowable.create( new FlowableOnSubscribe<List<Alarm>>() { @Override public void subscribe(FlowableEmitter<List<Alarm>> e) throws Exception { Realm realm = Realm.getDefaultInstance(); RealmQuery<RealmAlarm> query = realm.where(RealmAlarm.class); RealmResults<RealmAlarm> result = query.findAll(); List<Alarm> alarmList = new ArrayList<>(); if (result.size() == 0) { e.onComplete(); } else { for (int i = 0; i < result.size(); i++) { Alarm alarm = new Alarm(); RealmAlarm realmAlarm = result.get(i); alarm.setActive(realmAlarm.isActive()); alarm.setRenewAutomatically(realmAlarm.isRenewAutomatically()); alarm.setVibrateOnly(realmAlarm.isVibrateOnly()); alarm.setHourOfDay(realmAlarm.getHourOfDay()); alarm.setMinute(realmAlarm.getMinute()); alarm.setAlarmTitle(realmAlarm.getAlarmTitle()); alarm.setAlarmId(realmAlarm.getAlarmId()); alarmList.add( alarm ); } e.onNext(alarmList); } } }, BackpressureStrategy.LATEST); }
@Override protected void onGoogleApiClientReady(GoogleApiClient apiClient, final FlowableEmitter<Location> emitter) { locationListener = new LocationListener() { @Override public void onLocationChanged(Location value) { emitter.onNext(value); } }; //noinspection MissingPermission setupLocationPendingResult( LocationServices.FusedLocationApi.requestLocationUpdates(apiClient, locationRequest, locationListener, looper), new StatusErrorResultCallBack(emitter) ); }
/** * 加载数据,刷新视图 **/ private void loadData() { Flowable.create(new FlowableOnSubscribe<Object>() { @Override public void subscribe(FlowableEmitter<Object> e) throws Exception { reset(); /* io线程加载数据库数据 */ List<AlarmClock> alarmClocks = mAssistDao.findAllAlarmAsc(false); week = Calendar.getInstance().get(Calendar.DAY_OF_WEEK); week = week - 1 == 0 ? 7 : week - 1; //计算当天是星期几 week = week + 1 == 8 ? 1 : week + 1; // 计算明天是星期几 for (AlarmClock alarm : alarmClocks) { TaskCard<AlarmClock> card = new TaskCard<>(alarm, TaskCard.TaskState.ACTIVE); if (alarm.getValid() == 0) card.taskState = TaskCard.TaskState.INVALID; updateAlarmCount(AccountingActivity.TYPE_ADD, alarm); alarmDatas.add(card); } e.onNext(0); } }, BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()) //执行订阅(subscribe())所在线程 .doOnSubscribe(new Consumer<Subscription>() { @Override public void accept(Subscription subscription) throws Exception { mCpbLoad.setVisibility(View.VISIBLE); } }) .subscribeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread()) //响应订阅(Sbscriber)所在线程 .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { /* 回到主线程刷新列表 */ mCpbLoad.setVisibility(View.GONE); mAdapter.notifyDataSetChanged(); } }); }
private void flowableTest() { Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { for (int i = 0; i < 128; i++) { Log.d(TAG, "emit " + i); emitter.onNext(i); } } }, BackpressureStrategy.ERROR)//增加了一个参数 .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); // s.request(Long.MAX_VALUE); //注意这句代码 mSubscription = s; } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
/** * 生成Flowable * * @param t * @return Flowable */ public static <T> Flowable<T> createFlowable(final T t) { return Flowable.create(new FlowableOnSubscribe<T>() { @Override public void subscribe(FlowableEmitter<T> emitter) throws Exception { try { emitter.onNext(t); emitter.onComplete(); } catch (Exception e) { emitter.onError(e); } } }, BackpressureStrategy.BUFFER); }
@Override public Flowable<List<Song>> getAllSongs() { return Flowable.create(new FlowableOnSubscribe<List<Song>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception { e.onNext(SongsPresenter.getAllSongs()); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override public Flowable<List<Playlist>> getAllPlaylists() { return Flowable.create(new FlowableOnSubscribe<List<Playlist>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Playlist>> e) throws Exception { e.onNext(PlaylistsPresenter.getAllPlaylists()); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override public Flowable<Alarm> getAlarmsById(final String reminderId) { return Flowable.create( new FlowableOnSubscribe<Alarm>() { @Override public void subscribe(FlowableEmitter<Alarm> e) throws Exception { Realm realm = Realm.getDefaultInstance(); RealmQuery<RealmAlarm> query = realm.where(RealmAlarm.class); query.equalTo("alarmId", reminderId); RealmResults<RealmAlarm> result = query.findAll(); if (result.size() == 0) { e.onError(new Exception("AlarmNotFoundException")); } else { RealmAlarm realmAlarm = result.get(0); Alarm alarm = new Alarm(); alarm.setAlarmId(realmAlarm.getAlarmId()); alarm.setActive(realmAlarm.isActive()); alarm.setRenewAutomatically(realmAlarm.isRenewAutomatically()); alarm.setVibrateOnly(realmAlarm.isVibrateOnly()); alarm.setHourOfDay(realmAlarm.getHourOfDay()); alarm.setMinute(realmAlarm.getMinute()); alarm.setAlarmTitle(realmAlarm.getAlarmTitle()); e.onNext(alarm); } } }, BackpressureStrategy.LATEST); }
@Override public Flowable<List<Album>> getArtistAlbums(final long artistId) { return Flowable.create(new FlowableOnSubscribe<List<Album>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Album>> e) throws Exception { e.onNext(AlbumsPresenter.getArtistAlbums(artistId)); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override public Flowable<List<Artist>> getAllArtists() { return Flowable.create(new FlowableOnSubscribe<List<Artist>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Artist>> e) throws Exception { e.onNext(ArtistsPresenter.getAllArtists()); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override public Flowable<List<Song>> getQueueSongs() { return Flowable.create(new FlowableOnSubscribe<List<Song>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception { e.onNext(PlayQueuePresenter.getQueueSongs()); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override public Flowable<List<Song>> getAlbumSongs(final long albumId) { return Flowable.create(new FlowableOnSubscribe<List<Song>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception { e.onNext(AlbumDetailPresenter.getAlbumSongs(albumId)); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override public Flowable<List<Song>> getRecentlyAddedSongs() { return Flowable.create(new FlowableOnSubscribe<List<Song>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception { e.onNext(RecentlyAddedPresenter.getRecentlyAddedSongs()); e.onComplete(); } }, BackpressureStrategy.LATEST); }