Java 类io.reactivex.ObservableEmitter 实例源码

项目:Rx_java2_soussidev    文件:LollipopNetworkObservingStrategy.java   
@Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
    final String service = Context.CONNECTIVITY_SERVICE;
    final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service);

    return Observable.create(new ObservableOnSubscribe<Connectivity>() {
        @Override public void subscribe(ObservableEmitter<Connectivity> subscriber) throws Exception {
            networkCallback = createNetworkCallback(subscriber, context);
            final NetworkRequest networkRequest = new NetworkRequest.Builder().build();
            manager.registerNetworkCallback(networkRequest, networkCallback);
        }
    }).doOnDispose(new Action() {
        @Override public void run() {
            tryToUnregisterCallback(manager);
        }
    }).startWith(Connectivity.create(context)).distinctUntilChanged();
}
项目:GitHub    文件:DebounceExampleActivity.java   
private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // send events with simulated time wait
            emitter.onNext(1); // skip
            Thread.sleep(400);
            emitter.onNext(2); // deliver
            Thread.sleep(505);
            emitter.onNext(3); // skip
            Thread.sleep(100);
            emitter.onNext(4); // deliver
            Thread.sleep(605);
            emitter.onNext(5); // deliver
            Thread.sleep(510);
            emitter.onComplete();
        }
    });
}
项目:GitHub    文件:ThrottleLastExampleActivity.java   
private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // send events with simulated time wait
            Thread.sleep(0);
            emitter.onNext(1); // skip
            emitter.onNext(2); // deliver
            Thread.sleep(505);
            emitter.onNext(3); // skip
            Thread.sleep(99);
            emitter.onNext(4); // skip
            Thread.sleep(100);
            emitter.onNext(5); // skip
            emitter.onNext(6); // deliver
            Thread.sleep(305);
            emitter.onNext(7); // deliver
            Thread.sleep(510);
            emitter.onComplete();
        }
    });
}
项目:ReactiveAirplaneMode    文件:ReactiveAirplaneMode.java   
/**
 * Observes Airplane Mode state of the device with BroadcastReceiver.
 * RxJava2 Observable emits true if the airplane mode turns on and false otherwise.
 *
 * @param context of the Application or Activity
 * @return RxJava2 Observable with Boolean value indicating state of the airplane mode
 */
public Observable<Boolean> observe(final Context context) {
  checkContextIsNotNull(context);
  final IntentFilter filter = createIntentFilter();

  return Observable.create(new ObservableOnSubscribe<Boolean>() {
    @Override public void subscribe(@NonNull final ObservableEmitter<Boolean> emitter)
        throws Exception {
      final BroadcastReceiver receiver = createBroadcastReceiver(emitter);
      context.registerReceiver(receiver, filter);

      final Disposable disposable = disposeInUiThread(new Action() {
        @Override public void run() throws Exception {
          tryToUnregisterReceiver(receiver, context);
        }
      });

      emitter.setDisposable(disposable);
    }
  });
}
项目:dagger-test-example    文件:ImageRequestManager.java   
private SimpleTarget<Bitmap> getInto(String iconUrl, final ObservableEmitter<Bitmap> emitter) {
    return requestManager.load(Uri.parse(iconUrl)).asBitmap().into(new SimpleTarget<Bitmap>()
    {
        @Override
        public void onResourceReady(Bitmap resource, GlideAnimation<? super Bitmap> glideAnimation)
        {
            emitter.onNext(resource);
            emitter.onComplete();
        }

        @Override
        public void onLoadFailed(Exception e, Drawable errorDrawable) {
            super.onLoadFailed(e, errorDrawable);
            emitter.onError(e);
        }
    });
}
项目:RxEasyHttp    文件:MainActivity.java   
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
            FileUtils.getFileFromAsset(MainActivity.this, "1.jpg");
        }
    }).compose(RxUtil.<String>io_main()).subscribe(new Consumer<String>() {
        @Override
        public void accept(@NonNull String s) throws Exception {

        }
    });
}
项目:YiZhi    文件:HeadSettingActivity.java   
@OnClick({R.id.tv_cancel, R.id.tv_ok})
public void onClick(View view) {
    switch (view.getId()) {
        case R.id.tv_cancel:
            onBackPressedSupport();
            break;
        case R.id.tv_ok:
            Observable.create(new ObservableOnSubscribe<Uri>() {
                @Override
                public void subscribe(ObservableEmitter<Uri> e) throws
                        Exception {
                    e.onNext(generateUri());
                    e.onComplete();
                }
            }).compose(RxHelper.<Uri>rxSchedulerHelper())
                    .subscribe(new Consumer<Uri>() {
                        @Override
                        public void accept(Uri uri) throws Exception {
                            RxEventHeadBean rxEventHeadBean = new RxEventHeadBean(uri);
                            RxBus.get().send(RX_BUS_CODE_HEAD_IMAGE_URI, rxEventHeadBean);
                            onBackPressedSupport();
                        }
                    });
            break;
    }
}
项目:Assembler    文件:_Router.java   
@Override
public boolean saveData(final Object obj) {
    if (null == obj) {
        return false;
    }
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext(new Gson().toJson(obj));
            e.onComplete();
        }
    });

    RouterWare.observerables.put(routerRequest.getUri().toString(), observable);
    return true;
}
项目:ObjectBoxRxJava    文件:RxBoxStore.java   
/**
 * Using the returned Observable, you can be notified about data changes.
 * Once a transaction is committed, you will get info on classes with changed Objects.
 */
