/** * 插入到系统相册, 并刷新系统相册 * @param imageUrl */ private static void insertSystemAlbumAndRefresh(final String imageUrl) { Flowable.create(new FlowableOnSubscribe<Object>() { @Override public void subscribe(FlowableEmitter<Object> e) throws Exception { File file = FileUtils.createFileFrom(imageUrl); String imageUri = MediaStore.Images.Media.insertImage(ApplicationProvider.IMPL.getApp().getContentResolver(), file.getAbsolutePath(), file.getName(), "图片: " + file.getName()); Log.d("_stone_", "insertSystemAlbumAndRefresh-subscribe: imageUri=" + imageUri); //讲数据插入到系统图库, 在系统相册APP中就可以看到保存的图片了. //为了保险起见, 再同步一下系统图库 syncAlbum(imageUrl); } }, BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(); }
private void asyncMethod(final ProceedingJoinPoint joinPoint) throws Throwable { Flowable.create(new FlowableOnSubscribe<Object>() { @Override public void subscribe(FlowableEmitter<Object> e) throws Exception { Looper.prepare(); try { joinPoint.proceed(); } catch (Throwable throwable) { throwable.printStackTrace(); } Looper.loop(); } } , BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(); }
@Override public Flowable doPullStreamPost(final IRequestEntity requestEntity, final File file) { Flowable flowable = DefaultFlowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter emitter) throws Exception { final MediaType JSON = MediaType.parse("application/json; charset=utf-8"); Gson gson = new Gson(); String json = gson.toJson(requestEntity); RequestBody body = RequestBody.create(JSON, json); Request request = new Request.Builder() .url(mUrl) .post(body) .build(); Response response = mClient.newCall(request).execute(); if (response == null) { emitter.onError(new EmptyException()); return; } recvFile(emitter, response, file); emitter.onComplete(); } }); return flowable; }
@Test public void createYourOwnTibco() throws Exception { Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> e) throws Exception { while (!e.isCancelled()) { long numberRecords = e.requested(); System.out.println(numberRecords); if (numberRecords > 0) { } } } }, BackpressureStrategy.BUFFER); flowable.map(x -> x + "Yay!").subscribe(System.out::println); }
/** * @param path the root directory containing files to be emitted * @param depth offset, where 1 == data directory, 2 == zoom directory, 3 == row, 4 == column * @return a stream of file references to data structured as the Google tiling scheme. */ public static Observable<File> getTiles(String path, int depth) { // programmer's note: // https://medium.com/we-are-yammer/converting-callback-async-calls-to-rxjava-ebc68bde5831 // http://vlkan.com/blog/post/2016/07/20/rxjava-backpressure/ // Essentially lots of way to skin this cat - e.g. onBackpressureBlock / // reactive pull return Flowable.create(new FlowableOnSubscribe<File>() { @Override public void subscribe(FlowableEmitter<File> subscriber) throws Exception { try { // warning: depth 4 is not really walking (semantics) walk(path, depth, subscriber); if (!subscriber.isCancelled()) { subscriber.onComplete(); } } catch (final Exception ex) { if (!subscriber.isCancelled()) { subscriber.onError(ex); } } } }, BackpressureStrategy.BUFFER).toObservable(); }
public static Observable<File> getTiles(String path) { // programmer's note: // https://medium.com/we-are-yammer/converting-callback-async-calls-to-rxjava-ebc68bde5831 // http://vlkan.com/blog/post/2016/07/20/rxjava-backpressure/ // Essentially lots of way to skin this cat - e.g. onBackpressureBlock / // reactive pull return Flowable.create(new FlowableOnSubscribe<File>() { @Override public void subscribe(FlowableEmitter<File> subscriber) throws Exception { try { walk(path, 1, subscriber); if (!subscriber.isCancelled()) { subscriber.onComplete(); } } catch (final Exception ex) { if (!subscriber.isCancelled()) { subscriber.onError(ex); } } } }, BackpressureStrategy.BUFFER).toObservable(); }
@Override public Flowable<T> flowable(BackpressureStrategy mode) { return Flowable.create(new FlowableOnSubscribe<T>() { @Override public void subscribe(FlowableEmitter<T> emitter) throws Exception { final CloseableIterator<T> iterator = mCallable.call(); try { while (!emitter.isCancelled() && iterator.hasNext()) { emitter.onNext(iterator.next()); } emitter.onComplete(); } finally { iterator.close(); } } }, mode); }
@Override public final Flowable<RxTaskMessage<T, U>> createFlowable() { return Flowable.create(new FlowableOnSubscribe<RxTaskMessage<T, U>>() { @Override public void subscribe(final FlowableEmitter<RxTaskMessage<T, U>> e) throws Exception { TaskLogger.v(TAG, "subscribe start"); T innerResult = run(new TaskAgent<U>() { @Override public void publishProgress(U progressObject) { TaskLogger.v(TAG, "onProgress"); e.onNext(RxTaskMessage.<T, U>createProgress(progressObject)); } }); e.onNext(RxTaskMessage.<T, U>createResult(innerResult)); e.onComplete(); TaskLogger.v(TAG, "subscribe end"); } }, BackpressureStrategy.ERROR); }
/** * 查询全国所有的省,从数据库查询 */ private void queryProvinces() { getToolbar().setTitle("选择省份"); Flowable.create((FlowableOnSubscribe<String>) emitter -> { if (provincesList.isEmpty()) { provincesList.addAll(WeatherDB.loadProvinces(DBManager.getInstance().getDatabase())); } dataList.clear(); for (Province province : provincesList) { emitter.onNext(province.mProName); } emitter.onComplete(); }, BackpressureStrategy.BUFFER) .compose(RxUtil.ioF()) .compose(RxUtil.activityLifecycleF(this)) .doOnNext(proName -> dataList.add(proName)) .doOnComplete(() -> { mProgressBar.setVisibility(View.GONE); currentLevel = LEVEL_PROVINCE; mAdapter.notifyDataSetChanged(); }) .subscribe(); }
/** * 查询选中省份的所有城市,从数据库查询 */ private void queryCities() { getToolbar().setTitle("选择城市"); dataList.clear(); mAdapter.notifyDataSetChanged(); Flowable.create((FlowableOnSubscribe<String>) emitter -> { cityList = WeatherDB.loadCities(DBManager.getInstance().getDatabase(), selectedProvince.mProSort); for (City city : cityList) { emitter.onNext(city.mCityName); } emitter.onComplete(); }, BackpressureStrategy.BUFFER) .compose(RxUtil.ioF()) .compose(RxUtil.activityLifecycleF(this)) .doOnNext(proName -> dataList.add(proName)) .doOnComplete(() -> { currentLevel = LEVEL_CITY; mAdapter.notifyDataSetChanged(); mRecyclerView.smoothScrollToPosition(0); }) .subscribe(); }
private Flowable<GoogleSignInResult> googleObservable(Activity activity) { return Flowable.create((FlowableOnSubscribe<ConnectionResult>) e -> { e.onNext(mGoogleApiClient.blockingConnect(GOOGLE_API_CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS)); e.onComplete(); }, BackpressureStrategy.DROP) .subscribeOn(Schedulers.io()) .flatMap(connectionResult -> { if (!connectionResult.isSuccess()) { return Flowable.error(new LoginException(LoginException.GOOGLE_ERROR, connectionResult.getErrorCode(), connectionResult.getErrorMessage())); } return Flowable.create(new GoogleSubscriber(this), BackpressureStrategy.DROP); }) .doOnSubscribe(subscription -> activity.startActivityForResult(getGoogleSingInIntent(), RC_SIGN_IN)) .doOnTerminate(() -> { mGoogleApiClient.disconnect(); mGoogleApiClient = null; mGoogleCallback = null; }); }
public static <T extends RealmModel> Flowable<RealmResults<T>> getRealmItems(Class clazz, HashMap<String, String> map) { return Flowable.create(new FlowableOnSubscribe<RealmResults<T>>() { @Override public void subscribe(FlowableEmitter<RealmResults<T>> emitter) throws Exception { Realm realm = Realm.getDefaultInstance(); RealmQuery<T> query = realm.where(clazz); if (map != null) { for (Map.Entry<String, String> entry : map.entrySet()) { query.equalTo(entry.getKey(), entry.getValue()); } } RealmResults<T> results = query.findAll(); final RealmChangeListener<RealmResults<T>> listener = _realm -> { if (!emitter.isCancelled()) { emitter.onNext(results); } }; emitter.setDisposable(Disposables.fromRunnable(() -> { results.removeChangeListener(listener); realm.close(); })); results.addChangeListener(listener); emitter.onNext(results); } }, BackpressureStrategy.LATEST); }
/** * 生成Flowable * @param <T> * @return */ public static <T> Flowable<T> createData(final T t) { return Flowable.create(new FlowableOnSubscribe<T>() { @Override public void subscribe(FlowableEmitter<T> emitter) throws Exception { try { emitter.onNext(t); emitter.onComplete(); } catch (Exception e) { emitter.onError(e); } } }, BackpressureStrategy.BUFFER); }
private Publisher<DownloadStatus> save(final Response<ResponseBody> response) { return Flowable.create(new FlowableOnSubscribe<DownloadStatus>() { @Override public void subscribe(FlowableEmitter<DownloadStatus> e) throws Exception { record.save(e, response); } }, BackpressureStrategy.LATEST); }
/** * 保存断点下载的文件,以及下载进度 * * @param index 下载编号 * @param response 响应值 * @return Flowable */ private Publisher<DownloadStatus> save(final int index, final ResponseBody response) { Flowable<DownloadStatus> flowable = Flowable.create(new FlowableOnSubscribe<DownloadStatus>() { @Override public void subscribe(FlowableEmitter<DownloadStatus> emitter) throws Exception { record.save(emitter, index, response); } }, BackpressureStrategy.LATEST) .replay(1) .autoConnect(); return flowable.throttleFirst(100, TimeUnit.MILLISECONDS).mergeWith(flowable.takeLast(1)) .subscribeOn(Schedulers.newThread()); }
@Override public <E> Flowable<RealmList<E>> from(Realm realm, final RealmList<E> list) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Flowable.create(new FlowableOnSubscribe<RealmList<E>>() { @Override public void subscribe(final FlowableEmitter<RealmList<E>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); listRefs.get().acquireReference(list); final RealmChangeListener<RealmList<E>> listener = new RealmChangeListener<RealmList<E>>() { @Override public void onChange(RealmList<E> results) { if (!emitter.isCancelled()) { emitter.onNext(list); } } }; list.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { list.removeChangeListener(listener); observableRealm.close(); listRefs.get().releaseReference(list); } })); // Emit current value immediately emitter.onNext(list); } }, BACK_PRESSURE_STRATEGY); }
@Override public <E> Flowable<RealmList<E>> from(DynamicRealm realm, final RealmList<E> list) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Flowable.create(new FlowableOnSubscribe<RealmList<E>>() { @Override public void subscribe(final FlowableEmitter<RealmList<E>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig); listRefs.get().acquireReference(list); final RealmChangeListener<RealmList<E>> listener = new RealmChangeListener<RealmList<E>>() { @Override public void onChange(RealmList<E> results) { if (!emitter.isCancelled()) { emitter.onNext(list); } } }; list.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { list.removeChangeListener(listener); observableRealm.close(); listRefs.get().releaseReference(list); } })); // Emit current value immediately emitter.onNext(list); } }, BACK_PRESSURE_STRATEGY); }
@Override public <E extends RealmModel> Flowable<E> from(final Realm realm, final E object) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Flowable.create(new FlowableOnSubscribe<E>() { @Override public void subscribe(final FlowableEmitter<E> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); objectRefs.get().acquireReference(object); final RealmChangeListener<E> listener = new RealmChangeListener<E>() { @Override public void onChange(E obj) { if (!emitter.isCancelled()) { emitter.onNext(obj); } } }; RealmObject.addChangeListener(object, listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { RealmObject.removeChangeListener(object, listener); observableRealm.close(); objectRefs.get().releaseReference(object); } })); // Emit current value immediately emitter.onNext(object); } }, BACK_PRESSURE_STRATEGY); }
@Override public Flowable<DynamicRealmObject> from(DynamicRealm realm, final DynamicRealmObject object) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Flowable.create(new FlowableOnSubscribe<DynamicRealmObject>() { @Override public void subscribe(final FlowableEmitter<DynamicRealmObject> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig); objectRefs.get().acquireReference(object); final RealmChangeListener<DynamicRealmObject> listener = new RealmChangeListener<DynamicRealmObject>() { @Override public void onChange(DynamicRealmObject obj) { if (!emitter.isCancelled()) { emitter.onNext(obj); } } }; RealmObject.addChangeListener(object, listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { RealmObject.removeChangeListener(object, listener); observableRealm.close(); objectRefs.get().releaseReference(object); } })); // Emit current value immediately emitter.onNext(object); } }, BACK_PRESSURE_STRATEGY); }
/** * The returned Flowable emits Query results one by one. Once all results have been processed, onComplete is called. * Uses given BackpressureStrategy. */ public static <T> Flowable<T> flowableOneByOne(final Query<T> query, BackpressureStrategy strategy) { return Flowable.create(new FlowableOnSubscribe<T>() { @Override public void subscribe(final FlowableEmitter<T> emitter) throws Exception { createListItemEmitter(query, emitter); } }, strategy); }
public void calcIconTotal() { Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> flowableEmitter) throws Exception { XmlResourceParser xml = mView.getResources().getXml(R.xml.drawable); int total = 0; while (xml.getEventType() != XmlResourceParser.END_DOCUMENT) { if (xml.getEventType() == XmlPullParser.START_TAG) { if (xml.getName().startsWith("item")) { total++; } } xml.next(); } flowableEmitter.onNext(total); } }, BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { mView.setIconTotal(integer); } }); }
private void doRxJavaWork() { Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> e) throws Exception { e.onNext("事件1"); e.onNext("事件2"); e.onNext("事件3"); e.onNext("事件4"); e.onComplete(); } }, BackpressureStrategy.BUFFER); Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe: "); s.request(Long.MAX_VALUE); } @Override public void onNext(String string) { Log.d(TAG, "onNext: " + string); } @Override public void onError(Throwable t) { Log.d(TAG, "onError: " + t.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } }; flowable.subscribe(subscriber); }
/** * 获取内存缓存请求Flowable对象 * * @return 内存缓存查询Flowable对象 */ private Flowable<CCBaseResponse<T>> getMemoryCacheQueryFlowable() { //内存缓存数据获取 return Flowable.create(new FlowableOnSubscribe<CCBaseResponse<T>>() { @Override public void subscribe(@NonNull FlowableEmitter<CCBaseResponse<T>> e) throws Exception { T response = null; try { if (ccCacheQueryCallback != null) { response = ccCacheQueryCallback.<T>onQueryFromMemory(cacheKey); } CCBaseResponse<T> tccBaseResponse = new CCBaseResponse<T>(response, true, true, false); e.onNext(tccBaseResponse); e.onComplete(); } catch (Exception exception) { switch (cacheQueryMode) { case CCCacheMode.QueryMode.MODE_ONLY_MEMORY: e.onError(new CCDiskCacheQueryException(exception)); break; default: e.onComplete(); break; } } } }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()); }
/** * 获取磁盘缓存请求Flowable对象 * * @return 磁盘缓存查询Flowable对象 */ private Flowable<CCBaseResponse<T>> getDiskCacheQueryFlowable() { //磁盘缓存获取,包括任何形式的磁盘缓存 return Flowable.create(new FlowableOnSubscribe<CCBaseResponse<T>>() { @Override public void subscribe(@NonNull FlowableEmitter<CCBaseResponse<T>> e) throws Exception { T response = null; try { if (ccCacheQueryCallback != null) { response = ccCacheQueryCallback.<T>onQueryFromDisk(cacheKey); } CCBaseResponse<T> tccBaseResponse = new CCBaseResponse<T>(response, true, false, true); e.onNext(tccBaseResponse); e.onComplete(); } catch (Exception exception) { switch (cacheQueryMode) { case CCCacheMode.QueryMode.MODE_ONLY_DISK: case CCCacheMode.QueryMode.MODE_MEMORY_THEN_DISK: e.onError(new CCDiskCacheQueryException(exception)); break; default: e.onComplete(); break; } } } }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io()); }
/** * 加载数据,刷新视图 **/ private void loadData() { Flowable.create(new FlowableOnSubscribe<Object>() { @Override public void subscribe(FlowableEmitter<Object> e) throws Exception { reset(); /* io线程加载数据库数据 */ List<AlarmClock> alarmClocks = mAssistDao.findAllAlarmAsc(false); week = Calendar.getInstance().get(Calendar.DAY_OF_WEEK); week = week - 1 == 0 ? 7 : week - 1; //计算当天是星期几 week = week + 1 == 8 ? 1 : week + 1; // 计算明天是星期几 for (AlarmClock alarm : alarmClocks) { TaskCard<AlarmClock> card = new TaskCard<>(alarm, TaskCard.TaskState.ACTIVE); if (alarm.getValid() == 0) card.taskState = TaskCard.TaskState.INVALID; updateAlarmCount(AccountingActivity.TYPE_ADD, alarm); alarmDatas.add(card); } e.onNext(0); } }, BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()) //执行订阅(subscribe())所在线程 .doOnSubscribe(new Consumer<Subscription>() { @Override public void accept(Subscription subscription) throws Exception { mCpbLoad.setVisibility(View.VISIBLE); } }) .subscribeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread()) //响应订阅(Sbscriber)所在线程 .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { /* 回到主线程刷新列表 */ mCpbLoad.setVisibility(View.GONE); mAdapter.notifyDataSetChanged(); } }); }
private void flowableTest() { Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { for (int i = 0; i < 128; i++) { Log.d(TAG, "emit " + i); emitter.onNext(i); } } }, BackpressureStrategy.ERROR)//增加了一个参数 .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); // s.request(Long.MAX_VALUE); //注意这句代码 mSubscription = s; } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
/** * 生成Flowable * * @param t * @return Flowable */ public static <T> Flowable<T> createFlowable(final T t) { return Flowable.create(new FlowableOnSubscribe<T>() { @Override public void subscribe(FlowableEmitter<T> emitter) throws Exception { try { emitter.onNext(t); emitter.onComplete(); } catch (Exception e) { emitter.onError(e); } } }, BackpressureStrategy.BUFFER); }
@Override public Flowable<List<Song>> getAllSongs() { return Flowable.create(new FlowableOnSubscribe<List<Song>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception { e.onNext(SongsPresenter.getAllSongs()); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override public Flowable<List<Playlist>> getAllPlaylists() { return Flowable.create(new FlowableOnSubscribe<List<Playlist>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Playlist>> e) throws Exception { e.onNext(PlaylistsPresenter.getAllPlaylists()); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override public Flowable<List<Album>> getAllAlbums() { return Flowable.create(new FlowableOnSubscribe<List<Album>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Album>> e) throws Exception { e.onNext(AlbumsPresenter.getAllAlbums()); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override public Flowable<List<Album>> getArtistAlbums(final long artistId) { return Flowable.create(new FlowableOnSubscribe<List<Album>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Album>> e) throws Exception { e.onNext(AlbumsPresenter.getArtistAlbums(artistId)); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override public Flowable<List<Artist>> getAllArtists() { return Flowable.create(new FlowableOnSubscribe<List<Artist>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Artist>> e) throws Exception { e.onNext(ArtistsPresenter.getAllArtists()); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override public Flowable<List<Song>> getQueueSongs() { return Flowable.create(new FlowableOnSubscribe<List<Song>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception { e.onNext(PlayQueuePresenter.getQueueSongs()); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override public Flowable<List<Song>> getAlbumSongs(final long albumId) { return Flowable.create(new FlowableOnSubscribe<List<Song>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception { e.onNext(AlbumDetailPresenter.getAlbumSongs(albumId)); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override public Flowable<List<Song>> getRecentlyAddedSongs() { return Flowable.create(new FlowableOnSubscribe<List<Song>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception { e.onNext(RecentlyAddedPresenter.getRecentlyAddedSongs()); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override public Flowable<List<Song>> getRecentlyPlayedSongs() { return Flowable.create(new FlowableOnSubscribe<List<Song>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception { e.onNext(RecentlyPlayPresenter.getRecentlyPlaySongs()); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override public Flowable<List<Song>> getMyFavoriteSongs() { return Flowable.create(new FlowableOnSubscribe<List<Song>>() { @Override public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception { e.onNext(MyFavoritePresenter.getFavoriteSongs()); e.onComplete(); } }, BackpressureStrategy.LATEST); }
@Override protected <T> Observable<T> execute(Type type) { return (Observable<T>) apiService .downFile(suffixUrl, params) .subscribeOn(Schedulers.io()) .unsubscribeOn(Schedulers.io()) .toFlowable(BackpressureStrategy.LATEST) .flatMap(new Function<ResponseBody, Publisher<?>>() { @Override public Publisher<?> apply(final ResponseBody responseBody) throws Exception { return Flowable.create(new FlowableOnSubscribe<DownProgress>() { @Override public void subscribe(FlowableEmitter<DownProgress> subscriber) throws Exception { File dir = getDiskCacheDir(rootName, dirName); if (!dir.exists()) { dir.mkdirs(); } File file = new File(dir.getPath() + File.separator + fileName); saveFile(subscriber, file, responseBody); } }, BackpressureStrategy.LATEST); } }) .sample(1, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()) .toObservable() .retryWhen(new ApiRetryFunc(retryCount, retryDelayMillis)); }
/** * Listener for changes in te data at the given query location. * * @param query reference represents a particular location in your Database and can be used for reading or writing data to that Database location. * @param strategy {@link BackpressureStrategy} associated to this {@link Flowable} * @return a {@link Flowable} which emits when a value of the database change in the given query. */ @NonNull public static Flowable<DataSnapshot> observeValueEvent(@NonNull final Query query, @NonNull BackpressureStrategy strategy) { return Flowable.create(new FlowableOnSubscribe<DataSnapshot>() { @Override public void subscribe(final FlowableEmitter<DataSnapshot> emitter) throws Exception { final ValueEventListener valueEventListener = new ValueEventListener() { @Override public void onDataChange(DataSnapshot dataSnapshot) { emitter.onNext(dataSnapshot); } @Override public void onCancelled(final DatabaseError error) { emitter.onError(new RxFirebaseDataException(error)); } }; emitter.setCancellable(new Cancellable() { @Override public void cancel() throws Exception { query.removeEventListener(valueEventListener); } }); query.addValueEventListener(valueEventListener); } }, strategy); }