Java 类io.reactivex.ObservableOnSubscribe 实例源码
项目:RxBeacon
文件:RxBeacon.java
public Observable<RxBeaconRange> beaconsInRegion() {
return startup()
.flatMap(new Function<Boolean, ObservableSource<RxBeaconRange>>() {
@Override
public ObservableSource<RxBeaconRange> apply(@NonNull Boolean aBoolean) throws Exception {
return Observable.create(new ObservableOnSubscribe<RxBeaconRange>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<RxBeaconRange> objectObservableEmitter) throws Exception {
beaconManager.addRangeNotifier(new RangeNotifier() {
@Override
public void didRangeBeaconsInRegion(Collection<Beacon> collection, Region region) {
objectObservableEmitter.onNext(new RxBeaconRange(collection, region));
}
});
beaconManager.startRangingBeaconsInRegion(getRegion());
}
});
}
});
}
项目:GitHub
文件:ThrottleLastExampleActivity.java
private Observable<Integer> getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
Thread.sleep(0);
emitter.onNext(1); // skip
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(99);
emitter.onNext(4); // skip
Thread.sleep(100);
emitter.onNext(5); // skip
emitter.onNext(6); // deliver
Thread.sleep(305);
emitter.onNext(7); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
}
项目:BakingApp
文件:RecipeDatabaseHelper.java
public void insertRecipes(@NonNull final ArrayList<Recipe> recipes, Observer<Integer> observer){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
ContentValues[] contentValues = new ContentValues[recipes.size()];
for (int i = 0; i < recipes.size(); ++i) {
contentValues[i] = buildContentValuesFromRecipe(recipes.get(i));
}
int recipesAdded = mContext.getContentResolver().bulkInsert(RecipesContract.RecipeEntry.CONTENT_URI, contentValues);
if (recipesAdded != 0){
e.onNext(recipesAdded);
} else {
e.onError(new NullPointerException("Failed to insert"));
}
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
}
项目:simple-stack
文件:TaskRepository.java
private Observable<List<Task>> createResults(QuerySelector<DbTask> querySelector) {
return Observable.create((ObservableOnSubscribe<List<Task>>) emitter -> {
Realm realm = Realm.getDefaultInstance();
final RealmResults<DbTask> dbTasks = querySelector.createQuery(realm);
final RealmChangeListener<RealmResults<DbTask>> realmChangeListener = element -> {
if(element.isLoaded() && !emitter.isDisposed()) {
List<Task> tasks = mapFrom(element);
if(!emitter.isDisposed()) {
emitter.onNext(tasks);
}
}
};
emitter.setDisposable(Disposables.fromAction(() -> {
if(dbTasks.isValid()) {
dbTasks.removeChangeListener(realmChangeListener);
}
realm.close();
}));
dbTasks.addChangeListener(realmChangeListener);
}).subscribeOn(looperScheduler.getScheduler()).unsubscribeOn(looperScheduler.getScheduler());
}
项目:RxJava2-Android-Sample
文件:DebounceExampleActivity.java
private Observable<Integer> getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
//1(drop)--(400s<500s)---2(pass)---(600s>500s)---3(drop)---(100s<500s)---4(pass)---(605s>500s)---5(pass)---510s
emitter.onNext(1); // skip
Thread.sleep(400);
emitter.onNext(2); // deliver
Thread.sleep(600);
emitter.onNext(3); // skip
Thread.sleep(100);
emitter.onNext(4); // deliver
Thread.sleep(605);
emitter.onNext(5); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
}
项目:MBEStyle
文件:IconShowPresenter.java
public Disposable getAllIcons() {
return Observable.create(new ObservableOnSubscribe<IconBean>() {
@Override
public void subscribe(@NonNull ObservableEmitter<IconBean> e) throws Exception {
XmlResourceParser xml = mView.getResources().getXml(R.xml.drawable);
while (xml.getEventType() != XmlResourceParser.END_DOCUMENT) {
if (xml.getEventType() == XmlPullParser.START_TAG) {
if (xml.getName().startsWith("item")) {
IconBean bean = new IconBean();
String iconName = xml.getAttributeValue(null, "drawable");
bean.id = mView.getResources().getIdentifier(
iconName, "drawable", BuildConfig.APPLICATION_ID);
bean.name = iconName;
e.onNext(bean);
}
}
xml.next();
}
e.onComplete();
}
}).toList().subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<IconBean>>() {
@Override
public void accept(List<IconBean> list) throws Exception {
mView.onLoadData(list);
}
});
}
项目:Rxjava2.0Demo
文件:MapActivity.java
private void map() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(MainActivity.TAG, "accept: " + Thread.currentThread().getName());
info += s + "\n";
tv.setText(info);
}
});
}
项目:OKHttpLoggingInterceptor
文件:NetTest.java
public void test()
{
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception
{
e.onNext(1);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception
{
System.out.println(integer);
}
});
}
项目:GetStartRxJava2.0
文件:UnlimitPostActivity.java
private void doRxJavaWork() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (;;) { // 无限循环发送事件
emitter.onNext(Integer.MAX_VALUE);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "" + integer);
}
});
}
项目:Android-Code-Demos
文件:BasicTest.java
public Observable<String> getObservable() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Today's news update");
e.onNext("Today's topic is Study");
e.onComplete();
}
});
/* 下面两个方法作用类似,just 的内部调用的就是 fromArray */
// return Observable.just("Topic 1", "Heat 1", "News");
// return Observable.fromArray("Topic 1", "Heat 1", "News");
/* 只能发送一个数据 */
/*return Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "Topic is Study";
}
});*/
}
项目:NeteaseCloudMusic
文件:ConfigPresenter.java
public void requestLoadingList() {
Observable.create(new ObservableOnSubscribe<List<ConfigBean>>() {
@Override
public void subscribe(ObservableEmitter<List<ConfigBean>> e) throws Exception {
mModel = ConfigModel.getInstance(configView.getContext());
e.onNext(mModel.getConfigList());
mModel.setConfigCallback(ConfigPresenter.this);
}
})
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<ConfigBean>>() {
@Override
public void accept(List<ConfigBean> list) throws Exception {
configView.displayConfigList(list);
}
});
}
项目:AndroidMVPresenter
文件:RxClick.java
public static Observable<View> with(final View view) {
return Observable.create(new ObservableOnSubscribe<View>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<View> e) throws Exception {
new Handler(Looper.getMainLooper()).post(new Runnable() {
@Override
public void run() {
view.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View value) {
e.onNext(value);
}
});
}
});
}
});
}
项目:AssistantBySDK
文件:TingPlayProcessor.java
/**
* 按一级分类查找专辑
**/
private Observable<List<Album>> getAlbumByCate(String cateId, int calc_dimension) {
final Map<String, String> params = new HashMap<>();
params.put(DTransferConstants.CATEGORY_ID, cateId);
params.put(DTransferConstants.CALC_DIMENSION, String.valueOf(calc_dimension));
return Observable.create(new ObservableOnSubscribe<List<Album>>() {
@Override
public void subscribe(final ObservableEmitter<List<Album>> e) throws Exception {
CommonRequest.getAlbumList(params, new IDataCallBack<AlbumList>() {
@Override
public void onSuccess(AlbumList albumList) {
//onNext的参数不允许为null
e.onNext(albumList.getAlbums());
e.onComplete();
}
@Override
public void onError(int i, String s) {
e.onError(new Throwable(i + " " + s));
}
});
}
}).subscribeOn(Schedulers.io());
}
项目:AssistantBySDK
文件:TingPlayProcessor.java
/**
* 按关键词查找专辑
**/
private Observable<List<Album>> getAlbumByKeyWord(String keyword, String cateId, int calc_dimension) {
final Map<String, String> params = new HashMap<>();
params.put(DTransferConstants.SEARCH_KEY, keyword);
params.put(DTransferConstants.CATEGORY_ID, cateId);
params.put(DTransferConstants.CALC_DIMENSION, String.valueOf(calc_dimension));
return Observable.create(new ObservableOnSubscribe<List<Album>>() {
@Override
public void subscribe(final ObservableEmitter<List<Album>> e) throws Exception {
CommonRequest.getSearchedAlbums(params, new IDataCallBack<SearchAlbumList>() {
@Override
public void onSuccess(SearchAlbumList searchAlbumList) {
e.onNext(searchAlbumList.getAlbums());
e.onComplete();
}
@Override
public void onError(int i, String s) {
e.onError(new Throwable(i + " " + s));
}
});
}
})
.subscribeOn(Schedulers.io());
}
项目:starcraft-2-build-player
文件:StandardBuildsService.java
/**
* Returns an observable on the progress of loading stock build orders into the local SQLite DB.
* Should be scheduled on a worker thread.
*
* @param c context
* @param forceLoad if false, builds are only copied if an upgrade is required. If true,
* standard builds are always copied.
* @return observable on load progress (percentage)
*/
public static Observable<Integer> getLoadStandardBuildsIntoDBObservable(final Context c, final boolean forceLoad) {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<Integer> emitter) throws Exception {
try {
if (!emitter.isDisposed()) {
loadStandardBuildsIntoDB(c, forceLoad, new DbAdapter.ProgressListener() {
@Override
public void onProgressUpdate(int percent) {
if (!emitter.isDisposed()) {
emitter.onNext(percent);
}
}
});
emitter.onComplete();
}
} catch (Exception e) {
emitter.onError(e);
}
}
});
}
项目:ReactiveAirplaneMode
文件:ReactiveAirplaneMode.java
/**
* Observes Airplane Mode state of the device with BroadcastReceiver.
* RxJava2 Observable emits true if the airplane mode turns on and false otherwise.
*
* @param context of the Application or Activity
* @return RxJava2 Observable with Boolean value indicating state of the airplane mode
*/
public Observable<Boolean> observe(final Context context) {
checkContextIsNotNull(context);
final IntentFilter filter = createIntentFilter();
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override public void subscribe(@NonNull final ObservableEmitter<Boolean> emitter)
throws Exception {
final BroadcastReceiver receiver = createBroadcastReceiver(emitter);
context.registerReceiver(receiver, filter);
final Disposable disposable = disposeInUiThread(new Action() {
@Override public void run() throws Exception {
tryToUnregisterReceiver(receiver, context);
}
});
emitter.setDisposable(disposable);
}
});
}
项目:AssistantBySDK
文件:AccountingActivity.java
/**
* 更新余额、当日收支
**/
public void updateBalance(final int type, final List<TaskCard<Accounting>> taskcards) {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
CountToday();
CountBalance(type, taskcards);
e.onNext(0);
}
})
.subscribeOn(Schedulers.io()) //执行订阅(subscribe())所在线程
.observeOn(AndroidSchedulers.mainThread()) //响应订阅(Sbscriber)所在线程
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
if (AppConfig.dPreferences.getBoolean(AppConfig.HAS_AMOUNT, false))
mTvBalance.setText("¥" + AssistUtils.formatAmount(balance));
mAdapter.notifyItemChanged(0);
}
});
}
项目:RxGps
文件:RxGps.java
private Observable<Boolean> checkPlayServicesAvailable() {
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> e) throws Exception {
final Activity activity = activityReference.get();
if (activity != null) {
final GoogleApiAvailability apiAvailability = GoogleApiAvailability.getInstance();
final int status = apiAvailability.isGooglePlayServicesAvailable(activity);
if (status != ConnectionResult.SUCCESS) {
e.onError(new PlayServicesNotAvailableException());
} else {
e.onNext(true);
e.onComplete();
}
}
}
});
}
项目:code-examples-android-expert
文件:RxJavaUnitTest.java
@Test public void test(){
Observable<Todo> todoObservable = Observable.create(new ObservableOnSubscribe<Todo>() {
@Override
public void subscribe(ObservableEmitter<Todo> emitter) throws Exception {
try {
List<Todo> todos = RxJavaUnitTest.this.getTodos();
if (todos!=null){
throw new NullPointerException("todos was null");
}
for (Todo todo : todos) {
emitter.onNext(todo);
}
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
});
TestObserver<Object> testObserver = new TestObserver<>();
todoObservable.subscribeWith(testObserver);
testObserver.assertError(NullPointerException.class);
}
项目:SAF-AOP
文件:DemoForTraceActivity.java
@Trace(enable = false)
private void initData() {
Observable.create(new ObservableOnSubscribe<String>() {
@Trace
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("111");
e.onNext("222");
e.onNext("333");
}
}).subscribe(new Consumer<String>() {
@Trace
@Override
public void accept(@NonNull String str) throws Exception {
}
});
}
项目:simple-stack
文件:DatabaseManager.java
public void openDatabase() {
disposable = Observable.create((ObservableOnSubscribe<Realm>) emitter -> {
final Realm observableRealm = Realm.getDefaultInstance();
final RealmChangeListener<Realm> listener = realm -> {
if(!emitter.isDisposed()) {
emitter.onNext(observableRealm);
}
};
observableRealm.addChangeListener(listener);
emitter.setDisposable(Disposables.fromAction(() -> {
observableRealm.removeChangeListener(listener);
observableRealm.close();
}));
emitter.onNext(observableRealm);
}).subscribeOn(looperScheduler.getScheduler()).unsubscribeOn(looperScheduler.getScheduler()).subscribe();
}
项目:RxRetroJsoup
文件:RxJsoup.java
public Observable<String> text(final Element element, final String expression) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
final Elements elements = element.select(expression);
if (elements.isEmpty() && exceptionIfNotFound) {
observableEmitter.onError(new NotFoundException(expression, element.toString()));
} else {
if (elements.isEmpty()) {
observableEmitter.onNext("");
} else {
for (Element e : elements) {
observableEmitter.onNext(e.text());
}
}
observableEmitter.onComplete();
}
}
});
}
项目:RxJava4AndroidDemos
文件:Create.java
@Override
public void test3() {
Log.i(TAG, "test3() Create simple demo, onNext() twice");
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
for (int i = 0; i < 3; i++) {
e.onNext(String.valueOf(i));
}
}
});
for (int time = 0; time < 2; time++) {
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "Consumer<String> accept() s: " + s);
}
});
}
}
项目:Reactive-Programming-With-Java-9
文件:DemoObservable.java
public static void main(String[] args) {
Observable<String> month_observable = Observable.create(new
ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter)
throws Exception {
// TODO Auto-generated method stub
try {
String[] monthArray = { "Jan", "Feb", "Mar",
"Apl", "May", "Jun", "July", "Aug",
"Sept", "Oct","Nov", "Dec" };
List<String> months = Arrays.asList(monthArray);
for (String month : months) {
emitter.onNext(month);
}
emitter.onComplete();
} catch (Exception e) {
// TODO: handle exception
emitter.onError(e);
}
}
});
month_observable.subscribe(s -> System.out.println(s));
}
项目:RxJava2-Android-Sample
文件:ThrottleLastExampleActivity.java
private Observable<Integer> getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
Thread.sleep(0);
emitter.onNext(1); // skip
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(99);
emitter.onNext(4); // skip
Thread.sleep(100);
emitter.onNext(5); // skip
emitter.onNext(6); // deliver
Thread.sleep(305);
emitter.onNext(7); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
}
项目:PXLSRT
文件:TempStorageUtils.java
static Observable<Boolean> storeFile(final FileOutputStream fos, final Bitmap bitmap, final int quality) {
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
bitmap.compress(Bitmap.CompressFormat.JPEG, quality, fos);
bitmap.recycle();
try {
fos.flush();
fos.close();
emitter.onNext(true);
} catch (IOException e) {
e.printStackTrace();
emitter.onError(e);
}
}
});
}
项目:Aurora
文件:HistoryModel.java
@Override
public Observable<List<VideoDaoEntity>> getListFromNet(int start, String userid) {
return Observable.create((ObservableOnSubscribe<List<VideoDaoEntity>>) emitter -> {
BmobQuery<VideoDaoEntity> query = new BmobQuery<VideoDaoEntity>();
query.addWhereEqualTo("userId", userid);
query.setLimit(10);
query.order("-updatedAt");
query.setSkip(start);
query.findObjects(new FindListener<VideoDaoEntity>() {
@Override
public void done(List<VideoDaoEntity> list, BmobException e) {
List<VideoDaoEntity> infolist = new ArrayList<VideoDaoEntity>();
if (!StringUtils.isEmpty(list)) {
for (VideoDaoEntity entity1 : list) {
entity1.setVideo(mGson.fromJson(entity1.getBody(), VideoListInfo.Video.VideoData.class));
infolist.add(entity1);
}
}
emitter.onNext(infolist);
}
});
});
}
项目:Mount
文件:MountReceiver.java
private void onActionPackageFullyRemoved(final Intent intent) {
Observable.create(
new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> e) throws Exception {
// prefix "package:"
String packageName = intent.getData().toString().substring(8);
List<PackageRecord> list = PackageRecord.listAll(PackageRecord.class);
for (PackageRecord record : list) {
if (TextUtils.equals(record.name, packageName)) {
record.delete();
}
}
e.onNext(true);
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.subscribe();
}
项目:android-study
文件:RxJavaFragment.java
/**
* sample操作符每隔指定的时间就从上游中取出一个事件发送给下游.
*/
private void doSample() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000; i++) { //模拟无限循环发送事件
emitter.onNext(i);
}
}
})
.subscribeOn(Schedulers.io())
.sample(1, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.d(TAG, "" + integer);
}
});
}
项目:Dalaran
文件:CourseStore.java
/**
* 查找数据
*
* @param cacheId
* @return
*/
public Observable<Course> findDataByIdentifier(@NonNull final String cacheId) {
Observable<Course> courseObservable = Observable.create(new ObservableOnSubscribe<Course>() {
@Override
public void subscribe(ObservableEmitter<Course> e) throws Exception {
Util.logMethodThreadId("findDataByIdentifier");
long time = System.currentTimeMillis();
try {
Course result = (Course) mIDBEngine.find(cacheId, Course.class);
time = System.currentTimeMillis() - time;
Util.log("<-- End getCache2Disk(" + time + "):" + "[identifier] = " + cacheId + " [data] = " + (result != null ? result.getData() : "null"));
if (result != null) {
e.onNext(result);
}
e.onComplete();
} catch (XDBException d) {
e.onError(d);
}
}
});
return courseObservable;
}
项目:RxEasyHttp
文件:MainActivity.java
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
FileUtils.getFileFromAsset(MainActivity.this, "1.jpg");
}
}).compose(RxUtil.<String>io_main()).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
}
});
}
项目:Reactive-Programming-With-Java-9
文件:Demo_Schedulers_IO.java
public static void main(String[] args) {
// TODO Auto-generated method stub
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// TODO Auto-generated method stub
System.out.println("Thread:-"+Thread.currentThread().getName());
emitter.onNext(getNum());
emitter.onComplete();
}
}).subscribeOn(Schedulers.io()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer value) throws Exception {
// TODO Auto-generated method stub
System.out.println("Thread for subscription:-"+Thread.currentThread().getName());
System.out.println(value);
}
});
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("end of main:-"+Thread.currentThread().getName());
}
项目:Aurora
文件:HistoryModel.java
@Override
public Observable<Boolean> deleteFromNet(VideoDaoEntity entity) {
return Observable.create((ObservableOnSubscribe<Boolean>) emitter -> {
entity.delete(new UpdateListener() {
@Override
public void done(BmobException e) {
if (e == null) {
emitter.onNext(true);
}
}
});
});
}
项目:RxSharedPreferences
文件:RxSharedPreferences.java
public Observable<Integer> getInt(final String key, final int defaultValue) {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(sharedPreferences.getInt(key, defaultValue));
e.onComplete();
}
});
}
项目:geotemporal
文件:BTree.java
private Observable<Value> range(Node<Key, Value> x, Key lowerInclusive, Key upperExclusive,
int height) {
return Observable.create(new ObservableOnSubscribe<Value>() {
@Override
public void subscribe(ObservableEmitter<Value> emitter) throws Exception {
range(x, lowerInclusive, upperExclusive, height, emitter);
if (!emitter.isDisposed()) {
emitter.onComplete();
}
}
});
}
项目:GitHub
文件:MapExampleActivity.java
private Observable<List<ApiUser>> getObservable() {
return Observable.create(new ObservableOnSubscribe<List<ApiUser>>() {
@Override
public void subscribe(ObservableEmitter<List<ApiUser>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getApiUserList());
e.onComplete();
}
}
});
}
项目:screen-share-to-browser
文件:RecordService.java
private Observable<ImageInfo> getByteBufferObservable() {
return Observable.create(new ObservableOnSubscribe<ImageInfo>() {
@Override
public void subscribe(ObservableEmitter<ImageInfo> e) throws Exception {
imageInfoObservableEmitter = e;
Log.d(TAG, "subscribe: " + Process.myTid());
}
});
}
项目:GitHub
文件:ZipExampleActivity.java
private Observable<List<User>> getFootballFansObservable() {
return Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getUserListWhoLovesFootball());
e.onComplete();
}
}
});
}
项目:Assembler
文件:HotelMainDataSourceLocal.java
@Override
public Observable<String> getContent() {
return Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter)
throws Exception {
emitter.onNext("酒店页面");
emitter.onComplete();
}
}
);
}
项目:GitHub
文件:RealmObservableFactory.java
@Override
public <E> Observable<CollectionChange<RealmResults<E>>> changesetsFrom(Realm realm, final RealmResults<E> results) {
final RealmConfiguration realmConfig = realm.getConfiguration();
return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmResults<E>>>() {
@Override
public void subscribe(final ObservableEmitter<CollectionChange<RealmResults<E>>> emitter) throws Exception {
// Gets instance to make sure that the Realm is open for as long as the
// Observable is subscribed to it.
final Realm observableRealm = Realm.getInstance(realmConfig);
resultsRefs.get().acquireReference(results);
final OrderedRealmCollectionChangeListener<RealmResults<E>> listener = new OrderedRealmCollectionChangeListener<RealmResults<E>>() {
@Override
public void onChange(RealmResults<E> e, OrderedCollectionChangeSet changeSet) {
if (!emitter.isDisposed()) {
emitter.onNext(new CollectionChange<RealmResults<E>>(results, changeSet));
}
}
};
results.addChangeListener(listener);
// Cleanup when stream is disposed
emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
@Override
public void run() {
results.removeChangeListener(listener);
observableRealm.close();
resultsRefs.get().releaseReference(results);
}
}));
// Emit current value immediately
emitter.onNext(new CollectionChange<>(results, null));
}
});
}