public static <T> Observable<Class> observable(final BoxStore boxStore) {
    return Observable.create(new ObservableOnSubscribe<Class>() {
        @Override
        public void subscribe(final ObservableEmitter<Class> emitter) throws Exception {
            final DataSubscription dataSubscription = boxStore.subscribe().observer(new DataObserver<Class>() {
                @Override
                public void onData(Class data) {
                    if (!emitter.isDisposed()) {
                        emitter.onNext(data);
                    }
                }
            });
            emitter.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    dataSubscription.cancel();
                }
            });
        }
    });
}
项目:Rxjava2.0Demo    文件:MapActivity.java   
private void map() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
        }
    }).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return "This is result " + integer;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.e(MainActivity.TAG, "accept: " + Thread.currentThread().getName());
            info += s + "\n";
            tv.setText(info);
        }
    });
}
项目:BakingApp    文件:RecipeDatabaseHelper.java   
public void getAllRecipes(Observer<ArrayList<Recipe>> observer){
    Observable.create(new ObservableOnSubscribe<ArrayList<Recipe>>() {
        @Override
        public void subscribe(ObservableEmitter<ArrayList<Recipe>> e) throws Exception {
            Cursor cursor = mContext.getContentResolver()
                    .query(RecipesContract.RecipeEntry.CONTENT_URI,
                            null,
                            null,
                            null,
                            RecipesContract.RecipeEntry.COLUMN_NAME + " DESC");
            ArrayList<Recipe> recipes;
            if(cursor != null){
                recipes = buildRecipesFromCursor(cursor);
                cursor.close();
                e.onNext(recipes);
            } else {
                e.onError(new NullPointerException("Recipe cursor is null."));
            }
            e.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
}
项目:NeteaseCloudMusic    文件:ConfigPresenter.java   
public void requestLoadingList() {
    Observable.create(new ObservableOnSubscribe<List<ConfigBean>>() {
        @Override
        public void subscribe(ObservableEmitter<List<ConfigBean>> e) throws Exception {
            mModel = ConfigModel.getInstance(configView.getContext());
            e.onNext(mModel.getConfigList());
            mModel.setConfigCallback(ConfigPresenter.this);
        }
    })
            .observeOn(Schedulers.io())
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<ConfigBean>>() {
                @Override
                public void accept(List<ConfigBean> list) throws Exception {
                    configView.displayConfigList(list);
                }
            });

}
项目:GetStartRxJava2.0    文件:UnlimitPostActivity.java   
private void doRxJavaWork() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            for (;;) { // 无限循环发送事件
                emitter.onNext(Integer.MAX_VALUE);
            }
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "" + integer);
                }
            });
}
项目:Android-Code-Demos    文件:BasicTest.java   
public Observable<String> getObservable() {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("Today's news update");
                e.onNext("Today's topic is Study");
                e.onComplete();
            }
        });
        /* 下面两个方法作用类似,just 的内部调用的就是 fromArray */
//     return Observable.just("Topic 1", "Heat 1", "News");
//     return Observable.fromArray("Topic 1", "Heat 1", "News");
        /* 只能发送一个数据 */
        /*return Observable.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Topic is Study";
            }
        });*/
    }
项目:starcraft-2-build-player    文件:StandardBuildsService.java   
/**
 * Returns an observable on the progress of loading stock build orders into the local SQLite DB.
 * Should be scheduled on a worker thread.
 *
 * @param c context
 * @param forceLoad if false, builds are only copied if an upgrade is required. If true,
 *                  standard builds are always copied.
 * @return observable on load progress (percentage)
 */
