/**
 * Emits an {@link RxBeaconRange} each time the beacon manager reports a ranging result.
 *
 * <p>Once {@code startup()} emits, a {@link RangeNotifier} is registered on
 * {@code beaconManager} and ranging is started for the region returned by
 * {@code getRegion()}. The notifier is removed again when the subscriber disposes, so a
 * finished subscription no longer stays referenced by the beacon manager. Ranging itself is
 * left running, exactly as before.
 *
 * @return an observable of ranging results; the ranging stream never calls onComplete itself
 */
public Observable<RxBeaconRange> beaconsInRegion() {
    return startup().flatMap(new Function<Boolean, ObservableSource<RxBeaconRange>>() {
        @Override
        public ObservableSource<RxBeaconRange> apply(@NonNull Boolean aBoolean) throws Exception {
            return Observable.create(new ObservableOnSubscribe<RxBeaconRange>() {
                @Override
                public void subscribe(@NonNull final ObservableEmitter<RxBeaconRange> objectObservableEmitter)
                        throws Exception {
                    final RangeNotifier notifier = new RangeNotifier() {
                        @Override
                        public void didRangeBeaconsInRegion(Collection<Beacon> collection, Region region) {
                            objectObservableEmitter.onNext(new RxBeaconRange(collection, region));
                        }
                    };
                    beaconManager.addRangeNotifier(notifier);
                    // Registered before ranging starts so the notifier is also removed when
                    // startRangingBeaconsInRegion throws and the emitter is terminated.
                    objectObservableEmitter.setCancellable(() -> beaconManager.removeRangeNotifier(notifier));
                    beaconManager.startRangingBeaconsInRegion(getRegion());
                }
            });
        }
    });
}
private Observable<Integer> getObservable() { return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { // send events with simulated time wait Thread.sleep(0); emitter.onNext(1); // skip emitter.onNext(2); // deliver Thread.sleep(505); emitter.onNext(3); // skip Thread.sleep(99); emitter.onNext(4); // skip Thread.sleep(100); emitter.onNext(5); // skip emitter.onNext(6); // deliver Thread.sleep(305); emitter.onNext(7); // deliver Thread.sleep(510); emitter.onComplete(); } }); }
/**
 * Bulk-inserts the given recipes through the content resolver and reports the outcome to
 * {@code observer}.
 *
 * <p>The insert runs on {@code Schedulers.io()}; the observer is notified on the Android main
 * thread. On success the observer receives the number of inserted rows followed by
 * completion. When the resolver reports zero inserted rows the observer receives a
 * {@link NullPointerException} with the message "Failed to insert" and nothing else.
 *
 * @param recipes  recipes to convert to {@code ContentValues} and insert
 * @param observer receiver of the inserted-row count or the failure
 */
public void insertRecipes(@NonNull final ArrayList<Recipe> recipes, Observer<Integer> observer) {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            final int count = recipes.size();
            ContentValues[] contentValues = new ContentValues[count];
            for (int i = 0; i < count; ++i) {
                contentValues[i] = buildContentValuesFromRecipe(recipes.get(i));
            }

            int recipesAdded = mContext.getContentResolver()
                    .bulkInsert(RecipesContract.RecipeEntry.CONTENT_URI, contentValues);

            if (recipesAdded == 0) {
                // onError is terminal; do not follow it with onComplete.
                e.onError(new NullPointerException("Failed to insert"));
                return;
            }
            e.onNext(recipesAdded);
            e.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
}
/**
 * Streams the mapped result of a Realm query every time the underlying results change.
 *
 * <p>Each subscription opens its own default Realm instance, runs the query produced by
 * {@code querySelector}, and listens for changes. On dispose the listener is removed (if the
 * results are still valid) and the Realm instance is closed. Subscription and disposal are
 * both scheduled on the scheduler supplied by {@code looperScheduler}.
 *
 * @param querySelector builds the query against the freshly opened Realm
 * @return an observable of mapped task lists; it does not complete on its own
 */
private Observable<List<Task>> createResults(QuerySelector<DbTask> querySelector) {
    return Observable.<List<Task>>create(emitter -> {
        Realm realm = Realm.getDefaultInstance();
        final RealmResults<DbTask> queryResults = querySelector.createQuery(realm);

        final RealmChangeListener<RealmResults<DbTask>> onChange = changed -> {
            if (!changed.isLoaded() || emitter.isDisposed()) {
                return;
            }
            List<Task> mapped = mapFrom(changed);
            // Mapping may take a while; re-check before delivering.
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onNext(mapped);
        };

        // Cleanup is installed before the listener is attached.
        emitter.setDisposable(Disposables.fromAction(() -> {
            if (queryResults.isValid()) {
                queryResults.removeChangeListener(onChange);
            }
            realm.close();
        }));

        queryResults.addChangeListener(onChange);
    })
            .subscribeOn(looperScheduler.getScheduler())
            .unsubscribeOn(looperScheduler.getScheduler());
}
private Observable<Integer> getObservable() { return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { // send events with simulated time wait //1(drop)--(400s<500s)---2(pass)---(600s>500s)---3(drop)---(100s<500s)---4(pass)---(605s>500s)---5(pass)---510s emitter.onNext(1); // skip Thread.sleep(400); emitter.onNext(2); // deliver Thread.sleep(600); emitter.onNext(3); // skip Thread.sleep(100); emitter.onNext(4); // deliver Thread.sleep(605); emitter.onNext(5); // deliver Thread.sleep(510); emitter.onComplete(); } }); }
/**
 * Parses {@code R.xml.drawable} and hands every {@code item} entry to the view as an
 * {@link IconBean}.
 *
 * <p>For each start tag whose name starts with "item", the "drawable" attribute is read and
 * resolved to a drawable resource id in {@code BuildConfig.APPLICATION_ID}. Parsing runs on
 * {@code Schedulers.io()}; the collected list is delivered to {@code mView.onLoadData} on the
 * Android main thread. The XML parser is closed once parsing ends, whether normally or with
 * an exception. No error consumer is supplied to {@code subscribe}.
 *
 * @return the disposable of the running subscription
 */
public Disposable getAllIcons() {
    return Observable.create(new ObservableOnSubscribe<IconBean>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<IconBean> e) throws Exception {
            XmlResourceParser xml = mView.getResources().getXml(R.xml.drawable);
            try {
                while (xml.getEventType() != XmlResourceParser.END_DOCUMENT) {
                    if (xml.getEventType() == XmlPullParser.START_TAG
                            && xml.getName().startsWith("item")) {
                        IconBean bean = new IconBean();
                        String iconName = xml.getAttributeValue(null, "drawable");
                        bean.id = mView.getResources().getIdentifier(
                                iconName, "drawable", BuildConfig.APPLICATION_ID);
                        bean.name = iconName;
                        e.onNext(bean);
                    }
                    xml.next();
                }
            } finally {
                // Release the parser even when parsing fails part-way through.
                xml.close();
            }
            e.onComplete();
        }
    }).toList().subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<IconBean>>() {
                @Override
                public void accept(List<IconBean> list) throws Exception {
                    mView.onLoadData(list);
                }
            });
}
/**
 * Demonstrates the {@code map} operator: emits 1, 2 and 3, turns each into the text
 * "This is result N", logs the delivering thread, and appends the text to {@code info},
 * which is then shown in {@code tv}.
 *
 * <p>The source never calls onComplete and no schedulers are applied.
 */
private void map() {
    Observable.<Integer>create(source -> {
        source.onNext(1);
        source.onNext(2);
        source.onNext(3);
    })
            .map(number -> "This is result " + number)
            .subscribe(text -> {
                Log.e(MainActivity.TAG, "accept: " + Thread.currentThread().getName());
                info += text + "\n";
                tv.setText(info);
            });
}
/**
 * Minimal create/subscribe demo: emits the single value 1 and prints it to standard output.
 * The source never calls onComplete.
 */
public void test() {
    final ObservableOnSubscribe<Integer> singleValue = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
        }
    };

    Observable.create(singleValue).subscribe(value -> System.out.println(value));
}
private void doRxJavaWork() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (;;) { // 无限循环发送事件 emitter.onNext(Integer.MAX_VALUE); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } }); }
public Observable<String> getObservable() { return Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("Today's news update"); e.onNext("Today's topic is Study"); e.onComplete(); } }); /* 下面两个方法作用类似,just 的内部调用的就是 fromArray */ // return Observable.just("Topic 1", "Heat 1", "News"); // return Observable.fromArray("Topic 1", "Heat 1", "News"); /* 只能发送一个数据 */ /*return Observable.fromCallable(new Callable<String>() { @Override public String call() throws Exception { return "Topic is Study"; } });*/ }
/**
 * Loads the configuration list and shows it in {@code configView}.
 *
 * <p>The model is obtained, queried and wired to this presenter as its config callback on
 * {@code Schedulers.io()}; the resulting list is handed to
 * {@code configView.displayConfigList} on the Android main thread. Previously the two
 * schedulers were swapped, which ran the loading on the main thread and updated the view
 * from an io thread.
 *
 * <p>The source never calls onComplete and no error consumer is supplied.
 */
