Java 类io.reactivex.FlowableOnSubscribe 实例源码
项目:SaveImage2SystemAlbum
文件:AlbumManager.java
/**
* 插入到系统相册, 并刷新系统相册
* @param imageUrl
*/
private static void insertSystemAlbumAndRefresh(final String imageUrl) {
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
File file = FileUtils.createFileFrom(imageUrl);
String imageUri = MediaStore.Images.Media.insertImage(ApplicationProvider.IMPL.getApp().getContentResolver(), file.getAbsolutePath(), file.getName(), "图片: " + file.getName());
Log.d("_stone_", "insertSystemAlbumAndRefresh-subscribe: imageUri=" + imageUri);
//讲数据插入到系统图库, 在系统相册APP中就可以看到保存的图片了.
//为了保险起见, 再同步一下系统图库
syncAlbum(imageUrl);
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
}
项目:SAF-AOP
文件:AsyncAspect.java
private void asyncMethod(final ProceedingJoinPoint joinPoint) throws Throwable {
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
Looper.prepare();
try {
joinPoint.proceed();
} catch (Throwable throwable) {
throwable.printStackTrace();
}
Looper.loop();
}
}
, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
}
项目:AutoNet
文件:AutoNetRepoImpl.java
@Override
public Flowable doPullStreamPost(final IRequestEntity requestEntity, final File file) {
Flowable flowable = DefaultFlowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
Gson gson = new Gson();
String json = gson.toJson(requestEntity);
RequestBody body = RequestBody.create(JSON, json);
Request request = new Request.Builder()
.url(mUrl)
.post(body)
.build();
Response response = mClient.newCall(request).execute();
if (response == null) {
emitter.onError(new EmptyException());
return;
}
recvFile(emitter, response, file);
emitter.onComplete();
}
});
return flowable;
}
项目:rxjavatraining
文件:TibcoObservableTest.java
@Test
public void createYourOwnTibco() throws Exception {
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
while (!e.isCancelled()) {
long numberRecords = e.requested();
System.out.println(numberRecords);
if (numberRecords > 0) {
}
}
}
}, BackpressureStrategy.BUFFER);
flowable.map(x -> x + "Yay!").subscribe(System.out::println);
}
项目:vt-support
文件:FilesystemUtil.java
/**
* @param path the root directory containing files to be emitted
* @param depth offset, where 1 == data directory, 2 == zoom directory, 3 == row, 4 == column
* @return a stream of file references to data structured as the Google tiling scheme.
*/
public static Observable<File> getTiles(String path, int depth) {
// programmer's note:
// https://medium.com/we-are-yammer/converting-callback-async-calls-to-rxjava-ebc68bde5831
// http://vlkan.com/blog/post/2016/07/20/rxjava-backpressure/
// Essentially lots of way to skin this cat - e.g. onBackpressureBlock /
// reactive pull
return Flowable.create(new FlowableOnSubscribe<File>() {
@Override
public void subscribe(FlowableEmitter<File> subscriber) throws Exception {
try {
// warning: depth 4 is not really walking (semantics)
walk(path, depth, subscriber);
if (!subscriber.isCancelled()) {
subscriber.onComplete();
}
} catch (final Exception ex) {
if (!subscriber.isCancelled()) {
subscriber.onError(ex);
}
}
}
}, BackpressureStrategy.BUFFER).toObservable();
}
项目:vt-support
文件:FilesystemUtil.java
public static Observable<File> getTiles(String path) {
// programmer's note:
// https://medium.com/we-are-yammer/converting-callback-async-calls-to-rxjava-ebc68bde5831
// http://vlkan.com/blog/post/2016/07/20/rxjava-backpressure/
// Essentially lots of way to skin this cat - e.g. onBackpressureBlock /
// reactive pull
return Flowable.create(new FlowableOnSubscribe<File>() {
@Override
public void subscribe(FlowableEmitter<File> subscriber) throws Exception {
try {
walk(path, 1, subscriber);
if (!subscriber.isCancelled()) {
subscriber.onComplete();
}
} catch (final Exception ex) {
if (!subscriber.isCancelled()) {
subscriber.onError(ex);
}
}
}
}, BackpressureStrategy.BUFFER).toObservable();
}
项目:alchemy
文件:RxJava2Fetchable.java
@Override
public Flowable<T> flowable(BackpressureStrategy mode) {
return Flowable.create(new FlowableOnSubscribe<T>() {
@Override
public void subscribe(FlowableEmitter<T> emitter) throws Exception {
final CloseableIterator<T> iterator = mCallable.call();
try {
while (!emitter.isCancelled() && iterator.hasNext()) {
emitter.onNext(iterator.next());
}
emitter.onComplete();
} finally {
iterator.close();
}
}
}, mode);
}
项目:AxoloTL
文件:BaseRunTask.java
@Override
public final Flowable<RxTaskMessage<T, U>> createFlowable() {
return Flowable.create(new FlowableOnSubscribe<RxTaskMessage<T, U>>() {
@Override
public void subscribe(final FlowableEmitter<RxTaskMessage<T, U>> e) throws Exception {
TaskLogger.v(TAG, "subscribe start");
T innerResult = run(new TaskAgent<U>() {
@Override
public void publishProgress(U progressObject) {
TaskLogger.v(TAG, "onProgress");
e.onNext(RxTaskMessage.<T, U>createProgress(progressObject));
}
});
e.onNext(RxTaskMessage.<T, U>createResult(innerResult));
e.onComplete();
TaskLogger.v(TAG, "subscribe end");
}
}, BackpressureStrategy.ERROR);
}
项目:SeeWeather
文件:ChoiceCityActivity.java
/**
* 查询全国所有的省,从数据库查询
*/
private void queryProvinces() {
getToolbar().setTitle("选择省份");
Flowable.create((FlowableOnSubscribe<String>) emitter -> {
if (provincesList.isEmpty()) {
provincesList.addAll(WeatherDB.loadProvinces(DBManager.getInstance().getDatabase()));
}
dataList.clear();
for (Province province : provincesList) {
emitter.onNext(province.mProName);
}
emitter.onComplete();
}, BackpressureStrategy.BUFFER)
.compose(RxUtil.ioF())
.compose(RxUtil.activityLifecycleF(this))
.doOnNext(proName -> dataList.add(proName))
.doOnComplete(() -> {
mProgressBar.setVisibility(View.GONE);
currentLevel = LEVEL_PROVINCE;
mAdapter.notifyDataSetChanged();
})
.subscribe();
}
项目:SeeWeather
文件:ChoiceCityActivity.java
/**
* 查询选中省份的所有城市,从数据库查询
*/
private void queryCities() {
getToolbar().setTitle("选择城市");
dataList.clear();
mAdapter.notifyDataSetChanged();
Flowable.create((FlowableOnSubscribe<String>) emitter -> {
cityList = WeatherDB.loadCities(DBManager.getInstance().getDatabase(), selectedProvince.mProSort);
for (City city : cityList) {
emitter.onNext(city.mCityName);
}
emitter.onComplete();
}, BackpressureStrategy.BUFFER)
.compose(RxUtil.ioF())
.compose(RxUtil.activityLifecycleF(this))
.doOnNext(proName -> dataList.add(proName))
.doOnComplete(() -> {
currentLevel = LEVEL_CITY;
mAdapter.notifyDataSetChanged();
mRecyclerView.smoothScrollToPosition(0);
})
.subscribe();
}
项目:RxLogin
文件:RxLogin.java
private Flowable<GoogleSignInResult> googleObservable(Activity activity) {
return Flowable.create((FlowableOnSubscribe<ConnectionResult>) e -> {
e.onNext(mGoogleApiClient.blockingConnect(GOOGLE_API_CONNECTION_TIMEOUT_SECONDS,
TimeUnit.SECONDS));
e.onComplete();
},
BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.flatMap(connectionResult -> {
if (!connectionResult.isSuccess()) {
return Flowable.error(new LoginException(LoginException.GOOGLE_ERROR,
connectionResult.getErrorCode(), connectionResult.getErrorMessage()));
}
return Flowable.create(new GoogleSubscriber(this), BackpressureStrategy.DROP);
})
.doOnSubscribe(subscription -> activity.startActivityForResult(getGoogleSingInIntent(),
RC_SIGN_IN))
.doOnTerminate(() -> {
mGoogleApiClient.disconnect();
mGoogleApiClient = null;
mGoogleCallback = null;
});
}
项目:GitHub
文件:RealmHelper.java
public static <T extends RealmModel> Flowable<RealmResults<T>> getRealmItems(Class clazz, HashMap<String, String> map) {
return Flowable.create(new FlowableOnSubscribe<RealmResults<T>>() {
@Override
public void subscribe(FlowableEmitter<RealmResults<T>> emitter)
throws Exception {
Realm realm = Realm.getDefaultInstance();
RealmQuery<T> query = realm.where(clazz);
if (map != null) {
for (Map.Entry<String, String> entry : map.entrySet()) {
query.equalTo(entry.getKey(), entry.getValue());
}
}
RealmResults<T> results = query.findAll();
final RealmChangeListener<RealmResults<T>> listener = _realm -> {
if (!emitter.isCancelled()) {
emitter.onNext(results);
}
};
emitter.setDisposable(Disposables.fromRunnable(() -> {
results.removeChangeListener(listener);
realm.close();
}));
results.addChangeListener(listener);
emitter.onNext(results);
}
}, BackpressureStrategy.LATEST);
}
项目:GitHub
文件:RxUtil.java
/**
* 生成Flowable
* @param <T>
* @return
*/
public static <T> Flowable<T> createData(final T t) {
return Flowable.create(new FlowableOnSubscribe<T>() {
@Override
public void subscribe(FlowableEmitter<T> emitter) throws Exception {
try {
emitter.onNext(t);
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.BUFFER);
}
项目:GitHub
文件:DownloadType.java
private Publisher<DownloadStatus> save(final Response<ResponseBody> response) {
return Flowable.create(new FlowableOnSubscribe<DownloadStatus>() {
@Override
public void subscribe(FlowableEmitter<DownloadStatus> e) throws Exception {
record.save(e, response);
}
}, BackpressureStrategy.LATEST);
}
项目:GitHub
文件:DownloadType.java
/**
* 保存断点下载的文件,以及下载进度
*
* @param index 下载编号
* @param response 响应值
* @return Flowable
*/
private Publisher<DownloadStatus> save(final int index, final ResponseBody response) {
Flowable<DownloadStatus> flowable = Flowable.create(new FlowableOnSubscribe<DownloadStatus>() {
@Override
public void subscribe(FlowableEmitter<DownloadStatus> emitter) throws Exception {
record.save(emitter, index, response);
}
}, BackpressureStrategy.LATEST)
.replay(1)
.autoConnect();
return flowable.throttleFirst(100, TimeUnit.MILLISECONDS).mergeWith(flowable.takeLast(1))
.subscribeOn(Schedulers.newThread());
}
项目:GitHub
文件:RxUtil.java
/**
* 生成Flowable
* @param <T>
* @return
*/
public static <T> Flowable<T> createData(final T t) {
return Flowable.create(new FlowableOnSubscribe<T>() {
@Override
public void subscribe(FlowableEmitter<T> emitter) throws Exception {
try {
emitter.onNext(t);
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.BUFFER);
}
项目:GitHub
文件:RealmObservableFactory.java
@Override
public <E> Flowable<RealmList<E>> from(Realm realm, final RealmList<E> list) {
final RealmConfiguration realmConfig = realm.getConfiguration();
return Flowable.create(new FlowableOnSubscribe<RealmList<E>>() {
@Override
public void subscribe(final FlowableEmitter<RealmList<E>> emitter) throws Exception {
// Gets instance to make sure that the Realm is open for as long as the
// Observable is subscribed to it.
final Realm observableRealm = Realm.getInstance(realmConfig);
listRefs.get().acquireReference(list);
final RealmChangeListener<RealmList<E>> listener = new RealmChangeListener<RealmList<E>>() {
@Override
public void onChange(RealmList<E> results) {
if (!emitter.isCancelled()) {
emitter.onNext(list);
}
}
};
list.addChangeListener(listener);
// Cleanup when stream is disposed
emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
@Override
public void run() {
list.removeChangeListener(listener);
observableRealm.close();
listRefs.get().releaseReference(list);
}
}));
// Emit current value immediately
emitter.onNext(list);
}
}, BACK_PRESSURE_STRATEGY);
}
项目:GitHub
文件:RealmObservableFactory.java
@Override
public <E> Flowable<RealmList<E>> from(DynamicRealm realm, final RealmList<E> list) {
final RealmConfiguration realmConfig = realm.getConfiguration();
return Flowable.create(new FlowableOnSubscribe<RealmList<E>>() {
@Override
public void subscribe(final FlowableEmitter<RealmList<E>> emitter) throws Exception {
// Gets instance to make sure that the Realm is open for as long as the
// Observable is subscribed to it.
final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig);
listRefs.get().acquireReference(list);
final RealmChangeListener<RealmList<E>> listener = new RealmChangeListener<RealmList<E>>() {
@Override
public void onChange(RealmList<E> results) {
if (!emitter.isCancelled()) {
emitter.onNext(list);
}
}
};
list.addChangeListener(listener);
// Cleanup when stream is disposed
emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
@Override
public void run() {
list.removeChangeListener(listener);
observableRealm.close();
listRefs.get().releaseReference(list);
}
}));
// Emit current value immediately
emitter.onNext(list);
}
}, BACK_PRESSURE_STRATEGY);
}
项目:GitHub
文件:RealmObservableFactory.java
@Override
public <E extends RealmModel> Flowable<E> from(final Realm realm, final E object) {
final RealmConfiguration realmConfig = realm.getConfiguration();
return Flowable.create(new FlowableOnSubscribe<E>() {
@Override
public void subscribe(final FlowableEmitter<E> emitter) throws Exception {
// Gets instance to make sure that the Realm is open for as long as the
// Observable is subscribed to it.
final Realm observableRealm = Realm.getInstance(realmConfig);
objectRefs.get().acquireReference(object);
final RealmChangeListener<E> listener = new RealmChangeListener<E>() {
@Override
public void onChange(E obj) {
if (!emitter.isCancelled()) {
emitter.onNext(obj);
}
}
};
RealmObject.addChangeListener(object, listener);
// Cleanup when stream is disposed
emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
@Override
public void run() {
RealmObject.removeChangeListener(object, listener);
observableRealm.close();
objectRefs.get().releaseReference(object);
}
}));
// Emit current value immediately
emitter.onNext(object);
}
}, BACK_PRESSURE_STRATEGY);
}
项目:GitHub
文件:RealmObservableFactory.java
@Override
public Flowable<DynamicRealmObject> from(DynamicRealm realm, final DynamicRealmObject object) {
final RealmConfiguration realmConfig = realm.getConfiguration();
return Flowable.create(new FlowableOnSubscribe<DynamicRealmObject>() {
@Override
public void subscribe(final FlowableEmitter<DynamicRealmObject> emitter) throws Exception {
// Gets instance to make sure that the Realm is open for as long as the
// Observable is subscribed to it.
final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig);
objectRefs.get().acquireReference(object);
final RealmChangeListener<DynamicRealmObject> listener = new RealmChangeListener<DynamicRealmObject>() {
@Override
public void onChange(DynamicRealmObject obj) {
if (!emitter.isCancelled()) {
emitter.onNext(obj);
}
}
};
RealmObject.addChangeListener(object, listener);
// Cleanup when stream is disposed
emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
@Override
public void run() {
RealmObject.removeChangeListener(object, listener);
observableRealm.close();
objectRefs.get().releaseReference(object);
}
}));
// Emit current value immediately
emitter.onNext(object);
}
}, BACK_PRESSURE_STRATEGY);
}
项目:ObjectBoxRxJava
文件:RxQuery.java
/**
* The returned Flowable emits Query results one by one. Once all results have been processed, onComplete is called.
* Uses given BackpressureStrategy.
*/
public static <T> Flowable<T> flowableOneByOne(final Query<T> query, BackpressureStrategy strategy) {
return Flowable.create(new FlowableOnSubscribe<T>() {
@Override
public void subscribe(final FlowableEmitter<T> emitter) throws Exception {
createListItemEmitter(query, emitter);
}
}, strategy);
}
项目:MBEStyle
文件:IconPresenter.java
public void calcIconTotal() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> flowableEmitter) throws Exception {
XmlResourceParser xml = mView.getResources().getXml(R.xml.drawable);
int total = 0;
while (xml.getEventType() != XmlResourceParser.END_DOCUMENT) {
if (xml.getEventType() == XmlPullParser.START_TAG) {
if (xml.getName().startsWith("item")) {
total++;
}
}
xml.next();
}
flowableEmitter.onNext(total);
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mView.setIconTotal(integer);
}
});
}
项目:GetStartRxJava2.0
文件:BaseFlowableActivity.java
private void doRxJavaWork() {
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
e.onNext("事件1");
e.onNext("事件2");
e.onNext("事件3");
e.onNext("事件4");
e.onComplete();
}
}, BackpressureStrategy.BUFFER);
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe: ");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String string) {
Log.d(TAG, "onNext: " + string);
}
@Override
public void onError(Throwable t) {
Log.d(TAG, "onError: " + t.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
flowable.subscribe(subscriber);
}
项目:EvolvingNetLib
文件:CCRequest.java
/**
* 获取内存缓存请求Flowable对象
*
* @return 内存缓存查询Flowable对象
*/
private Flowable<CCBaseResponse<T>> getMemoryCacheQueryFlowable() {
//内存缓存数据获取
return Flowable.create(new FlowableOnSubscribe<CCBaseResponse<T>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<CCBaseResponse<T>> e) throws Exception {
T response = null;
try {
if (ccCacheQueryCallback != null) {
response = ccCacheQueryCallback.<T>onQueryFromMemory(cacheKey);
}
CCBaseResponse<T> tccBaseResponse = new CCBaseResponse<T>(response, true, true, false);
e.onNext(tccBaseResponse);
e.onComplete();
} catch (Exception exception) {
switch (cacheQueryMode) {
case CCCacheMode.QueryMode.MODE_ONLY_MEMORY:
e.onError(new CCDiskCacheQueryException(exception));
break;
default:
e.onComplete();
break;
}
}
}
}, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io());
}
项目:EvolvingNetLib
文件:CCRequest.java
/**
* 获取磁盘缓存请求Flowable对象
*
* @return 磁盘缓存查询Flowable对象
*/
private Flowable<CCBaseResponse<T>> getDiskCacheQueryFlowable() {
//磁盘缓存获取,包括任何形式的磁盘缓存
return Flowable.create(new FlowableOnSubscribe<CCBaseResponse<T>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<CCBaseResponse<T>> e) throws Exception {
T response = null;
try {
if (ccCacheQueryCallback != null) {
response = ccCacheQueryCallback.<T>onQueryFromDisk(cacheKey);
}
CCBaseResponse<T> tccBaseResponse = new CCBaseResponse<T>(response, true, false, true);
e.onNext(tccBaseResponse);
e.onComplete();
} catch (Exception exception) {
switch (cacheQueryMode) {
case CCCacheMode.QueryMode.MODE_ONLY_DISK:
case CCCacheMode.QueryMode.MODE_MEMORY_THEN_DISK:
e.onError(new CCDiskCacheQueryException(exception));
break;
default:
e.onComplete();
break;
}
}
}
}, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io());
}
项目:AssistantBySDK
文件:AlarmActivity.java
/**
* 加载数据,刷新视图
**/
private void loadData() {
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
reset();
/* io线程加载数据库数据 */
List<AlarmClock> alarmClocks = mAssistDao.findAllAlarmAsc(false);
week = Calendar.getInstance().get(Calendar.DAY_OF_WEEK);
week = week - 1 == 0 ? 7 : week - 1; //计算当天是星期几
week = week + 1 == 8 ? 1 : week + 1; // 计算明天是星期几
for (AlarmClock alarm : alarmClocks) {
TaskCard<AlarmClock> card = new TaskCard<>(alarm, TaskCard.TaskState.ACTIVE);
if (alarm.getValid() == 0)
card.taskState = TaskCard.TaskState.INVALID;
updateAlarmCount(AccountingActivity.TYPE_ADD, alarm);
alarmDatas.add(card);
}
e.onNext(0);
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io()) //执行订阅(subscribe())所在线程
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription subscription) throws Exception {
mCpbLoad.setVisibility(View.VISIBLE);
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread()) //响应订阅(Sbscriber)所在线程
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
/* 回到主线程刷新列表 */
mCpbLoad.setVisibility(View.GONE);
mAdapter.notifyDataSetChanged();
}
});
}
项目:RX_Demo
文件:Rx2Test2Activity.java
private void flowableTest() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 128; i++) {
Log.d(TAG, "emit " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR)//增加了一个参数
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
// s.request(Long.MAX_VALUE); //注意这句代码
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
项目:YiZhi
文件:RxHelper.java
/**
* 生成Flowable
*
* @param t
* @return Flowable
*/
public static <T> Flowable<T> createFlowable(final T t) {
return Flowable.create(new FlowableOnSubscribe<T>() {
@Override
public void subscribe(FlowableEmitter<T> emitter) throws Exception {
try {
emitter.onNext(t);
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.BUFFER);
}
项目:KomaMusic
文件:LocalDataSource.java
@Override
public Flowable<List<Song>> getAllSongs() {
return Flowable.create(new FlowableOnSubscribe<List<Song>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception {
e.onNext(SongsPresenter.getAllSongs());
e.onComplete();
}
}, BackpressureStrategy.LATEST);
}
项目:KomaMusic
文件:LocalDataSource.java
@Override
public Flowable<List<Playlist>> getAllPlaylists() {
return Flowable.create(new FlowableOnSubscribe<List<Playlist>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<List<Playlist>> e) throws Exception {
e.onNext(PlaylistsPresenter.getAllPlaylists());
e.onComplete();
}
}, BackpressureStrategy.LATEST);
}
项目:KomaMusic
文件:LocalDataSource.java
@Override
public Flowable<List<Album>> getAllAlbums() {
return Flowable.create(new FlowableOnSubscribe<List<Album>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<List<Album>> e) throws Exception {
e.onNext(AlbumsPresenter.getAllAlbums());
e.onComplete();
}
}, BackpressureStrategy.LATEST);
}
项目:KomaMusic
文件:LocalDataSource.java
@Override
public Flowable<List<Album>> getArtistAlbums(final long artistId) {
return Flowable.create(new FlowableOnSubscribe<List<Album>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<List<Album>> e) throws Exception {
e.onNext(AlbumsPresenter.getArtistAlbums(artistId));
e.onComplete();
}
}, BackpressureStrategy.LATEST);
}
项目:KomaMusic
文件:LocalDataSource.java
@Override
public Flowable<List<Artist>> getAllArtists() {
return Flowable.create(new FlowableOnSubscribe<List<Artist>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<List<Artist>> e) throws Exception {
e.onNext(ArtistsPresenter.getAllArtists());
e.onComplete();
}
}, BackpressureStrategy.LATEST);
}
项目:KomaMusic
文件:LocalDataSource.java
@Override
public Flowable<List<Song>> getQueueSongs() {
return Flowable.create(new FlowableOnSubscribe<List<Song>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception {
e.onNext(PlayQueuePresenter.getQueueSongs());
e.onComplete();
}
}, BackpressureStrategy.LATEST);
}
项目:KomaMusic
文件:LocalDataSource.java
@Override
public Flowable<List<Song>> getAlbumSongs(final long albumId) {
return Flowable.create(new FlowableOnSubscribe<List<Song>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception {
e.onNext(AlbumDetailPresenter.getAlbumSongs(albumId));
e.onComplete();
}
}, BackpressureStrategy.LATEST);
}
项目:KomaMusic
文件:LocalDataSource.java
@Override
public Flowable<List<Song>> getRecentlyAddedSongs() {
return Flowable.create(new FlowableOnSubscribe<List<Song>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception {
e.onNext(RecentlyAddedPresenter.getRecentlyAddedSongs());
e.onComplete();
}
}, BackpressureStrategy.LATEST);
}
项目:KomaMusic
文件:LocalDataSource.java
@Override
public Flowable<List<Song>> getRecentlyPlayedSongs() {
return Flowable.create(new FlowableOnSubscribe<List<Song>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception {
e.onNext(RecentlyPlayPresenter.getRecentlyPlaySongs());
e.onComplete();
}
}, BackpressureStrategy.LATEST);
}
项目:KomaMusic
文件:LocalDataSource.java
@Override
public Flowable<List<Song>> getMyFavoriteSongs() {
return Flowable.create(new FlowableOnSubscribe<List<Song>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception {
e.onNext(MyFavoritePresenter.getFavoriteSongs());
e.onComplete();
}
}, BackpressureStrategy.LATEST);
}
项目:XSnow
文件:DownloadRequest.java
@Override
protected <T> Observable<T> execute(Type type) {
return (Observable<T>) apiService
.downFile(suffixUrl, params)
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.toFlowable(BackpressureStrategy.LATEST)
.flatMap(new Function<ResponseBody, Publisher<?>>() {
@Override
public Publisher<?> apply(final ResponseBody responseBody) throws Exception {
return Flowable.create(new FlowableOnSubscribe<DownProgress>() {
@Override
public void subscribe(FlowableEmitter<DownProgress> subscriber) throws Exception {
File dir = getDiskCacheDir(rootName, dirName);
if (!dir.exists()) {
dir.mkdirs();
}
File file = new File(dir.getPath() + File.separator + fileName);
saveFile(subscriber, file, responseBody);
}
}, BackpressureStrategy.LATEST);
}
})
.sample(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.toObservable()
.retryWhen(new ApiRetryFunc(retryCount, retryDelayMillis));
}
项目:showcase-android
文件:RxFirebaseDatabase.java
/**
* Listener for changes in te data at the given query location.
*
* @param query reference represents a particular location in your Database and can be used for reading or writing data to that Database location.
* @param strategy {@link BackpressureStrategy} associated to this {@link Flowable}
* @return a {@link Flowable} which emits when a value of the database change in the given query.
*/
@NonNull
public static Flowable<DataSnapshot> observeValueEvent(@NonNull final Query query,
@NonNull BackpressureStrategy strategy) {
return Flowable.create(new FlowableOnSubscribe<DataSnapshot>() {
@Override
public void subscribe(final FlowableEmitter<DataSnapshot> emitter) throws Exception {
final ValueEventListener valueEventListener = new ValueEventListener() {
@Override
public void onDataChange(DataSnapshot dataSnapshot) {
emitter.onNext(dataSnapshot);
}
@Override
public void onCancelled(final DatabaseError error) {
emitter.onError(new RxFirebaseDataException(error));
}
};
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
query.removeEventListener(valueEventListener);
}
});
query.addValueEventListener(valueEventListener);
}
}, strategy);
}