public static Observable<Integer> getLoadStandardBuildsIntoDBObservable(final Context c, final boolean forceLoad) {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<Integer> emitter) throws Exception {
            try {
                if (!emitter.isDisposed()) {
                    loadStandardBuildsIntoDB(c, forceLoad, new DbAdapter.ProgressListener() {
                        @Override
                        public void onProgressUpdate(int percent) {
                            if (!emitter.isDisposed()) {
                                emitter.onNext(percent);
                            }
                        }
                    });
                    emitter.onComplete();
                }
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    });
}
项目:AndroidMVPresenter    文件:RxClick.java   
public static Observable<View> with(final View view) {
    return Observable.create(new ObservableOnSubscribe<View>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<View> e) throws Exception {
            new Handler(Looper.getMainLooper()).post(new Runnable() {
                @Override
                public void run() {
                    view.setOnClickListener(new View.OnClickListener() {
                        @Override
                        public void onClick(View value) {
                            e.onNext(value);
                        }
                    });
                }
            });
        }
    });
}
项目:AssistantBySDK    文件:AppConfig.java   
/**
 * 主进程onCreate后必须进行的先始化工作
 */
public void onMainProgressCreate() {
    if (!MainProgressInited) {
        Log.i("LingJu", "init start>>>>>>");
        Network = NetUtil.getInstance(this).getCurrentNetType();
        init();
        //初次定位
        BaiduLocateManager.createInstance(this).addObserver(locateListener);
        BaiduLocateManager.get().start(Network != NetUtil.NetType.NETWORK_TYPE_NONE);
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> e) throws Exception {
                initExtra();
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe();
    }
}
项目:code-examples-android-expert    文件:RxJavaUnitTest.java   
@Test public void test(){
    Observable<Todo> todoObservable = Observable.create(new ObservableOnSubscribe<Todo>() {
        @Override
        public void subscribe(ObservableEmitter<Todo> emitter) throws Exception {
            try {
                List<Todo> todos = RxJavaUnitTest.this.getTodos();
                if (todos!=null){
                    throw new NullPointerException("todos was null");
                }
                for (Todo todo : todos) {
                    emitter.onNext(todo);
                }
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    });
    TestObserver<Object> testObserver = new TestObserver<>();
    todoObservable.subscribeWith(testObserver);
    testObserver.assertError(NullPointerException.class);

}
项目:SAF-AOP    文件:DemoForTraceActivity.java   
@Trace(enable = false)
private void initData() {

    Observable.create(new ObservableOnSubscribe<String>() {

        @Trace
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {

            e.onNext("111");
            e.onNext("222");
            e.onNext("333");

        }
    }).subscribe(new Consumer<String>() {

        @Trace
        @Override
        public void accept(@NonNull String str) throws Exception {

        }
    });
}
项目:PXLSRT    文件:TempStorageUtils.java   
public static Observable<String> saveTempBitmap(final FileOutputStream fos, final String filename, final Uri uri) {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
            Bitmap bitmap = BitmapFactory.decodeFile(uri.getPath());
            int dimension = bitmap.getWidth();
            bitmap = Bitmap.createBitmap(bitmap, 0, 0, dimension, dimension);
            storeFile(fos, bitmap, 100)
                    .subscribeOn(Schedulers.io())
                    .subscribe(new Consumer<Boolean>() {
                        @Override
                        public void accept(Boolean aBoolean) throws Exception {
                            if (aBoolean) {
                                emitter.onNext(filename);
                            }
                        }
                    });
        }
    });
}
项目:PXLSRT    文件:TempStorageUtils.java   
public static Observable<Boolean> saveResultPicture(final FileOutputStream fos, final Bitmap bitmap, final int quality) {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(final ObservableEmitter<Boolean> emitter) throws Exception {
            storeFile(fos, bitmap, quality)
                    .subscribeOn(Schedulers.io())
                    .onErrorReturn(new Function<Throwable, Boolean>() {
                        @Override
                        public Boolean apply(Throwable throwable) throws Exception {
                            emitter.onNext(false);
                            return null;
                        }
                    })
                    .subscribe(new Consumer<Boolean>() {
                        @Override
                        public void accept(Boolean aBoolean) throws Exception {
                            if (aBoolean) {
                                emitter.onNext(true);
                            }
                        }
                    });
        }
    });
}
项目:PXLSRT    文件:TempStorageUtils.java   
static Observable<Boolean> storeFile(final FileOutputStream fos, final Bitmap bitmap, final int quality) {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            bitmap.compress(Bitmap.CompressFormat.JPEG, quality, fos);
            bitmap.recycle();
            try {
                fos.flush();
                fos.close();
                emitter.onNext(true);
            } catch (IOException e) {
                e.printStackTrace();
                emitter.onError(e);
            }
        }
    });
}
项目:RxJava2-Android-Sample    文件:ThrottleFirstExampleActivity.java   
private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // send events with simulated time wait
            Thread.sleep(0);
            emitter.onNext(1); // skip
            emitter.onNext(2); // deliver
            Thread.sleep(505);
            emitter.onNext(3); // skip
            Thread.sleep(99);
            emitter.onNext(4); // skip
            Thread.sleep(100);
            emitter.onNext(5); // skip
            emitter.onNext(6); // deliver
            Thread.sleep(305);
            emitter.onNext(7); // deliver
            Thread.sleep(510);
            emitter.onComplete();
        }
    });
}
项目:RxFirestore    文件:DocumentSnapshotsOnSubscribe.java   
@Override
public void subscribe(final ObservableEmitter<DocumentSnapshot> emitter) throws Exception {
    final EventListener<DocumentSnapshot> listener = new EventListener<DocumentSnapshot>() {
        @Override
        public void onEvent(DocumentSnapshot documentSnapshot, FirebaseFirestoreException e) {
            if (!emitter.isDisposed()) {
                if (e == null) {
                    emitter.onNext(documentSnapshot);
                } else {
                    emitter.onError(e);
                }
            }
        }

    };

    registration = documentReference.addSnapshotListener(listener);

    emitter.setDisposable(Disposables.fromAction(new Action() {
        @Override
        public void run() throws Exception {
            registration.remove();
        }
    }));
}
项目:RxJava4AndroidDemos    文件:Create.java   
@Override
public void test1() {
    Log.i(TAG, "test1() Create simple demo, onNext() and onError()");
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
            for (int i = 0; i < 3; i++) {
                if (i == 1) {
                    e.onError(new IllegalStateException("Just for test"));
                }
                e.onNext(String.valueOf(i));
            }
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "Consumer<String> accept() s: " + s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
        }
    });
}
项目:Dalaran    文件:CourseStore.java   
/**
 * 查找数据
 *
 * @param cacheId
 * @return
 */
