@Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) { final String service = Context.CONNECTIVITY_SERVICE; final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service); return Observable.create(new ObservableOnSubscribe<Connectivity>() { @Override public void subscribe(ObservableEmitter<Connectivity> subscriber) throws Exception { networkCallback = createNetworkCallback(subscriber, context); final NetworkRequest networkRequest = new NetworkRequest.Builder().build(); manager.registerNetworkCallback(networkRequest, networkCallback); } }).doOnDispose(new Action() { @Override public void run() { tryToUnregisterCallback(manager); } }).startWith(Connectivity.create(context)).distinctUntilChanged(); }
private Observable<Integer> getObservable() { return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { // send events with simulated time wait emitter.onNext(1); // skip Thread.sleep(400); emitter.onNext(2); // deliver Thread.sleep(505); emitter.onNext(3); // skip Thread.sleep(100); emitter.onNext(4); // deliver Thread.sleep(605); emitter.onNext(5); // deliver Thread.sleep(510); emitter.onComplete(); } }); }
private Observable<Integer> getObservable() { return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { // send events with simulated time wait Thread.sleep(0); emitter.onNext(1); // skip emitter.onNext(2); // deliver Thread.sleep(505); emitter.onNext(3); // skip Thread.sleep(99); emitter.onNext(4); // skip Thread.sleep(100); emitter.onNext(5); // skip emitter.onNext(6); // deliver Thread.sleep(305); emitter.onNext(7); // deliver Thread.sleep(510); emitter.onComplete(); } }); }
/** * Observes Airplane Mode state of the device with BroadcastReceiver. * RxJava2 Observable emits true if the airplane mode turns on and false otherwise. * * @param context of the Application or Activity * @return RxJava2 Observable with Boolean value indicating state of the airplane mode */ public Observable<Boolean> observe(final Context context) { checkContextIsNotNull(context); final IntentFilter filter = createIntentFilter(); return Observable.create(new ObservableOnSubscribe<Boolean>() { @Override public void subscribe(@NonNull final ObservableEmitter<Boolean> emitter) throws Exception { final BroadcastReceiver receiver = createBroadcastReceiver(emitter); context.registerReceiver(receiver, filter); final Disposable disposable = disposeInUiThread(new Action() { @Override public void run() throws Exception { tryToUnregisterReceiver(receiver, context); } }); emitter.setDisposable(disposable); } }); }
private SimpleTarget<Bitmap> getInto(String iconUrl, final ObservableEmitter<Bitmap> emitter) { return requestManager.load(Uri.parse(iconUrl)).asBitmap().into(new SimpleTarget<Bitmap>() { @Override public void onResourceReady(Bitmap resource, GlideAnimation<? super Bitmap> glideAnimation) { emitter.onNext(resource); emitter.onComplete(); } @Override public void onLoadFailed(Exception e, Drawable errorDrawable) { super.onLoadFailed(e, errorDrawable); emitter.onError(e); } }); }
@Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { FileUtils.getFileFromAsset(MainActivity.this, "1.jpg"); } }).compose(RxUtil.<String>io_main()).subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { } }); }
@OnClick({R.id.tv_cancel, R.id.tv_ok}) public void onClick(View view) { switch (view.getId()) { case R.id.tv_cancel: onBackPressedSupport(); break; case R.id.tv_ok: Observable.create(new ObservableOnSubscribe<Uri>() { @Override public void subscribe(ObservableEmitter<Uri> e) throws Exception { e.onNext(generateUri()); e.onComplete(); } }).compose(RxHelper.<Uri>rxSchedulerHelper()) .subscribe(new Consumer<Uri>() { @Override public void accept(Uri uri) throws Exception { RxEventHeadBean rxEventHeadBean = new RxEventHeadBean(uri); RxBus.get().send(RX_BUS_CODE_HEAD_IMAGE_URI, rxEventHeadBean); onBackPressedSupport(); } }); break; } }
@Override public boolean saveData(final Object obj) { if (null == obj) { return false; } Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext(new Gson().toJson(obj)); e.onComplete(); } }); RouterWare.observerables.put(routerRequest.getUri().toString(), observable); return true; }
/** * Using the returned Observable, you can be notified about data changes. * Once a transaction is committed, you will get info on classes with changed Objects. */ public static <T> Observable<Class> observable(final BoxStore boxStore) { return Observable.create(new ObservableOnSubscribe<Class>() { @Override public void subscribe(final ObservableEmitter<Class> emitter) throws Exception { final DataSubscription dataSubscription = boxStore.subscribe().observer(new DataObserver<Class>() { @Override public void onData(Class data) { if (!emitter.isDisposed()) { emitter.onNext(data); } } }); emitter.setCancellable(new Cancellable() { @Override public void cancel() throws Exception { dataSubscription.cancel(); } }); } }); }
private void map() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); } }).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return "This is result " + integer; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e(MainActivity.TAG, "accept: " + Thread.currentThread().getName()); info += s + "\n"; tv.setText(info); } }); }
public void getAllRecipes(Observer<ArrayList<Recipe>> observer){ Observable.create(new ObservableOnSubscribe<ArrayList<Recipe>>() { @Override public void subscribe(ObservableEmitter<ArrayList<Recipe>> e) throws Exception { Cursor cursor = mContext.getContentResolver() .query(RecipesContract.RecipeEntry.CONTENT_URI, null, null, null, RecipesContract.RecipeEntry.COLUMN_NAME + " DESC"); ArrayList<Recipe> recipes; if(cursor != null){ recipes = buildRecipesFromCursor(cursor); cursor.close(); e.onNext(recipes); } else { e.onError(new NullPointerException("Recipe cursor is null.")); } e.onComplete(); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer); }
public void requestLoadingList() { Observable.create(new ObservableOnSubscribe<List<ConfigBean>>() { @Override public void subscribe(ObservableEmitter<List<ConfigBean>> e) throws Exception { mModel = ConfigModel.getInstance(configView.getContext()); e.onNext(mModel.getConfigList()); mModel.setConfigCallback(ConfigPresenter.this); } }) .observeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<ConfigBean>>() { @Override public void accept(List<ConfigBean> list) throws Exception { configView.displayConfigList(list); } }); }
private void doRxJavaWork() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (;;) { // 无限循环发送事件 emitter.onNext(Integer.MAX_VALUE); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } }); }
public Observable<String> getObservable() { return Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("Today's news update"); e.onNext("Today's topic is Study"); e.onComplete(); } }); /* 下面两个方法作用类似,just 的内部调用的就是 fromArray */ // return Observable.just("Topic 1", "Heat 1", "News"); // return Observable.fromArray("Topic 1", "Heat 1", "News"); /* 只能发送一个数据 */ /*return Observable.fromCallable(new Callable<String>() { @Override public String call() throws Exception { return "Topic is Study"; } });*/ }
/** * Returns an observable on the progress of loading stock build orders into the local SQLite DB. * Should be scheduled on a worker thread. * * @param c context * @param forceLoad if false, builds are only copied if an upgrade is required. If true, * standard builds are always copied. * @return observable on load progress (percentage) */ public static Observable<Integer> getLoadStandardBuildsIntoDBObservable(final Context c, final boolean forceLoad) { return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull final ObservableEmitter<Integer> emitter) throws Exception { try { if (!emitter.isDisposed()) { loadStandardBuildsIntoDB(c, forceLoad, new DbAdapter.ProgressListener() { @Override public void onProgressUpdate(int percent) { if (!emitter.isDisposed()) { emitter.onNext(percent); } } }); emitter.onComplete(); } } catch (Exception e) { emitter.onError(e); } } }); }
public static Observable<View> with(final View view) { return Observable.create(new ObservableOnSubscribe<View>() { @Override public void subscribe(@NonNull final ObservableEmitter<View> e) throws Exception { new Handler(Looper.getMainLooper()).post(new Runnable() { @Override public void run() { view.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View value) { e.onNext(value); } }); } }); } }); }
/** * 主进程onCreate后必须进行的先始化工作 */ public void onMainProgressCreate() { if (!MainProgressInited) { Log.i("LingJu", "init start>>>>>>"); Network = NetUtil.getInstance(this).getCurrentNetType(); init(); //初次定位 BaiduLocateManager.createInstance(this).addObserver(locateListener); BaiduLocateManager.get().start(Network != NetUtil.NetType.NETWORK_TYPE_NONE); Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { initExtra(); } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .subscribe(); } }
@Test public void test(){ Observable<Todo> todoObservable = Observable.create(new ObservableOnSubscribe<Todo>() { @Override public void subscribe(ObservableEmitter<Todo> emitter) throws Exception { try { List<Todo> todos = RxJavaUnitTest.this.getTodos(); if (todos!=null){ throw new NullPointerException("todos was null"); } for (Todo todo : todos) { emitter.onNext(todo); } emitter.onComplete(); } catch (Exception e) { emitter.onError(e); } } }); TestObserver<Object> testObserver = new TestObserver<>(); todoObservable.subscribeWith(testObserver); testObserver.assertError(NullPointerException.class); }
@Trace(enable = false) private void initData() { Observable.create(new ObservableOnSubscribe<String>() { @Trace @Override public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { e.onNext("111"); e.onNext("222"); e.onNext("333"); } }).subscribe(new Consumer<String>() { @Trace @Override public void accept(@NonNull String str) throws Exception { } }); }
public static Observable<String> saveTempBitmap(final FileOutputStream fos, final String filename, final Uri uri) { return Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(final ObservableEmitter<String> emitter) throws Exception { Bitmap bitmap = BitmapFactory.decodeFile(uri.getPath()); int dimension = bitmap.getWidth(); bitmap = Bitmap.createBitmap(bitmap, 0, 0, dimension, dimension); storeFile(fos, bitmap, 100) .subscribeOn(Schedulers.io()) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { if (aBoolean) { emitter.onNext(filename); } } }); } }); }
public static Observable<Boolean> saveResultPicture(final FileOutputStream fos, final Bitmap bitmap, final int quality) { return Observable.create(new ObservableOnSubscribe<Boolean>() { @Override public void subscribe(final ObservableEmitter<Boolean> emitter) throws Exception { storeFile(fos, bitmap, quality) .subscribeOn(Schedulers.io()) .onErrorReturn(new Function<Throwable, Boolean>() { @Override public Boolean apply(Throwable throwable) throws Exception { emitter.onNext(false); return null; } }) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { if (aBoolean) { emitter.onNext(true); } } }); } }); }
static Observable<Boolean> storeFile(final FileOutputStream fos, final Bitmap bitmap, final int quality) { return Observable.create(new ObservableOnSubscribe<Boolean>() { @Override public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception { bitmap.compress(Bitmap.CompressFormat.JPEG, quality, fos); bitmap.recycle(); try { fos.flush(); fos.close(); emitter.onNext(true); } catch (IOException e) { e.printStackTrace(); emitter.onError(e); } } }); }
@Override public void subscribe(final ObservableEmitter<DocumentSnapshot> emitter) throws Exception { final EventListener<DocumentSnapshot> listener = new EventListener<DocumentSnapshot>() { @Override public void onEvent(DocumentSnapshot documentSnapshot, FirebaseFirestoreException e) { if (!emitter.isDisposed()) { if (e == null) { emitter.onNext(documentSnapshot); } else { emitter.onError(e); } } } }; registration = documentReference.addSnapshotListener(listener); emitter.setDisposable(Disposables.fromAction(new Action() { @Override public void run() throws Exception { registration.remove(); } })); }
@Override public void test1() { Log.i(TAG, "test1() Create simple demo, onNext() and onError()"); Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { for (int i = 0; i < 3; i++) { if (i == 1) { e.onError(new IllegalStateException("Just for test")); } e.onNext(String.valueOf(i)); } } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Consumer<String> accept() s: " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable); } }); }
/** * 查找数据 * * @param cacheId * @return */ public Observable<Course> findDataByIdentifier(@NonNull final String cacheId) { Observable<Course> courseObservable = Observable.create(new ObservableOnSubscribe<Course>() { @Override public void subscribe(ObservableEmitter<Course> e) throws Exception { Util.logMethodThreadId("findDataByIdentifier"); long time = System.currentTimeMillis(); try { Course result = (Course) mIDBEngine.find(cacheId, Course.class); time = System.currentTimeMillis() - time; Util.log("<-- End getCache2Disk(" + time + "):" + "[identifier] = " + cacheId + " [data] = " + (result != null ? result.getData() : "null")); if (result != null) { e.onNext(result); } e.onComplete(); } catch (XDBException d) { e.onError(d); } } }); return courseObservable; }
@Override public Observable<Boolean> recordItemIsRead(final String key) { return Observable.create(new ObservableOnSubscribe<Boolean>() { @Override public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception { emitter.onNext(DBUtils.getDB(AppUtils.getContext()).insertRead(DBConfig .TABLE_GANKIO_CUSTOM, key, ItemState.STATE_IS_READ)); emitter.onComplete(); } }).compose(RxHelper.<Boolean>rxSchedulerHelper()); }
private Observable<List<FilterBean>> getCityObserver(final String id) { Log.d(TAG, "getCityObserver: id=" + id); return Observable.create(new ObservableOnSubscribe<List<FilterBean>>() { @Override public void subscribe(@io.reactivex.annotations.NonNull ObservableEmitter<List<FilterBean>> e) throws Exception { if (cityDBUtil == null) return; List<FilterBean> data = new ArrayList<>(); cityDBUtil.open(); try { Cursor cursor = cityDBUtil.selectByParentId(id); if (cursor != null && cursor.moveToFirst()) { do { data.add(new FilterBean(cursor.getString(0), cursor.getString(1))); } while (cursor.moveToNext()); cursor.close(); } } catch (Throwable t) { Log.e(TAG, "getCityObserver: t=" + t.getMessage()); } finally { cityDBUtil.close(); } e.onNext(data); } }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); }
/** * Observable of the view state. The View is ready to receive calls after calling {@link * TiPresenter#attachView(TiView)} and before calling {@link TiPresenter#detachView()}. */ public static Observable<Boolean> isViewReady(final TiPresenter presenter) { return Observable.create(new ObservableOnSubscribe<Boolean>() { @Override public void subscribe(final ObservableEmitter<Boolean> emitter) throws Exception { if (!emitter.isDisposed()) { emitter.onNext(presenter.getState() == TiPresenter.State.VIEW_ATTACHED); } final Removable removable = presenter .addLifecycleObserver(new TiLifecycleObserver() { @Override public void onChange(final TiPresenter.State state, final boolean hasLifecycleMethodBeenCalled) { if (!emitter.isDisposed()) { emitter.onNext(state == TiPresenter.State.VIEW_ATTACHED && hasLifecycleMethodBeenCalled); } } }); emitter.setDisposable(new Disposable() { @Override public void dispose() { removable.remove(); } @Override public boolean isDisposed() { return removable.isRemoved(); } }); } }).distinctUntilChanged(); }
private Observable<List<ApiUser>> getObservable() { return Observable.create(new ObservableOnSubscribe<List<ApiUser>>() { @Override public void subscribe(ObservableEmitter<List<ApiUser>> e) throws Exception { if (!e.isDisposed()) { e.onNext(Utils.getApiUserList()); e.onComplete(); } } }); }
@Override public Observable<String> getContent() { return Observable.create( new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("酒店页面"); emitter.onComplete(); } } ); }
/** * Read all records from database. * * @return All records. */ public Observable<List<DownloadRecord>> readAllRecords() { return Observable .create(new ObservableOnSubscribe<List<DownloadRecord>>() { @Override public void subscribe(ObservableEmitter<List<DownloadRecord>> emitter) throws Exception { Cursor cursor = null; try { cursor = getReadableDatabase().query(TABLE_NAME, new String[]{COLUMN_ID, COLUMN_URL, COLUMN_SAVE_NAME, COLUMN_SAVE_PATH, COLUMN_DOWNLOAD_SIZE, COLUMN_TOTAL_SIZE, COLUMN_IS_CHUNKED, COLUMN_EXTRA1, COLUMN_EXTRA2, COLUMN_EXTRA3, COLUMN_EXTRA4, COLUMN_EXTRA5, COLUMN_DOWNLOAD_FLAG, COLUMN_DATE, COLUMN_MISSION_ID}, null, null, null, null, null); List<DownloadRecord> result = new ArrayList<>(); cursor.moveToFirst(); if (cursor.getCount() > 0) { do { result.add(read(cursor)); } while (cursor.moveToNext()); } emitter.onNext(result); emitter.onComplete(); } finally { if (cursor != null) { cursor.close(); } } } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); }
private void initDemoRX02() { Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName()); Log.e(TAG, "emit 1"); emitter.onNext(1); } }); Consumer<Integer> consumer = new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "Observer thread is :" + Thread.currentThread().getName()); Log.e(TAG, "onNext: " + integer); } }; // observable.subscribe(consumer); /* * 多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效. 多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次. * */ observable.subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(consumer); }
@Override public <E> Observable<CollectionChange<RealmList<E>>> changesetsFrom(Realm realm, final RealmList<E> list) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Observable.create(new ObservableOnSubscribe<CollectionChange<RealmList<E>>>() { @Override public void subscribe(final ObservableEmitter<CollectionChange<RealmList<E>>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); listRefs.get().acquireReference(list); final OrderedRealmCollectionChangeListener<RealmList<E>> listener = new OrderedRealmCollectionChangeListener<RealmList<E>>() { @Override public void onChange(RealmList<E> results, OrderedCollectionChangeSet changeSet) { if (!emitter.isDisposed()) { emitter.onNext(new CollectionChange<>(results, changeSet)); } } }; list.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { list.removeChangeListener(listener); observableRealm.close(); listRefs.get().releaseReference(list); } })); // Emit current value immediately emitter.onNext(new CollectionChange<>(list, null)); } }); }
public Observable<Integer> getInt(final String key, final int defaultValue) { return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(sharedPreferences.getInt(key, defaultValue)); e.onComplete(); } }); }
private void loadData(){ Observable .create(new ObservableOnSubscribe<List<CollectItem>>() { @Override public void subscribe(ObservableEmitter<List<CollectItem>> e) throws Exception { mItemList.clear(); e.onNext(DataSupport.findAll(CollectItem.class)); } }) .map(new Function<List<CollectItem>, Boolean>() { @Override public Boolean apply(List<CollectItem> items) throws Exception { return items != null && items.size() > 0 && mItemList.addAll(items); } }) .subscribeOn(Schedulers.io()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { addDisposable(disposable); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { showEmptyView(!aBoolean); mAdapter.notifyDataSetChanged(); } }); }
public static void umountAll(final Context context, Consumer<Boolean> consumer) { if (consumer == null) { consumer = new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { // DO NOTHING } }; } final String foregroundPackageName = getForegroundAppPackageName(context); final List<String> foregroundServiceList = getForegroundServiceList(context); Observable.create( new ObservableOnSubscribe<Boolean>() { @Override public void subscribe(ObservableEmitter<Boolean> e) throws Exception { List<PackageRecord> list = PackageRecord.listAll(PackageRecord.class); for (PackageRecord record : list) { if (!TextUtils.equals(record.name, foregroundPackageName) && !foregroundServiceList.contains(record.name)) { setApplicationMount(context, record.name, false); } } e.onNext(true); e.onComplete(); } }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(consumer); }
@Override public Observable<List<MonumentEntity>> monumentListEntity() { return Observable.create(new ObservableOnSubscribe<List<MonumentEntity>>() { @Override public void subscribe(ObservableEmitter<List<MonumentEntity>> e) throws Exception { List<MonumentEntity> listPlaces = getAllPlaces(); if (listPlaces != null) { e.onNext(listPlaces); e.onComplete(); } else { e.onError(new Throwable("Error getting data")); } } }); }
public Observable<Long> getLong(final String key, final long defaultValue) { return Observable.create(new ObservableOnSubscribe<Long>() { @Override public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception { e.onNext(sharedPreferences.getLong(key, defaultValue)); e.onComplete(); } }); }