Java 类io.reactivex.ObservableEmitter 实例源码
项目:Rx_java2_soussidev
文件:LollipopNetworkObservingStrategy.java
@Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
final String service = Context.CONNECTIVITY_SERVICE;
final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service);
return Observable.create(new ObservableOnSubscribe<Connectivity>() {
@Override public void subscribe(ObservableEmitter<Connectivity> subscriber) throws Exception {
networkCallback = createNetworkCallback(subscriber, context);
final NetworkRequest networkRequest = new NetworkRequest.Builder().build();
manager.registerNetworkCallback(networkRequest, networkCallback);
}
}).doOnDispose(new Action() {
@Override public void run() {
tryToUnregisterCallback(manager);
}
}).startWith(Connectivity.create(context)).distinctUntilChanged();
}
项目:GitHub
文件:DebounceExampleActivity.java
private Observable<Integer> getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
emitter.onNext(1); // skip
Thread.sleep(400);
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(100);
emitter.onNext(4); // deliver
Thread.sleep(605);
emitter.onNext(5); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
}
项目:GitHub
文件:ThrottleLastExampleActivity.java
private Observable<Integer> getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
Thread.sleep(0);
emitter.onNext(1); // skip
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(99);
emitter.onNext(4); // skip
Thread.sleep(100);
emitter.onNext(5); // skip
emitter.onNext(6); // deliver
Thread.sleep(305);
emitter.onNext(7); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
}
项目:ReactiveAirplaneMode
文件:ReactiveAirplaneMode.java
/**
* Observes Airplane Mode state of the device with BroadcastReceiver.
* RxJava2 Observable emits true if the airplane mode turns on and false otherwise.
*
* @param context of the Application or Activity
* @return RxJava2 Observable with Boolean value indicating state of the airplane mode
*/
public Observable<Boolean> observe(final Context context) {
checkContextIsNotNull(context);
final IntentFilter filter = createIntentFilter();
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override public void subscribe(@NonNull final ObservableEmitter<Boolean> emitter)
throws Exception {
final BroadcastReceiver receiver = createBroadcastReceiver(emitter);
context.registerReceiver(receiver, filter);
final Disposable disposable = disposeInUiThread(new Action() {
@Override public void run() throws Exception {
tryToUnregisterReceiver(receiver, context);
}
});
emitter.setDisposable(disposable);
}
});
}
项目:dagger-test-example
文件:ImageRequestManager.java
private SimpleTarget<Bitmap> getInto(String iconUrl, final ObservableEmitter<Bitmap> emitter) {
return requestManager.load(Uri.parse(iconUrl)).asBitmap().into(new SimpleTarget<Bitmap>()
{
@Override
public void onResourceReady(Bitmap resource, GlideAnimation<? super Bitmap> glideAnimation)
{
emitter.onNext(resource);
emitter.onComplete();
}
@Override
public void onLoadFailed(Exception e, Drawable errorDrawable) {
super.onLoadFailed(e, errorDrawable);
emitter.onError(e);
}
});
}
项目:RxEasyHttp
文件:MainActivity.java
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
FileUtils.getFileFromAsset(MainActivity.this, "1.jpg");
}
}).compose(RxUtil.<String>io_main()).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
}
});
}
项目:YiZhi
文件:HeadSettingActivity.java
@OnClick({R.id.tv_cancel, R.id.tv_ok})
public void onClick(View view) {
switch (view.getId()) {
case R.id.tv_cancel:
onBackPressedSupport();
break;
case R.id.tv_ok:
Observable.create(new ObservableOnSubscribe<Uri>() {
@Override
public void subscribe(ObservableEmitter<Uri> e) throws
Exception {
e.onNext(generateUri());
e.onComplete();
}
}).compose(RxHelper.<Uri>rxSchedulerHelper())
.subscribe(new Consumer<Uri>() {
@Override
public void accept(Uri uri) throws Exception {
RxEventHeadBean rxEventHeadBean = new RxEventHeadBean(uri);
RxBus.get().send(RX_BUS_CODE_HEAD_IMAGE_URI, rxEventHeadBean);
onBackPressedSupport();
}
});
break;
}
}
项目:Assembler
文件:_Router.java
@Override
public boolean saveData(final Object obj) {
if (null == obj) {
return false;
}
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext(new Gson().toJson(obj));
e.onComplete();
}
});
RouterWare.observerables.put(routerRequest.getUri().toString(), observable);
return true;
}
项目:ObjectBoxRxJava
文件:RxBoxStore.java
/**
* Using the returned Observable, you can be notified about data changes.
* Once a transaction is committed, you will get info on classes with changed Objects.
*/
public static <T> Observable<Class> observable(final BoxStore boxStore) {
return Observable.create(new ObservableOnSubscribe<Class>() {
@Override
public void subscribe(final ObservableEmitter<Class> emitter) throws Exception {
final DataSubscription dataSubscription = boxStore.subscribe().observer(new DataObserver<Class>() {
@Override
public void onData(Class data) {
if (!emitter.isDisposed()) {
emitter.onNext(data);
}
}
});
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
dataSubscription.cancel();
}
});
}
});
}
项目:Rxjava2.0Demo
文件:MapActivity.java
private void map() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(MainActivity.TAG, "accept: " + Thread.currentThread().getName());
info += s + "\n";
tv.setText(info);
}
});
}
项目:BakingApp
文件:RecipeDatabaseHelper.java
public void getAllRecipes(Observer<ArrayList<Recipe>> observer){
Observable.create(new ObservableOnSubscribe<ArrayList<Recipe>>() {
@Override
public void subscribe(ObservableEmitter<ArrayList<Recipe>> e) throws Exception {
Cursor cursor = mContext.getContentResolver()
.query(RecipesContract.RecipeEntry.CONTENT_URI,
null,
null,
null,
RecipesContract.RecipeEntry.COLUMN_NAME + " DESC");
ArrayList<Recipe> recipes;
if(cursor != null){
recipes = buildRecipesFromCursor(cursor);
cursor.close();
e.onNext(recipes);
} else {
e.onError(new NullPointerException("Recipe cursor is null."));
}
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
}
项目:NeteaseCloudMusic
文件:ConfigPresenter.java
public void requestLoadingList() {
Observable.create(new ObservableOnSubscribe<List<ConfigBean>>() {
@Override
public void subscribe(ObservableEmitter<List<ConfigBean>> e) throws Exception {
mModel = ConfigModel.getInstance(configView.getContext());
e.onNext(mModel.getConfigList());
mModel.setConfigCallback(ConfigPresenter.this);
}
})
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<ConfigBean>>() {
@Override
public void accept(List<ConfigBean> list) throws Exception {
configView.displayConfigList(list);
}
});
}
项目:GetStartRxJava2.0
文件:UnlimitPostActivity.java
private void doRxJavaWork() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (;;) { // 无限循环发送事件
emitter.onNext(Integer.MAX_VALUE);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "" + integer);
}
});
}
项目:Android-Code-Demos
文件:BasicTest.java
public Observable<String> getObservable() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Today's news update");
e.onNext("Today's topic is Study");
e.onComplete();
}
});
/* 下面两个方法作用类似,just 的内部调用的就是 fromArray */
// return Observable.just("Topic 1", "Heat 1", "News");
// return Observable.fromArray("Topic 1", "Heat 1", "News");
/* 只能发送一个数据 */
/*return Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "Topic is Study";
}
});*/
}
项目:starcraft-2-build-player
文件:StandardBuildsService.java
/**
* Returns an observable on the progress of loading stock build orders into the local SQLite DB.
* Should be scheduled on a worker thread.
*
* @param c context
* @param forceLoad if false, builds are only copied if an upgrade is required. If true,
* standard builds are always copied.
* @return observable on load progress (percentage)
*/
public static Observable<Integer> getLoadStandardBuildsIntoDBObservable(final Context c, final boolean forceLoad) {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<Integer> emitter) throws Exception {
try {
if (!emitter.isDisposed()) {
loadStandardBuildsIntoDB(c, forceLoad, new DbAdapter.ProgressListener() {
@Override
public void onProgressUpdate(int percent) {
if (!emitter.isDisposed()) {
emitter.onNext(percent);
}
}
});
emitter.onComplete();
}
} catch (Exception e) {
emitter.onError(e);
}
}
});
}
项目:AndroidMVPresenter
文件:RxClick.java
public static Observable<View> with(final View view) {
return Observable.create(new ObservableOnSubscribe<View>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<View> e) throws Exception {
new Handler(Looper.getMainLooper()).post(new Runnable() {
@Override
public void run() {
view.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View value) {
e.onNext(value);
}
});
}
});
}
});
}
项目:AssistantBySDK
文件:AppConfig.java
/**
* 主进程onCreate后必须进行的先始化工作
*/
public void onMainProgressCreate() {
if (!MainProgressInited) {
Log.i("LingJu", "init start>>>>>>");
Network = NetUtil.getInstance(this).getCurrentNetType();
init();
//初次定位
BaiduLocateManager.createInstance(this).addObserver(locateListener);
BaiduLocateManager.get().start(Network != NetUtil.NetType.NETWORK_TYPE_NONE);
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> e) throws Exception {
initExtra();
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe();
}
}
项目:code-examples-android-expert
文件:RxJavaUnitTest.java
@Test public void test(){
Observable<Todo> todoObservable = Observable.create(new ObservableOnSubscribe<Todo>() {
@Override
public void subscribe(ObservableEmitter<Todo> emitter) throws Exception {
try {
List<Todo> todos = RxJavaUnitTest.this.getTodos();
if (todos!=null){
throw new NullPointerException("todos was null");
}
for (Todo todo : todos) {
emitter.onNext(todo);
}
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
});
TestObserver<Object> testObserver = new TestObserver<>();
todoObservable.subscribeWith(testObserver);
testObserver.assertError(NullPointerException.class);
}
项目:SAF-AOP
文件:DemoForTraceActivity.java
@Trace(enable = false)
private void initData() {
Observable.create(new ObservableOnSubscribe<String>() {
@Trace
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("111");
e.onNext("222");
e.onNext("333");
}
}).subscribe(new Consumer<String>() {
@Trace
@Override
public void accept(@NonNull String str) throws Exception {
}
});
}
项目:PXLSRT
文件:TempStorageUtils.java
public static Observable<String> saveTempBitmap(final FileOutputStream fos, final String filename, final Uri uri) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
Bitmap bitmap = BitmapFactory.decodeFile(uri.getPath());
int dimension = bitmap.getWidth();
bitmap = Bitmap.createBitmap(bitmap, 0, 0, dimension, dimension);
storeFile(fos, bitmap, 100)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
if (aBoolean) {
emitter.onNext(filename);
}
}
});
}
});
}
项目:PXLSRT
文件:TempStorageUtils.java
public static Observable<Boolean> saveResultPicture(final FileOutputStream fos, final Bitmap bitmap, final int quality) {
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(final ObservableEmitter<Boolean> emitter) throws Exception {
storeFile(fos, bitmap, quality)
.subscribeOn(Schedulers.io())
.onErrorReturn(new Function<Throwable, Boolean>() {
@Override
public Boolean apply(Throwable throwable) throws Exception {
emitter.onNext(false);
return null;
}
})
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
if (aBoolean) {
emitter.onNext(true);
}
}
});
}
});
}
项目:PXLSRT
文件:TempStorageUtils.java
static Observable<Boolean> storeFile(final FileOutputStream fos, final Bitmap bitmap, final int quality) {
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
bitmap.compress(Bitmap.CompressFormat.JPEG, quality, fos);
bitmap.recycle();
try {
fos.flush();
fos.close();
emitter.onNext(true);
} catch (IOException e) {
e.printStackTrace();
emitter.onError(e);
}
}
});
}
项目:RxJava2-Android-Sample
文件:ThrottleFirstExampleActivity.java
private Observable<Integer> getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
Thread.sleep(0);
emitter.onNext(1); // skip
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(99);
emitter.onNext(4); // skip
Thread.sleep(100);
emitter.onNext(5); // skip
emitter.onNext(6); // deliver
Thread.sleep(305);
emitter.onNext(7); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
}
项目:RxFirestore
文件:DocumentSnapshotsOnSubscribe.java
@Override
public void subscribe(final ObservableEmitter<DocumentSnapshot> emitter) throws Exception {
final EventListener<DocumentSnapshot> listener = new EventListener<DocumentSnapshot>() {
@Override
public void onEvent(DocumentSnapshot documentSnapshot, FirebaseFirestoreException e) {
if (!emitter.isDisposed()) {
if (e == null) {
emitter.onNext(documentSnapshot);
} else {
emitter.onError(e);
}
}
}
};
registration = documentReference.addSnapshotListener(listener);
emitter.setDisposable(Disposables.fromAction(new Action() {
@Override
public void run() throws Exception {
registration.remove();
}
}));
}
项目:RxJava4AndroidDemos
文件:Create.java
@Override
public void test1() {
Log.i(TAG, "test1() Create simple demo, onNext() and onError()");
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
for (int i = 0; i < 3; i++) {
if (i == 1) {
e.onError(new IllegalStateException("Just for test"));
}
e.onNext(String.valueOf(i));
}
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "Consumer<String> accept() s: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
}
});
}
项目:Dalaran
文件:CourseStore.java
/**
* 查找数据
*
* @param cacheId
* @return
*/
public Observable<Course> findDataByIdentifier(@NonNull final String cacheId) {
Observable<Course> courseObservable = Observable.create(new ObservableOnSubscribe<Course>() {
@Override
public void subscribe(ObservableEmitter<Course> e) throws Exception {
Util.logMethodThreadId("findDataByIdentifier");
long time = System.currentTimeMillis();
try {
Course result = (Course) mIDBEngine.find(cacheId, Course.class);
time = System.currentTimeMillis() - time;
Util.log("<-- End getCache2Disk(" + time + "):" + "[identifier] = " + cacheId + " [data] = " + (result != null ? result.getData() : "null"));
if (result != null) {
e.onNext(result);
}
e.onComplete();
} catch (XDBException d) {
e.onError(d);
}
}
});
return courseObservable;
}
项目:YiZhi
文件:GankIoCustomModel.java
@Override
public Observable<Boolean> recordItemIsRead(final String key) {
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
emitter.onNext(DBUtils.getDB(AppUtils.getContext()).insertRead(DBConfig
.TABLE_GANKIO_CUSTOM, key, ItemState.STATE_IS_READ));
emitter.onComplete();
}
}).compose(RxHelper.<Boolean>rxSchedulerHelper());
}
项目:CityPicker
文件:CityPresenter.java
private Observable<List<FilterBean>> getCityObserver(final String id) {
Log.d(TAG, "getCityObserver: id=" + id);
return Observable.create(new ObservableOnSubscribe<List<FilterBean>>() {
@Override
public void subscribe(@io.reactivex.annotations.NonNull ObservableEmitter<List<FilterBean>> e) throws Exception {
if (cityDBUtil == null) return;
List<FilterBean> data = new ArrayList<>();
cityDBUtil.open();
try {
Cursor cursor = cityDBUtil.selectByParentId(id);
if (cursor != null && cursor.moveToFirst()) {
do {
data.add(new FilterBean(cursor.getString(0), cursor.getString(1)));
} while (cursor.moveToNext());
cursor.close();
}
} catch (Throwable t) {
Log.e(TAG, "getCityObserver: t=" + t.getMessage());
} finally {
cityDBUtil.close();
}
e.onNext(data);
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
项目:GitHub
文件:RxTiPresenterUtils.java
/**
* Observable of the view state. The View is ready to receive calls after calling {@link
* TiPresenter#attachView(TiView)} and before calling {@link TiPresenter#detachView()}.
*/
public static Observable<Boolean> isViewReady(final TiPresenter presenter) {
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(final ObservableEmitter<Boolean> emitter)
throws Exception {
if (!emitter.isDisposed()) {
emitter.onNext(presenter.getState() == TiPresenter.State.VIEW_ATTACHED);
}
final Removable removable = presenter
.addLifecycleObserver(new TiLifecycleObserver() {
@Override
public void onChange(final TiPresenter.State state,
final boolean hasLifecycleMethodBeenCalled) {
if (!emitter.isDisposed()) {
emitter.onNext(state == TiPresenter.State.VIEW_ATTACHED
&& hasLifecycleMethodBeenCalled);
}
}
});
emitter.setDisposable(new Disposable() {
@Override
public void dispose() {
removable.remove();
}
@Override
public boolean isDisposed() {
return removable.isRemoved();
}
});
}
}).distinctUntilChanged();
}
项目:GitHub
文件:MapExampleActivity.java
private Observable<List<ApiUser>> getObservable() {
return Observable.create(new ObservableOnSubscribe<List<ApiUser>>() {
@Override
public void subscribe(ObservableEmitter<List<ApiUser>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getApiUserList());
e.onComplete();
}
}
});
}
项目:RxJava2-Android-Sample
文件:MapExampleActivity.java
private Observable<List<ApiUser>> getObservable() {
return Observable.create(new ObservableOnSubscribe<List<ApiUser>>() {
@Override
public void subscribe(ObservableEmitter<List<ApiUser>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getApiUserList());
e.onComplete();
}
}
});
}
项目:Assembler
文件:HotelMainDataSourceLocal.java
@Override
public Observable<String> getContent() {
return Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter)
throws Exception {
emitter.onNext("酒店页面");
emitter.onComplete();
}
}
);
}
项目:GitHub
文件:DataBaseHelper.java
/**
* Read all records from database.
*
* @return All records.
*/
public Observable<List<DownloadRecord>> readAllRecords() {
return Observable
.create(new ObservableOnSubscribe<List<DownloadRecord>>() {
@Override
public void subscribe(ObservableEmitter<List<DownloadRecord>> emitter)
throws Exception {
Cursor cursor = null;
try {
cursor = getReadableDatabase().query(TABLE_NAME,
new String[]{COLUMN_ID, COLUMN_URL, COLUMN_SAVE_NAME, COLUMN_SAVE_PATH,
COLUMN_DOWNLOAD_SIZE, COLUMN_TOTAL_SIZE, COLUMN_IS_CHUNKED,
COLUMN_EXTRA1, COLUMN_EXTRA2, COLUMN_EXTRA3, COLUMN_EXTRA4,
COLUMN_EXTRA5, COLUMN_DOWNLOAD_FLAG, COLUMN_DATE, COLUMN_MISSION_ID},
null, null, null, null, null);
List<DownloadRecord> result = new ArrayList<>();
cursor.moveToFirst();
if (cursor.getCount() > 0) {
do {
result.add(read(cursor));
} while (cursor.moveToNext());
}
emitter.onNext(result);
emitter.onComplete();
} finally {
if (cursor != null) {
cursor.close();
}
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
项目:Mix
文件:MainActivity.java
private void initDemoRX02() {
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.e(TAG, "emit 1");
emitter.onNext(1);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.e(TAG, "onNext: " + integer);
}
};
// observable.subscribe(consumer);
/*
* 多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效.
多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次.
* */
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}
项目:GitHub
文件:RealmObservableFactory.java
@Override
public <E> Observable<CollectionChange<RealmList<E>>> changesetsFrom(Realm realm, final RealmList<E> list) {
final RealmConfiguration realmConfig = realm.getConfiguration();
return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmList<E>>>() {
@Override
public void subscribe(final ObservableEmitter<CollectionChange<RealmList<E>>> emitter) throws Exception {
// Gets instance to make sure that the Realm is open for as long as the
// Observable is subscribed to it.
final Realm observableRealm = Realm.getInstance(realmConfig);
listRefs.get().acquireReference(list);
final OrderedRealmCollectionChangeListener<RealmList<E>> listener = new OrderedRealmCollectionChangeListener<RealmList<E>>() {
@Override
public void onChange(RealmList<E> results, OrderedCollectionChangeSet changeSet) {
if (!emitter.isDisposed()) {
emitter.onNext(new CollectionChange<>(results, changeSet));
}
}
};
list.addChangeListener(listener);
// Cleanup when stream is disposed
emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
@Override
public void run() {
list.removeChangeListener(listener);
observableRealm.close();
listRefs.get().releaseReference(list);
}
}));
// Emit current value immediately
emitter.onNext(new CollectionChange<>(list, null));
}
});
}
项目:RxSharedPreferences
文件:RxSharedPreferences.java
public Observable<Integer> getInt(final String key, final int defaultValue) {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(sharedPreferences.getInt(key, defaultValue));
e.onComplete();
}
});
}
项目:Android-Gank-Share
文件:CollectFragment.java
private void loadData(){
Observable
.create(new ObservableOnSubscribe<List<CollectItem>>() {
@Override
public void subscribe(ObservableEmitter<List<CollectItem>> e) throws Exception {
mItemList.clear();
e.onNext(DataSupport.findAll(CollectItem.class));
}
})
.map(new Function<List<CollectItem>, Boolean>() {
@Override
public Boolean apply(List<CollectItem> items) throws Exception {
return items != null && items.size() > 0
&& mItemList.addAll(items);
}
})
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
addDisposable(disposable);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
showEmptyView(!aBoolean);
mAdapter.notifyDataSetChanged();
}
});
}
项目:Mount
文件:PolicyUtils.java
public static void umountAll(final Context context, Consumer<Boolean> consumer) {
if (consumer == null) {
consumer = new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
// DO NOTHING
}
};
}
final String foregroundPackageName = getForegroundAppPackageName(context);
final List<String> foregroundServiceList = getForegroundServiceList(context);
Observable.create(
new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> e) throws Exception {
List<PackageRecord> list = PackageRecord.listAll(PackageRecord.class);
for (PackageRecord record : list) {
if (!TextUtils.equals(record.name, foregroundPackageName)
&& !foregroundServiceList.contains(record.name)) {
setApplicationMount(context, record.name, false);
}
}
e.onNext(true);
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}
项目:ZgzFromWithin
文件:LocalImpl.java
@Override
public Observable<List<MonumentEntity>> monumentListEntity() {
return Observable.create(new ObservableOnSubscribe<List<MonumentEntity>>() {
@Override
public void subscribe(ObservableEmitter<List<MonumentEntity>> e) throws Exception {
List<MonumentEntity> listPlaces = getAllPlaces();
if (listPlaces != null) {
e.onNext(listPlaces);
e.onComplete();
} else {
e.onError(new Throwable("Error getting data"));
}
}
});
}
项目:RxSharedPreferences
文件:RxSharedPreferences.java
public Observable<Long> getLong(final String key, final long defaultValue) {
return Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
e.onNext(sharedPreferences.getLong(key, defaultValue));
e.onComplete();
}
});
}