public Observable<Course> findDataByIdentifier(@NonNull final String cacheId) {
    Observable<Course> courseObservable = Observable.create(new ObservableOnSubscribe<Course>() {
        @Override
        public void subscribe(ObservableEmitter<Course> e) throws Exception {
            Util.logMethodThreadId("findDataByIdentifier");
            long time = System.currentTimeMillis();
            try {
                Course result = (Course) mIDBEngine.find(cacheId, Course.class);
                time = System.currentTimeMillis() - time;
                Util.log("<-- End getCache2Disk(" + time + "):" + "[identifier] = " + cacheId + " [data] = " + (result != null ? result.getData() : "null"));
                if (result != null) {
                    e.onNext(result);
                }
                e.onComplete();
            } catch (XDBException d) {
                e.onError(d);
            }
        }
    });
    return courseObservable;

}
项目:YiZhi    文件:GankIoCustomModel.java   
@Override
public Observable<Boolean> recordItemIsRead(final String key) {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            emitter.onNext(DBUtils.getDB(AppUtils.getContext()).insertRead(DBConfig
                    .TABLE_GANKIO_CUSTOM, key, ItemState.STATE_IS_READ));
            emitter.onComplete();
        }
    }).compose(RxHelper.<Boolean>rxSchedulerHelper());
}
项目:CityPicker    文件:CityPresenter.java   
private Observable<List<FilterBean>> getCityObserver(final String id) {
    Log.d(TAG, "getCityObserver: id=" + id);

    return Observable.create(new ObservableOnSubscribe<List<FilterBean>>() {
        @Override
        public void subscribe(@io.reactivex.annotations.NonNull ObservableEmitter<List<FilterBean>> e) throws Exception {
            if (cityDBUtil == null) return;

            List<FilterBean> data = new ArrayList<>();
            cityDBUtil.open();
            try {
                Cursor cursor = cityDBUtil.selectByParentId(id);

                if (cursor != null && cursor.moveToFirst()) {
                    do {
                        data.add(new FilterBean(cursor.getString(0), cursor.getString(1)));
                    } while (cursor.moveToNext());
                    cursor.close();
                }
            } catch (Throwable t) {
                Log.e(TAG, "getCityObserver: t=" + t.getMessage());
            } finally {
                cityDBUtil.close();
            }
            e.onNext(data);
        }
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
项目:GitHub    文件:RxTiPresenterUtils.java   
/**
 * Observable of the view state. The View is ready to receive calls after calling {@link
 * TiPresenter#attachView(TiView)} and before calling {@link TiPresenter#detachView()}.
 */
public static Observable<Boolean> isViewReady(final TiPresenter presenter) {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(final ObservableEmitter<Boolean> emitter)
                throws Exception {
            if (!emitter.isDisposed()) {
                emitter.onNext(presenter.getState() == TiPresenter.State.VIEW_ATTACHED);
            }

            final Removable removable = presenter
                    .addLifecycleObserver(new TiLifecycleObserver() {
                        @Override
                        public void onChange(final TiPresenter.State state,
                                final boolean hasLifecycleMethodBeenCalled) {
                            if (!emitter.isDisposed()) {
                                emitter.onNext(state == TiPresenter.State.VIEW_ATTACHED
                                        && hasLifecycleMethodBeenCalled);
                            }
                        }
                    });

            emitter.setDisposable(new Disposable() {
                @Override
                public void dispose() {
                    removable.remove();
                }

                @Override
                public boolean isDisposed() {
                    return removable.isRemoved();
                }
            });
        }
    }).distinctUntilChanged();
}
项目:GitHub    文件:MapExampleActivity.java   
private Observable<List<ApiUser>> getObservable() {
    return Observable.create(new ObservableOnSubscribe<List<ApiUser>>() {
        @Override
        public void subscribe(ObservableEmitter<List<ApiUser>> e) throws Exception {
            if (!e.isDisposed()) {
                e.onNext(Utils.getApiUserList());
                e.onComplete();
            }
        }
    });
}
项目:RxJava2-Android-Sample    文件:MapExampleActivity.java   
private Observable<List<ApiUser>> getObservable() {
    return Observable.create(new ObservableOnSubscribe<List<ApiUser>>() {
        @Override
        public void subscribe(ObservableEmitter<List<ApiUser>> e) throws Exception {
            if (!e.isDisposed()) {
                e.onNext(Utils.getApiUserList());
                e.onComplete();
            }
        }
    });
}
项目:Assembler    文件:HotelMainDataSourceLocal.java   
@Override
public Observable<String> getContent() {
    return Observable.create(
        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter)
                throws Exception {
                emitter.onNext("酒店页面");
                emitter.onComplete();
            }
        }
    );
}
项目:GitHub    文件:DataBaseHelper.java   
/**
 * Read all records from database.
 *
 * @return All records.
 */