public void requestLoadingList() {
    Observable.create(new ObservableOnSubscribe<List<ConfigBean>>() {
        @Override
        public void subscribe(ObservableEmitter<List<ConfigBean>> e) throws Exception {
            mModel = ConfigModel.getInstance(configView.getContext());
            e.onNext(mModel.getConfigList());
            mModel.setConfigCallback(ConfigPresenter.this);
        }
    })
            // Produce on a background thread, consume on the UI thread.
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<ConfigBean>>() {
                @Override
                public void accept(List<ConfigBean> list) throws Exception {
                    configView.displayConfigList(list);
                }
            });
}
/**
 * Turns clicks on {@code view} into an observable stream.
 *
 * <p>On subscription a click listener is installed on the main looper; every click emits the
 * clicked view. When the subscriber disposes, the view's click listener is cleared (again on
 * the main looper) so the view no longer holds on to the emitter. Because a view has a
 * single click listener, this replaces any listener set before and clears whatever listener
 * is set at dispose time.
 *
 * @param view the view whose clicks are observed
 * @return an observable emitting the clicked view; it does not complete on its own
 */
public static Observable<View> with(final View view) {
    return Observable.create(new ObservableOnSubscribe<View>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<View> e) throws Exception {
            final Handler mainHandler = new Handler(Looper.getMainLooper());
            mainHandler.post(new Runnable() {
                @Override
                public void run() {
                    view.setOnClickListener(new View.OnClickListener() {
                        @Override
                        public void onClick(View value) {
                            e.onNext(value);
                        }
                    });
                }
            });
            // Posted to the same handler, so the removal always runs after the installation.
            e.setCancellable(() -> mainHandler.post(() -> view.setOnClickListener(null)));
        }
    });
}
/** * 按一级分类查找专辑 **/ private Observable<List<Album>> getAlbumByCate(String cateId, int calc_dimension) { final Map<String, String> params = new HashMap<>(); params.put(DTransferConstants.CATEGORY_ID, cateId); params.put(DTransferConstants.CALC_DIMENSION, String.valueOf(calc_dimension)); return Observable.create(new ObservableOnSubscribe<List<Album>>() { @Override public void subscribe(final ObservableEmitter<List<Album>> e) throws Exception { CommonRequest.getAlbumList(params, new IDataCallBack<AlbumList>() { @Override public void onSuccess(AlbumList albumList) { //onNext的参数不允许为null e.onNext(albumList.getAlbums()); e.onComplete(); } @Override public void onError(int i, String s) { e.onError(new Throwable(i + " " + s)); } }); } }).subscribeOn(Schedulers.io()); }
/**
 * Looks up albums by keyword.
 *
 * <p>The request is issued through {@code CommonRequest.getSearchedAlbums} when the
 * observable is subscribed (on {@code Schedulers.io()}). A successful callback emits the
 * album list and completes; a failed callback signals a {@link Throwable} whose message is
 * {@code "<code> <message>"}. A failure that arrives after the subscriber has disposed is
 * dropped instead of being routed to the global RxJava error handler.
 *
 * @param keyword        search text sent as {@code DTransferConstants.SEARCH_KEY}
 * @param cateId         category id sent as {@code DTransferConstants.CATEGORY_ID}
 * @param calc_dimension value sent as {@code DTransferConstants.CALC_DIMENSION}
 * @return an observable emitting one album list
 */
private Observable<List<Album>> getAlbumByKeyWord(String keyword, String cateId, int calc_dimension) {
    final Map<String, String> params = new HashMap<>();
    params.put(DTransferConstants.SEARCH_KEY, keyword);
    params.put(DTransferConstants.CATEGORY_ID, cateId);
    params.put(DTransferConstants.CALC_DIMENSION, String.valueOf(calc_dimension));
    return Observable.create(new ObservableOnSubscribe<List<Album>>() {
        @Override
        public void subscribe(final ObservableEmitter<List<Album>> e) throws Exception {
            CommonRequest.getSearchedAlbums(params, new IDataCallBack<SearchAlbumList>() {
                @Override
                public void onSuccess(SearchAlbumList searchAlbumList) {
                    e.onNext(searchAlbumList.getAlbums());
                    e.onComplete();
                }

                @Override
                public void onError(int i, String s) {
                    // The callback is asynchronous; the subscriber may already be gone.
                    if (!e.isDisposed()) {
                        e.onError(new Throwable(i + " " + s));
                    }
                }
            });
        }
    })
            .subscribeOn(Schedulers.io());
}
/**
 * Creates an observable that reports the progress of loading the stock build orders into the
 * local SQLite DB. It should be scheduled on a worker thread.
 *
 * <p>Progress values are forwarded only while the emitter is not disposed. Completion is
 * signalled once loading returns; any exception thrown by the load is passed to onError.
 *
 * @param c         context
 * @param forceLoad if false, builds are only copied if an upgrade is required; if true,
 *                  standard builds are always copied
 * @return observable on load progress (percentage)
 */
public static Observable<Integer> getLoadStandardBuildsIntoDBObservable(final Context c,
                                                                        final boolean forceLoad) {
    return Observable.create(emitter -> {
        try {
            if (emitter.isDisposed()) {
                return;
            }

            final DbAdapter.ProgressListener forwardProgress = new DbAdapter.ProgressListener() {
                @Override
                public void onProgressUpdate(int percent) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(percent);
                }
            };

            loadStandardBuildsIntoDB(c, forceLoad, forwardProgress);
            emitter.onComplete();
        } catch (Exception loadFailure) {
            emitter.onError(loadFailure);
        }
    });
}
/**
 * Observes the device's Airplane Mode state through a BroadcastReceiver.
 * The returned RxJava2 Observable emits true when airplane mode turns on and false otherwise.
 *
 * <p>The receiver is registered on subscription; the disposable installed on the emitter
 * (built by {@code disposeInUiThread}) attempts to unregister it again.
 *
 * @param context of the Application or Activity
 * @return RxJava2 Observable with Boolean value indicating state of the airplane mode
 */
public Observable<Boolean> observe(final Context context) {
    checkContextIsNotNull(context);
    final IntentFilter filter = createIntentFilter();

    return Observable.create(emitter -> {
        final BroadcastReceiver stateReceiver = createBroadcastReceiver(emitter);
        context.registerReceiver(stateReceiver, filter);

        emitter.setDisposable(
                disposeInUiThread(() -> tryToUnregisterReceiver(stateReceiver, context)));
    });
}
/** * 更新余额、当日收支 **/ public void updateBalance(final int type, final List<TaskCard<Accounting>> taskcards) { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { CountToday(); CountBalance(type, taskcards); e.onNext(0); } }) .subscribeOn(Schedulers.io()) //执行订阅(subscribe())所在线程 .observeOn(AndroidSchedulers.mainThread()) //响应订阅(Sbscriber)所在线程 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { if (AppConfig.dPreferences.getBoolean(AppConfig.HAS_AMOUNT, false)) mTvBalance.setText("¥" + AssistUtils.formatAmount(balance)); mAdapter.notifyItemChanged(0); } }); }
/**
 * Checks whether Google Play services are available for the referenced activity.
 *
 * <p>Emits {@code true} and completes when the availability status is
 * {@code ConnectionResult.SUCCESS}; signals {@code PlayServicesNotAvailableException}
 * otherwise. When the activity reference has already been cleared the stream completes
 * without emitting, instead of leaving the subscriber waiting forever.
 *
 * @return an observable emitting at most one {@code true}
 */
private Observable<Boolean> checkPlayServicesAvailable() {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> e) throws Exception {
            final Activity activity = activityReference.get();
            if (activity == null) {
                // Nothing can be checked without the activity; terminate the stream.
                e.onComplete();
                return;
            }

            final GoogleApiAvailability apiAvailability = GoogleApiAvailability.getInstance();
            final int status = apiAvailability.isGooglePlayServicesAvailable(activity);

            if (status != ConnectionResult.SUCCESS) {
                e.onError(new PlayServicesNotAvailableException());
            } else {
                e.onNext(true);
                e.onComplete();
            }
        }
    });
}
/**
 * Verifies that a missing todo list surfaces as a {@link NullPointerException} on the stream.
 *
 * <p>The source throws "todos was null" when {@code getTodos()} returns null; otherwise it
 * emits every todo and completes. The check used to be inverted ({@code todos != null}),
 * which contradicted the message and made the error fire for a present list as well.
 */
@Test
public void test() {
    Observable<Todo> todoObservable = Observable.create(new ObservableOnSubscribe<Todo>() {
        @Override
        public void subscribe(ObservableEmitter<Todo> emitter) throws Exception {
            try {
                List<Todo> todos = RxJavaUnitTest.this.getTodos();
                if (todos == null) {
                    throw new NullPointerException("todos was null");
                }
                for (Todo todo : todos) {
                    emitter.onNext(todo);
                }
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        }
    });

    TestObserver<Object> testObserver = new TestObserver<>();
    todoObservable.subscribeWith(testObserver);

    testObserver.assertError(NullPointerException.class);
}
/**
 * Emits the strings "111", "222" and "333" to a consumer that ignores them.
 *
 * <p>Both callbacks stay anonymous classes so that their {@code @Trace} annotations remain
 * on real methods; tracing of this method itself is disabled. The source never calls
 * onComplete.
 */
@Trace(enable = false)
private void initData() {
    final ObservableOnSubscribe<String> threeValues = new ObservableOnSubscribe<String>() {
        @Trace
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("111");
            emitter.onNext("222");
            emitter.onNext("333");
        }
    };

    final Consumer<String> ignoreValue = new Consumer<String>() {
        @Trace
        @Override
        public void accept(@NonNull String value) throws Exception {
        }
    };

    Observable.create(threeValues).subscribe(ignoreValue);
}
/**
 * Opens the default Realm and keeps it open for as long as {@code disposable} is active.
 *
 * <p>The Realm instance is emitted once right after subscription and again on every change
 * notification. Disposing removes the change listener and closes the instance. Subscription
 * and disposal are both scheduled on the scheduler supplied by {@code looperScheduler}. The
 * subscription itself has no consumers.
 */
public void openDatabase() {
    disposable = Observable.<Realm>create(emitter -> {
        final Realm openedRealm = Realm.getDefaultInstance();

        final RealmChangeListener<Realm> onChange = changed -> {
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onNext(openedRealm);
        };
        openedRealm.addChangeListener(onChange);

        emitter.setDisposable(Disposables.fromAction(() -> {
            openedRealm.removeChangeListener(onChange);
            openedRealm.close();
        }));

        // Initial emission.
        emitter.onNext(openedRealm);
    })
            .subscribeOn(looperScheduler.getScheduler())
            .unsubscribeOn(looperScheduler.getScheduler())
            .subscribe();
}
/**
 * Emits the text of every element matched by {@code expression} below {@code element}.
 *
 * <p>When nothing matches, the outcome depends on {@code exceptionIfNotFound}: if set, a
 * {@code NotFoundException} is signalled; otherwise a single empty string is emitted. In all
 * non-error cases the stream completes afterwards.
 *
 * @param element    root element the selector is applied to
 * @param expression selector passed to {@code Element.select}
 * @return an observable of the matched elements' text
 */
public Observable<String> text(final Element element, final String expression) {
    return Observable.create(emitter -> {
        final Elements matches = element.select(expression);
        final boolean nothingFound = matches.isEmpty();

        if (nothingFound && exceptionIfNotFound) {
            emitter.onError(new NotFoundException(expression, element.toString()));
            return;
        }

        if (nothingFound) {
            emitter.onNext("");
        } else {
            for (Element match : matches) {
                emitter.onNext(match.text());
            }
        }
        emitter.onComplete();
    });
}
/**
 * Demo of {@code Observable.create}: a source emitting "0", "1" and "2" is subscribed two
 * times in a row, and every received value is logged. The source never calls onComplete.
 */
@Override
public void test3() {
    Log.i(TAG, "test3() Create simple demo, onNext() twice");

    Observable<String> counter = Observable.create(emitter -> {
        int i = 0;
        while (i < 3) {
            emitter.onNext(String.valueOf(i));
            i++;
        }
    });

    int round = 0;
    while (round < 2) {
        counter.subscribe(s -> Log.d(TAG, "Consumer<String> accept() s: " + s));
        round++;
    }
}
public static void main(String[] args) { Observable<String> month_observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { // TODO Auto-generated method stub try { String[] monthArray = { "Jan", "Feb", "Mar", "Apl", "May", "Jun", "July", "Aug", "Sept", "Oct","Nov", "Dec" }; List<String> months = Arrays.asList(monthArray); for (String month : months) { emitter.onNext(month); } emitter.onComplete(); } catch (Exception e) { // TODO: handle exception emitter.onError(e); } } }); month_observable.subscribe(s -> System.out.println(s)); }
/**
 * Compresses {@code bitmap} as JPEG into {@code fos}, recycles the bitmap and closes the
 * stream.
 *
 * <p>Emits the result of {@code Bitmap.compress} and then completes. An {@link IOException}
 * raised while flushing or closing is signalled through onError. The stream is closed even
 * when compressing or flushing fails.
 *
 * @param fos     destination stream; closed by this method
 * @param bitmap  bitmap to write; recycled after compression
 * @param quality JPEG quality passed to {@code Bitmap.compress}
 * @return an observable emitting a single Boolean compression result
 */
static Observable<Boolean> storeFile(final FileOutputStream fos, final Bitmap bitmap, final int quality) {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            boolean compressed;
            // try-with-resources guarantees close() on every path.
            try (FileOutputStream out = fos) {
                compressed = bitmap.compress(Bitmap.CompressFormat.JPEG, quality, out);
                bitmap.recycle();
                out.flush();
            } catch (IOException e) {
                emitter.onError(e);
                return;
            }
            emitter.onNext(compressed);
            emitter.onComplete();
        }
    });
}
/**
 * Queries one page of the user's video entries, newest update first.
 *
 * <p>The query filters on {@code userId}, is limited to 10 rows and skips {@code start}
 * rows. For every returned entity the JSON body is parsed into its video data before the
 * entity is added to the emitted list. An empty list is emitted when the callback delivers
 * no rows; the {@code BmobException} argument is not inspected. The stream never calls
 * onComplete.
 *
 * @param start  number of rows to skip
 * @param userid value matched against the {@code userId} column
 * @return an observable emitting the page of entities
 */
@Override
public Observable<List<VideoDaoEntity>> getListFromNet(int start, String userid) {
    return Observable.create(emitter -> {
        BmobQuery<VideoDaoEntity> pageQuery = new BmobQuery<>();
        pageQuery.addWhereEqualTo("userId", userid);
        pageQuery.setLimit(10);
        pageQuery.order("-updatedAt");
        pageQuery.setSkip(start);

        pageQuery.findObjects(new FindListener<VideoDaoEntity>() {
            @Override
            public void done(List<VideoDaoEntity> list, BmobException e) {
                List<VideoDaoEntity> page = new ArrayList<>();
                boolean hasRows = !StringUtils.isEmpty(list);
                if (hasRows) {
                    for (VideoDaoEntity row : list) {
                        row.setVideo(mGson.fromJson(row.getBody(), VideoListInfo.Video.VideoData.class));
                        page.add(row);
                    }
                }
                emitter.onNext(page);
            }
        });
    });
}
private void onActionPackageFullyRemoved(final Intent intent) { Observable.create( new ObservableOnSubscribe<Boolean>() { @Override public void subscribe(ObservableEmitter<Boolean> e) throws Exception { // prefix "package:" String packageName = intent.getData().toString().substring(8); List<PackageRecord> list = PackageRecord.listAll(PackageRecord.class); for (PackageRecord record : list) { if (TextUtils.equals(record.name, packageName)) { record.delete(); } } e.onNext(true); e.onComplete(); } }) .subscribeOn(Schedulers.newThread()) .subscribe(); }
/** * sample操作符每隔指定的时间就从上游中取出一个事件发送给下游. */ private void doSample() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i = 0; i < 1000; i++) { //模拟无限循环发送事件 emitter.onNext(i); } } }) .subscribeOn(Schedulers.io()) .sample(1, TimeUnit.MILLISECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } }); }
/**
 * Looks up a cached {@code Course} by its identifier.
 *
 * <p>The lookup is delegated to {@code mIDBEngine.find} when the observable is subscribed;
 * its duration and result are logged. A found course is emitted before completion; when
 * nothing is found the stream just completes. An {@code XDBException} is passed to onError.
 *
 * @param cacheId identifier of the cached entry
 * @return an observable emitting the course if one is stored
 */
public Observable<Course> findDataByIdentifier(@NonNull final String cacheId) {
    return Observable.create(emitter -> {
        Util.logMethodThreadId("findDataByIdentifier");
        final long startedAt = System.currentTimeMillis();
        try {
            Course cached = (Course) mIDBEngine.find(cacheId, Course.class);
            long elapsed = System.currentTimeMillis() - startedAt;
            Util.log("<-- End getCache2Disk(" + elapsed + "):" + "[identifier] = " + cacheId
                    + " [data] = " + (cached != null ? cached.getData() : "null"));

            if (cached != null) {
                emitter.onNext(cached);
            }
            emitter.onComplete();
        } catch (XDBException lookupFailure) {
            emitter.onError(lookupFailure);
        }
    });
}
/**
 * Sets up the main layout and triggers {@code FileUtils.getFileFromAsset} for "1.jpg"
 * through an observable composed with {@code RxUtil.io_main()}.
 *
 * <p>The source emits nothing and never terminates; the consumer is a no-op.
 *
 * @param savedInstanceState forwarded to the superclass
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    Observable.<String>create(emitter -> FileUtils.getFileFromAsset(MainActivity.this, "1.jpg"))
            .compose(RxUtil.<String>io_main())
            .subscribe(ignored -> {
            });
}
public static void main(String[] args) { // TODO Auto-generated method stub Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { // TODO Auto-generated method stub System.out.println("Thread:-"+Thread.currentThread().getName()); emitter.onNext(getNum()); emitter.onComplete(); } }).subscribeOn(Schedulers.io()).subscribe(new Consumer<Integer>() { @Override public void accept(Integer value) throws Exception { // TODO Auto-generated method stub System.out.println("Thread for subscription:-"+Thread.currentThread().getName()); System.out.println(value); } }); try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("end of main:-"+Thread.currentThread().getName()); }
/**
 * Deletes {@code entity} remotely and emits {@code true} once the delete callback reports no
 * exception.
 *
 * <p>When the callback carries an exception nothing is emitted. The stream never calls
 * onComplete or onError.
 *
 * @param entity the entity to delete
 * @return an observable emitting {@code true} on a successful delete
 */
@Override
public Observable<Boolean> deleteFromNet(VideoDaoEntity entity) {
    return Observable.create(emitter -> {
        final UpdateListener onDeleted = new UpdateListener() {
            @Override
            public void done(BmobException e) {
                if (e != null) {
                    return;
                }
                emitter.onNext(true);
            }
        };
        entity.delete(onDeleted);
    });
}
/**
 * Reads an int preference when subscribed, emits it, and completes.
 *
 * @param key          preference key
 * @param defaultValue value passed to {@code sharedPreferences.getInt} as the default
 * @return an observable emitting the stored or default value
 */
public Observable<Integer> getInt(final String key, final int defaultValue) {
    return Observable.create(emitter -> {
        final int stored = sharedPreferences.getInt(key, defaultValue);
        emitter.onNext(stored);
        emitter.onComplete();
    });
}
/**
 * Wraps the emitter-based {@code range} overload in an observable.
 *
 * <p>On subscription the overload is invoked with the emitter; once it returns, completion
 * is signalled unless the emitter has been disposed in the meantime.
 *
 * @param x              node passed to the overload
 * @param lowerInclusive lower bound passed to the overload
 * @param upperExclusive upper bound passed to the overload
 * @param height         height passed to the overload
 * @return an observable of the values the overload emits
 */
private Observable<Value> range(Node<Key, Value> x, Key lowerInclusive, Key upperExclusive, int height) {
    return Observable.create(emitter -> {
        range(x, lowerInclusive, upperExclusive, height, emitter);
        if (emitter.isDisposed()) {
            return;
        }
        emitter.onComplete();
    });
}
/**
 * Emits the list from {@code Utils.getApiUserList()} and completes, provided the emitter is
 * not already disposed at subscription time.
 *
 * @return an observable emitting one list of API users
 */
private Observable<List<ApiUser>> getObservable() {
    return Observable.create(emitter -> {
        if (emitter.isDisposed()) {
            return;
        }
        emitter.onNext(Utils.getApiUserList());
        emitter.onComplete();
    });
}
/**
 * Creates an observable whose emitter is stored in {@code imageInfoObservableEmitter} on
 * subscription, and logs the subscribing thread id.
 *
 * <p>Nothing is emitted by this method itself.
 *
 * @return an observable of {@code ImageInfo}
 */
private Observable<ImageInfo> getByteBufferObservable() {
    return Observable.create(emitter -> {
        imageInfoObservableEmitter = emitter;
        Log.d(TAG, "subscribe: " + Process.myTid());
    });
}
/**
 * Emits the list from {@code Utils.getUserListWhoLovesFootball()} and completes, provided
 * the emitter is not already disposed at subscription time.
 *
 * @return an observable emitting one list of users
 */
private Observable<List<User>> getFootballFansObservable() {
    return Observable.create(emitter -> {
        if (emitter.isDisposed()) {
            return;
        }
        emitter.onNext(Utils.getUserListWhoLovesFootball());
        emitter.onComplete();
    });
}
/**
 * Emits this page's fixed content string and completes.
 *
 * @return an observable emitting a single string
 */
@Override
public Observable<String> getContent() {
    final ObservableOnSubscribe<String> pageContent = new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> source) throws Exception {
            source.onNext("酒店页面");
            source.onComplete();
        }
    };
    return Observable.create(pageContent);
}
@Override public <E> Observable<CollectionChange<RealmResults<E>>> changesetsFrom(Realm realm, final RealmResults<E> results) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmResults<E>>>() { @Override public void subscribe(final ObservableEmitter<CollectionChange<RealmResults<E>>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); resultsRefs.get().acquireReference(results); final OrderedRealmCollectionChangeListener<RealmResults<E>> listener = new OrderedRealmCollectionChangeListener<RealmResults<E>>() { @Override public void onChange(RealmResults<E> e, OrderedCollectionChangeSet changeSet) { if (!emitter.isDisposed()) { emitter.onNext(new CollectionChange<RealmResults<E>>(results, changeSet)); } } }; results.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { results.removeChangeListener(listener); observableRealm.close(); resultsRefs.get().releaseReference(results); } })); // Emit current value immediately emitter.onNext(new CollectionChange<>(results, null)); } }); }