public Observable<List<DownloadRecord>> readAllRecords() {
    return Observable
            .create(new ObservableOnSubscribe<List<DownloadRecord>>() {
                @Override
                public void subscribe(ObservableEmitter<List<DownloadRecord>> emitter)
                        throws Exception {
                    Cursor cursor = null;
                    try {
                        cursor = getReadableDatabase().query(TABLE_NAME,
                                new String[]{COLUMN_ID, COLUMN_URL, COLUMN_SAVE_NAME, COLUMN_SAVE_PATH,
                                        COLUMN_DOWNLOAD_SIZE, COLUMN_TOTAL_SIZE, COLUMN_IS_CHUNKED,
                                        COLUMN_EXTRA1, COLUMN_EXTRA2, COLUMN_EXTRA3, COLUMN_EXTRA4,
                                        COLUMN_EXTRA5, COLUMN_DOWNLOAD_FLAG, COLUMN_DATE, COLUMN_MISSION_ID},
                                null, null, null, null, null);
                        List<DownloadRecord> result = new ArrayList<>();
                        cursor.moveToFirst();
                        if (cursor.getCount() > 0) {
                            do {
                                result.add(read(cursor));
                            } while (cursor.moveToNext());
                        }
                        emitter.onNext(result);
                        emitter.onComplete();
                    } finally {
                        if (cursor != null) {
                            cursor.close();
                        }
                    }
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
项目:Mix    文件:MainActivity.java   
private void initDemoRX02() {
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
                Log.e(TAG, "emit 1");
                emitter.onNext(1);
            }
        });

        Consumer<Integer> consumer = new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "Observer thread is :" + Thread.currentThread().getName());
                Log.e(TAG, "onNext: " + integer);
            }
        };

//        observable.subscribe(consumer);

        /*
        * 多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效.
          多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次.
        * */
        observable.subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(consumer);
    }
项目:GitHub    文件:RealmObservableFactory.java   
@Override
public <E> Observable<CollectionChange<RealmList<E>>> changesetsFrom(Realm realm, final RealmList<E> list) {
    final RealmConfiguration realmConfig = realm.getConfiguration();
    return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmList<E>>>() {
        @Override
        public void subscribe(final ObservableEmitter<CollectionChange<RealmList<E>>> emitter) throws Exception {
            // Gets instance to make sure that the Realm is open for as long as the
            // Observable is subscribed to it.
            final Realm observableRealm = Realm.getInstance(realmConfig);
            listRefs.get().acquireReference(list);
            final OrderedRealmCollectionChangeListener<RealmList<E>> listener = new OrderedRealmCollectionChangeListener<RealmList<E>>() {
                @Override
                public void onChange(RealmList<E> results, OrderedCollectionChangeSet changeSet) {
                    if (!emitter.isDisposed()) {
                        emitter.onNext(new CollectionChange<>(results, changeSet));
                    }
                }
            };
            list.addChangeListener(listener);

            // Cleanup when stream is disposed
            emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
                @Override
                public void run() {
                    list.removeChangeListener(listener);
                    observableRealm.close();
                    listRefs.get().releaseReference(list);
                }
            }));

            // Emit current value immediately
            emitter.onNext(new CollectionChange<>(list, null));
        }
    });
}
项目:RxSharedPreferences    文件:RxSharedPreferences.java   
public Observable<Integer> getInt(final String key, final int defaultValue) {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
            e.onNext(sharedPreferences.getInt(key, defaultValue));
            e.onComplete();
        }
    });
}
项目:Android-Gank-Share    文件:CollectFragment.java   
private void loadData(){
    Observable
            .create(new ObservableOnSubscribe<List<CollectItem>>() {
                @Override
                public void subscribe(ObservableEmitter<List<CollectItem>> e) throws Exception {
                    mItemList.clear();
                    e.onNext(DataSupport.findAll(CollectItem.class));
                }
            })
            .map(new Function<List<CollectItem>, Boolean>() {
                @Override
                public Boolean apply(List<CollectItem> items) throws Exception {
                    return items != null && items.size() > 0
                            && mItemList.addAll(items);
                }
            })
            .subscribeOn(Schedulers.io())
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    addDisposable(disposable);
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    showEmptyView(!aBoolean);
                    mAdapter.notifyDataSetChanged();
                }
            });
}
项目:Mount    文件:PolicyUtils.java   
public static void umountAll(final Context context, Consumer<Boolean> consumer) {
    if (consumer == null) {
        consumer = new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                // DO NOTHING
            }
        };
    }

    final String foregroundPackageName = getForegroundAppPackageName(context);
    final List<String> foregroundServiceList = getForegroundServiceList(context);
    Observable.create(
            new ObservableOnSubscribe<Boolean>() {
                @Override
                public void subscribe(ObservableEmitter<Boolean> e) throws Exception {
                    List<PackageRecord> list = PackageRecord.listAll(PackageRecord.class);
                    for (PackageRecord record : list) {
                        if (!TextUtils.equals(record.name, foregroundPackageName)
                                && !foregroundServiceList.contains(record.name)) {
                            setApplicationMount(context, record.name, false);
                        }
                    }

                    e.onNext(true);
                    e.onComplete();
                }
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(consumer);
}
项目:ZgzFromWithin    文件:LocalImpl.java   
@Override
public Observable<List<MonumentEntity>> monumentListEntity() {
    return Observable.create(new ObservableOnSubscribe<List<MonumentEntity>>() {
        @Override
        public void subscribe(ObservableEmitter<List<MonumentEntity>> e) throws Exception {
            List<MonumentEntity> listPlaces = getAllPlaces();
            if (listPlaces != null) {
                e.onNext(listPlaces);
                e.onComplete();
            } else {
                e.onError(new Throwable("Error getting data"));
            }
        }
    });
}
项目:RxSharedPreferences    文件:RxSharedPreferences.java   
public Observable<Long> getLong(final String key, final long defaultValue) {
    return Observable.create(new ObservableOnSubscribe<Long>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
            e.onNext(sharedPreferences.getLong(key, defaultValue));
            e.onComplete();
        }
    });
}