@Override public Flowable<Integer> deleteAll() { if(shouldThrowError){ shouldThrowError = false; // special case because the StoreService needs to call again getAll() return Flowable.error(new Exception("deleteAll.error")); } return getAll(null, null) .delay(1, TimeUnit.SECONDS) .flatMap(new Function<Optional<List<TestModel>>, Flowable<Integer>>() { @Override public Flowable<Integer> apply(Optional<List<TestModel>> ts) throws Exception { return Flowable.just(ts.get().size()); } }); }
/**
 * Initialises the page title and the list, then loads the result of
 * {@code AppUtils.getAppsInfo()} on the I/O scheduler and passes it to the adapter.
 * The subscription is bound to the fragment's DESTROY event.
 *
 * @param view the view passed in by the caller (not used here)
 */
@Override
public void initView(View view) {
    requestBaseInit(getPageTitle());

    mAdapter = new AllAppInfoAdapter(this, mAppInfos);
    mList.setLayoutManager(new LinearLayoutManager(getActivity()));
    mList.setAdapter(mAdapter);

    // The emitted string is only a trigger; its value is ignored.
    final Function<String, List<AppUtils.AppInfo>> loadApps =
            new Function<String, List<AppUtils.AppInfo>>() {
                @Override
                public List<AppUtils.AppInfo> apply(@NonNull String ignoredTrigger) throws Exception {
                    return AppUtils.getAppsInfo();
                }
            };
    final Consumer<List<AppUtils.AppInfo>> showApps =
            new Consumer<List<AppUtils.AppInfo>>() {
                @Override
                public void accept(@NonNull List<AppUtils.AppInfo> loaded) throws Exception {
                    mAdapter.resetData(loaded);
                }
            };

    Observable.just("2")
            .subscribeOn(Schedulers.io())
            .map(loadApps)
            .observeOn(PausedHandlerScheduler.from(getHandler()))
            .compose(mLifecycleProvider.<List<AppUtils.AppInfo>>bindUntilEvent(FragmentEvent.DESTROY))
            .subscribe(showApps);
}
/**
 * Applies {@code operation} through {@code applyUpdate}, or queues it when
 * {@code _batchedOperations} is currently non-null.
 *
 * @param operation function producing an update from a list
 */
void applyOperation(final Function<List<T>, Update<T>> operation) {
    synchronized (_batchingLock) {
        final boolean batching = _batchedOperations != null;
        if (batching) {
            _batchedOperations.add(operation);
            return;
        }
    }

    final Function<List<T>, Update<T>> onCopy = new Function<List<T>, Update<T>>() {
        @Override
        public Update<T> apply(List<T> list) throws Exception {
            // The operation receives its own ArrayList copy of the incoming list.
            final List<T> copy = new ArrayList<>(list);
            return operation.apply(copy);
        }
    };
    applyUpdate(onCopy);
}
/**
 * Requests all clients and converts the JSON response into a list.
 * <p>
 * A null element yields an empty list, a JSON array is parsed with
 * {@code Client.ClientsListParser}, and anything else fails the stream.
 * Results are observed on the Android main thread.
 *
 * @return a stream emitting the parsed client list
 */
public static Observable<List<Client>> getAllClients() {
    final ClientsService service = ServiceGenerator.createService(ClientsService.class);

    final Function<JsonElement, Observable<List<Client>>> parseClients =
            new Function<JsonElement, Observable<List<Client>>>() {
                @Override
                public Observable<List<Client>> apply(JsonElement jsonElement) throws Exception {
                    if (jsonElement == null) {
                        return Observable.just((List<Client>) new ArrayList<Client>());
                    }

                    Log.i("Get All Clients" , "JSON: "+jsonElement.toString());

                    if (!jsonElement.isJsonArray()) {
                        return Observable.error(new Exception("Expected a JSON Array"));
                    }

                    List<Client> parsed =
                            Client.ClientsListParser.fromJsonArray(jsonElement.getAsJsonArray());
                    return Observable.just(parsed);
                }
            };

    return service.getAllClients(UrlManager.getAllClientsURL())
            .flatMap(parseClients)
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Writes every incoming entry into the TILES table with an
 * {@code INSERT OR REPLACE} statement and reports one {@link StorageResult} per entry.
 * <p>
 * The entry's vector is gzip-compressed before it is bound, and the row is
 * passed through {@code flipY}. A failed update is reported as a
 * {@code StorageResult} carrying the wrapped error instead of failing the stream.
 *
 * @param entries the entries to store
 * @return one result per entry
 */
@Override
public Observable<StorageResult> put(Observable<Entry> entries) {
    final String upsertSql =
            "INSERT OR REPLACE INTO TILES(zoom_level, tile_column, tile_row, tile_data)"
                    + " values (?, ?, ?, ?);";

    return entries.flatMap((Function<Entry, ObservableSource<StorageResult>>) entry -> {
        final byte[] gzippedTile;
        try {
            gzippedTile = CompressUtil.getCompressedAsGzip(entry.getVector());
        } catch (final IOException ioFailure) {
            throw Exceptions.propagate(ioFailure);
        }

        // Bound in the order of the four placeholders in the statement.
        final Observable<Object> bindings = Observable.<Object>just(
                entry.getZoomLevel(),
                entry.getColumn(),
                flipY(entry.getRow(), entry.getZoomLevel()),
                gzippedTile);

        return dataSource.update(upsertSql)
                .parameterStream(bindings.toFlowable(BackpressureStrategy.BUFFER))
                .counts()
                .map(affectedRows -> new StorageResult(entry))
                .onErrorReturn(failure -> new StorageResult(entry, new Exception(failure)))
                .toObservable();
    });
}
/**
 * Subscribes to {@code NightModeEvent}s from the RxBus and forwards each
 * event's night-mode flag to the view.
 */
void registerEvent() {
    final Function<NightModeEvent, Boolean> toNightFlag = new Function<NightModeEvent, Boolean>() {
        @Override
        public Boolean apply(NightModeEvent event) {
            return event.getNightMode();
        }
    };

    addSubscribe(
            RxBus.getDefault()
                    .toFlowable(NightModeEvent.class)
                    .compose(RxUtil.<NightModeEvent>rxSchedulerHelper())
                    .map(toNightFlag)
                    .subscribeWith(new CommonSubscriber<Boolean>(mView, "切换模式失败ヽ(≧Д≦)ノ") {
                        @Override
                        public void onNext(Boolean nightMode) {
                            mView.useNightMode(nightMode);
                        }
                    })
    );
}
/**
 * Verifies that an exception thrown by the schedule hook propagates out of
 * {@code RxSwingPlugins.onSchedule}, and that after {@code reset()} the
 * runnable passes through unchanged.
 */
@Test
public void onScheduleCrashes() {
    RxSwingPlugins.setOnSchedule(new Function<Runnable, Runnable>() {
        @Override
        public Runnable apply(Runnable r) throws Exception {
            throw new IllegalStateException("Failure");
        }
    });

    try {
        try {
            RxSwingPlugins.onSchedule(Functions.EMPTY_RUNNABLE);
            Assert.fail("Should have thrown!");
        } catch (IllegalStateException ex) {
            Assert.assertEquals("Failure", ex.getMessage());
        }
    } finally {
        // Reset even when an assertion above fails, so the crashing hook
        // does not stay installed for later tests.
        RxSwingPlugins.reset();
    }

    Assert.assertSame(Functions.EMPTY_RUNNABLE, RxSwingPlugins.onSchedule(Functions.EMPTY_RUNNABLE));
}
/**
 * Calls {@code allocineService.news} for the given identifier and profile,
 * signing the request with the SED/SIG values from {@code ServiceSecurity}.
 * <p>
 * NOTE(review): the mapper below returns {@code null} for every response, so no
 * {@link News} is ever built from the answer. Presumably (RxJava 2 semantics) a
 * null mapper result is signalled as an error - confirm and implement the conversion.
 *
 * @param idNews  value sent as the {@code AllocineService.CODE} parameter
 * @param profile value sent as the {@code AllocineService.PROFILE} parameter
 * @return the mapped service response
 */
private Single<News> news(String idNews, String profile) {
    final String params = ServiceSecurity.construireParams(
            false,
            AllocineService.CODE, idNews,
            AllocineService.PROFILE, profile);
    final String sed = ServiceSecurity.getSED();
    final String sig = ServiceSecurity.getSIG(params, sed);

    final Function<AllocineResponse, News> toNews = new Function<AllocineResponse, News>() {
        @Override
        public News apply(AllocineResponse response) throws Exception {
            return null;
        }
    };

    return allocineService.news(idNews, profile, sed, sig).map(toNews);
}
/**
 * Looks up one item by id.
 * <p>
 * Without a synced store only the DAO stream is returned. With one, the
 * synced store's own lookup is concatenated first, followed by the DAO
 * stream whose items are passed to {@code syncedStore.insertOrUpdate}.
 *
 * @param id identifier of the item
 * @return the concatenated lookup streams
 */
@Override
public Flowable<Optional<T>> getById(final int id) {
    final Flowable<Optional<T>> fromDao = dao.getById(id);
    final List<Flowable<Optional<T>>> sources = new ArrayList<>();

    if (!hasSyncedStore()) {
        sources.add(fromDao);
        return Flowable.concat(sources);
    }

    final Flowable<Optional<T>> mirrored = fromDao
            .flatMap(new Function<Optional<T>, Flowable<Optional<T>>>() {
                @Override
                public Flowable<Optional<T>> apply(final Optional<T> found) throws Exception {
                    return syncedStore.insertOrUpdate(found.get());
                }
            });

    sources.add(syncedStore.getById(id));
    sources.add(mirrored);
    return Flowable.concat(sources);
}
/**
 * Remote-first strategy: emits the first usable result from the remote
 * source, falling back to the cache when the remote yields none.
 * <p>
 * A result is usable when it is non-null and carries non-null cache data.
 *
 * @param apiCache cache used by {@code loadCache} and {@code loadRemote}
 * @param cacheKey key of the cached entry
 * @param source   the remote request
 * @param type     type token passed to {@code loadCache}
 * @return a stream emitting at most one result
 */
@Override
public <T> Observable<CacheResult<T>> execute(ApiCache apiCache, String cacheKey, Observable<T> source, Type type) {
    Observable<CacheResult<T>> cache = loadCache(apiCache, cacheKey, type);
    // Operators return a new Observable, so the result has to be kept; the
    // previous code discarded it and the fallback never applied. A failing
    // cache becomes an empty stream instead of a null item, which RxJava 2 rejects.
    cache = cache.onErrorResumeNext(Observable.<CacheResult<T>>empty());

    Observable<CacheResult<T>> remote = loadRemote(apiCache, cacheKey, source);

    return Observable.concat(remote, cache)
            .filter(new Predicate<CacheResult<T>>() {
                @Override
                public boolean test(CacheResult<T> tCacheResult) throws Exception {
                    return tCacheResult != null && tCacheResult.getCacheData() != null;
                }
            })
            .firstElement()
            .toObservable();
}
/**
 * Computes the diff for {@code refreshData} on the computation scheduler and
 * hands the result to {@code handleResult} on the Android main thread.
 *
 * @param refreshData holder of the new data, header, footer and refresh types
 */
@Override
protected void startRefresh(HandleBase<MultiHeaderEntity> refreshData) {
    final Function<HandleBase<MultiHeaderEntity>, DiffUtil.DiffResult> computeDiff =
            new Function<HandleBase<MultiHeaderEntity>, DiffUtil.DiffResult>() {
                @Override
                public DiffUtil.DiffResult apply(@NonNull HandleBase<MultiHeaderEntity> pending) throws Exception {
                    return handleRefresh(
                            pending.getNewData(),
                            pending.getNewHeader(),
                            pending.getNewFooter(),
                            pending.getType(),
                            pending.getRefreshType());
                }
            };
    final Consumer<DiffUtil.DiffResult> applyDiff = new Consumer<DiffUtil.DiffResult>() {
        @Override
        public void accept(@NonNull DiffUtil.DiffResult computed) throws Exception {
            handleResult(computed);
        }
    };

    Flowable.just(refreshData)
            .onBackpressureDrop()
            .observeOn(Schedulers.computation())
            .map(computeDiff)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(applyDiff);
}
/**
 * Seeds the question table from the bundled JSON asset when
 * {@code mDbHelper.isQuestionEmpty()} reports it as empty.
 *
 * @return the result of {@code saveQuestionList} when seeding ran, otherwise {@code false}
 */
@Override
public Observable<Boolean> seedDatabaseQuestions() {
    final Gson gson = new GsonBuilder()
            .excludeFieldsWithoutExposeAnnotation()
            .create();

    return mDbHelper.isQuestionEmpty()
            .concatMap(new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty) throws Exception {
                    if (!isEmpty) {
                        return Observable.just(false);
                    }

                    // Describes List<Question> so Gson can deserialise the generic list.
                    final Type questionListType = $Gson$Types
                            .newParameterizedTypeWithOwner(null, List.class, Question.class);
                    final String seedJson = CommonUtils.loadJSONFromAsset(
                            mContext, AppConstants.SEED_DATABASE_QUESTIONS);
                    final List<Question> seeded = gson.fromJson(seedJson, questionListType);
                    return saveQuestionList(seeded);
                }
            });
}
/**
 * Initialises the page title, decodes {@code R.drawable.test1} on the I/O
 * scheduler and shows the bitmap in {@code mImage}. The subscription is bound
 * to the fragment's DESTROY event.
 *
 * @param parent the view passed in by the caller (not used here)
 */
@Override
protected void initView(View parent) {
    requestBaseInit(getPageTitle());

    // The emitted string is only a trigger; its value is ignored.
    final Function<String, Bitmap> decode = new Function<String, Bitmap>() {
        @Override
        public Bitmap apply(@NonNull String ignoredTrigger) throws Exception {
            return BitmapFactory.decodeResource(getResources(), R.drawable.test1);
        }
    };
    final Consumer<Bitmap> show = new Consumer<Bitmap>() {
        @Override
        public void accept(@NonNull Bitmap decoded) throws Exception {
            mBitmap = decoded;
            mImage.setImageBitmap(mBitmap);
        }
    };

    Observable.just("123")
            .subscribeOn(Schedulers.io())
            .map(decode)
            .observeOn(PausedHandlerScheduler.from(getHandler()))
            .compose(mLifecycleProvider.<Bitmap>bindUntilEvent(FragmentEvent.DESTROY))
            .subscribe(show);
}
/**
 * Verifies that every call to {@code AndroidSchedulers.mainThread()} goes
 * through the installed main-thread scheduler handler and returns its result.
 */
@Test
public void mainThreadCallsThroughToHook() {
    final AtomicInteger hookCalls = new AtomicInteger();
    final Scheduler replacement = new EmptyScheduler();

    final Function<Scheduler, Scheduler> hook = new Function<Scheduler, Scheduler>() {
        @Override
        public Scheduler apply(Scheduler original) {
            hookCalls.getAndIncrement();
            return replacement;
        }
    };
    RxAndroidPlugins.setMainThreadSchedulerHandler(hook);

    // The hook must be consulted on each lookup, not only the first one.
    for (int expectedCalls = 1; expectedCalls <= 2; expectedCalls++) {
        assertSame(replacement, AndroidSchedulers.mainThread());
        assertEquals(expectedCalls, hookCalls.get());
    }
}
@Test public void directScheduleOnceUsesHook() { final CountingRunnable newCounter = new CountingRunnable(); final AtomicReference<Runnable> runnableRef = new AtomicReference<>(); RxJavaPlugins.setScheduleHandler(new Function<Runnable, Runnable>() { @Override public Runnable apply(Runnable runnable) { runnableRef.set(runnable); return newCounter; } }); CountingRunnable counter = new CountingRunnable(); scheduler.scheduleDirect(counter); // Verify our runnable was passed to the schedulers hook. assertSame(counter, runnableRef.get()); runUiThreadTasks(); // Verify the scheduled runnable was the one returned from the hook. assertEquals(1, newCounter.get()); assertEquals(0, counter.get()); }
@Test public void directScheduleOnceWithDelayUsesHook() { final CountingRunnable newCounter = new CountingRunnable(); final AtomicReference<Runnable> runnableRef = new AtomicReference<>(); RxJavaPlugins.setScheduleHandler(new Function<Runnable, Runnable>() { @Override public Runnable apply(Runnable runnable) { runnableRef.set(runnable); return newCounter; } }); CountingRunnable counter = new CountingRunnable(); scheduler.scheduleDirect(counter, 1, MINUTES); // Verify our runnable was passed to the schedulers hook. assertSame(counter, runnableRef.get()); idleMainLooper(1, MINUTES); runUiThreadTasks(); // Verify the scheduled runnable was the one returned from the hook. assertEquals(1, newCounter.get()); assertEquals(0, counter.get()); }
@Test public void workerScheduleOnceWithDelayUsesHook() { final CountingRunnable newCounter = new CountingRunnable(); final AtomicReference<Runnable> runnableRef = new AtomicReference<>(); RxJavaPlugins.setScheduleHandler(new Function<Runnable, Runnable>() { @Override public Runnable apply(Runnable runnable) { runnableRef.set(runnable); return newCounter; } }); Worker worker = scheduler.createWorker(); CountingRunnable counter = new CountingRunnable(); worker.schedule(counter, 1, MINUTES); // Verify our runnable was passed to the schedulers hook. assertSame(counter, runnableRef.get()); idleMainLooper(1, MINUTES); runUiThreadTasks(); // Verify the scheduled runnable was the one returned from the hook. assertEquals(1, newCounter.get()); assertEquals(0, counter.get()); }
/**
 * Splits a stream of integers by parity with {@code groupBy}, reduces each
 * group to the sum of its values and stores that sum under the group's key.
 * <p>
 * NOTE(review): the source looks synchronous, so the reduced values are presumably
 * written before the assertions run - confirm whether the zero expectations
 * below are intentional placeholders.
 */
@Test
public void shouldBeNecessaryToSubscribetoStreamAfterSplitting() {
    final double[] averages = {0, 0};

    Function<Integer, Integer> byParity = value -> value % 2;
    Observable<Integer> source = Observable.just(22, 22, 99, 22, 101, 22);
    Observable<GroupedObservable<Integer, Integer>> groups = source.groupBy(byParity);

    groups.subscribe(group -> {
        // The key is 0 or 1 and doubles as the index into the result array.
        Function<Double, Double> storeUnderKey = total -> averages[group.getKey()] = total;
        Observable<Double> asDoubles = group.map(value -> (double) value);
        asDoubles
                .reduce((left, right) -> left + right)
                .map(storeUnderKey)
                .subscribe();
    });

    assertThat(averages[0]).isEqualTo(0);
    assertThat(averages[1]).isEqualTo(0);
}
/**
 * Returns the stored models whose id matches one of the given items.
 * <p>
 * Results follow the order of {@code items}; every stored model with a
 * matching id is added once per matching item.
 *
 * @param items models whose ids should be looked up
 * @return the matching stored models, wrapped in an {@code Optional}
 */
@Override
public Flowable<Optional<List<TestModel>>> getAll(final List<TestModel> items) {
    return getAll(null, null)
            .map(new Function<Optional<List<TestModel>>, Optional<List<TestModel>>>() {
                @Override
                public Optional<List<TestModel>> apply(Optional<List<TestModel>> fullList) throws Exception {
                    final List<TestModel> matches = new ArrayList<>();
                    for (TestModel wanted : items) {
                        for (TestModel candidate : fullList.get()) {
                            if (candidate.getId() != wanted.getId()) {
                                continue;
                            }
                            matches.add(candidate);
                        }
                    }
                    return Optional.wrap(matches);
                }
            });
}
@Override public Observable<List<User>> getUsers(int lastIdQueried, boolean update) { Observable<List<User>> users = mRepositoryManager.obtainRetrofitService(UserService.class) .getUsers(lastIdQueried, USERS_PER_PAGE); //使用rxcache缓存,上拉刷新则不读取缓存,加载更多读取缓存 return mRepositoryManager.obtainCacheService(CommonCache.class) .getUsers(users , new DynamicKey(lastIdQueried) , new EvictDynamicKey(update)) .flatMap(new Function<Reply<List<User>>, ObservableSource<List<User>>>() { @Override public ObservableSource<List<User>> apply(@NonNull Reply<List<User>> listReply) throws Exception { return Observable.just(listReply.getData()); } }); }
/**
 * Merges the click streams of the 3x3 grid of image buttons into a single
 * stream of board coordinates.
 *
 * @return a stream emitting the coordinate of each clicked button
 */
@Override
public Observable<BoardCoordinate> squareClicks() {
    final ArrayList<Observable<BoardCoordinate>> clickStreams = new ArrayList<>();

    for (int row = 0; row < 3; row++) {
        for (int col = 0; col < 3; col++) {
            // Final copies so the anonymous mapper can capture the position.
            final int clickedRow = row;
            final int clickedCol = col;

            final Function<Object, BoardCoordinate> toCoordinate =
                    new Function<Object, BoardCoordinate>() {
                        @Override
                        public BoardCoordinate apply(Object irrelevant) throws Exception {
                            return new BoardCoordinate(clickedRow, clickedCol);
                        }
                    };

            clickStreams.add(RxView.clicks(imageButtons[row][col]).map(toCoordinate));
        }
    }

    return Observable.merge(clickStreams);
}
/**
 * Checks the given URL through {@code downloadApi.check}.
 * <p>
 * A successful response is passed to {@code saveFileInfo}; an unsuccessful
 * one falls back to {@code checkUrlByGet}. The whole chain is wrapped in the
 * retry transformer.
 *
 * @param url the URL to check
 * @return the source produced by the chosen follow-up step
 */
private ObservableSource<Object> checkUrl(final String url) {
    final Function<Response<Void>, ObservableSource<Object>> handleResponse =
            new Function<Response<Void>, ObservableSource<Object>>() {
                @Override
                public ObservableSource<Object> apply(@NonNull Response<Void> resp) throws Exception {
                    if (resp.isSuccessful()) {
                        return saveFileInfo(url, resp);
                    }
                    return checkUrlByGet(url);
                }
            };

    return downloadApi.check(url)
            .flatMap(handleResponse)
            .compose(retry(REQUEST_RETRY_HINT, maxRetryCount));
}
@Override public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception { return throwableObservable .flatMap(new Function<Throwable, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception { if (++retryCount <= maxRetries) { // When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed). Log.d("get error, it will try after " + retryDelaySecond + " second, retry count " + retryCount); return Observable.timer(retryDelaySecond, TimeUnit.SECONDS); } // Max retries hit. Just pass the error along. return Observable.error(throwable); } }); }
/**
 * Returns the cached location when no new request is needed; otherwise asks
 * the fused provider, caches a successful result and applies a timeout.
 * <p>
 * On failure the last known location is used when available. Without one, a
 * timeout is reported as {@code LocationTimeoutException} and any other error
 * is passed through.
 *
 * @param request the location request forwarded to the fused provider
 * @return the resolved location
 */
private Single<Location> getLocation(LocationRequest request) {
    if (!shouldRequestNewLocation()) {
        return Single.just(mLastLocation);
    }

    final Consumer<Location> cacheResult = new Consumer<Location>() {
        @Override
        public void accept(Location fresh) throws Exception {
            setLocationCache(fresh);
        }
    };
    final Function<Throwable, SingleSource<? extends Location>> fallback =
            new Function<Throwable, SingleSource<? extends Location>>() {
                @Override
                public SingleSource<? extends Location> apply(Throwable e) throws Exception {
                    if (mLastLocation != null) {
                        return Single.just(mLastLocation);
                    }
                    if (e instanceof TimeoutException) {
                        return Single.error(new LocationTimeoutException());
                    }
                    return Single.error(e);
                }
            };

    return mFusedLocation.getLocation(request)
            .doOnSuccess(cacheResult)
            .timeout(LOCATION_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
            .onErrorResumeNext(fallback);
}
/**
 * Logs a user in and converts the JSON response into a {@code User}.
 * <p>
 * On success the user's full name and username are stored through
 * {@code PrefUtils}. A null response or a non-object JSON element fails the
 * stream. Results are observed on the Android main thread.
 *
 * @param username login name
 * @param password login password
 * @return a stream emitting the logged-in user
 */
public static Observable<User> loginUser(final String username , String password) {
    final UserService service = ServiceGenerator.createService(UserService.class);

    final Function<JsonElement, Observable<User>> parseUser =
            new Function<JsonElement, Observable<User>>() {
                @Override
                public Observable<User> apply(JsonElement jsonElement) throws Exception {
                    if (jsonElement == null) {
                        return Observable.error(new Exception("Login Failed"));
                    }

                    Log.i("Login User" , "JSON: "+jsonElement.toString());

                    if (!jsonElement.isJsonObject()) {
                        return Observable.error(new Exception("Expected a JSON Object"));
                    }

                    final User loggedIn = (new Gson()).fromJson(jsonElement.getAsJsonObject() , User.class);
                    PrefUtils.setUsername(loggedIn.getUsrFullname());
                    PrefUtils.setUserEmail(loggedIn.getUsrUsername());
                    return Observable.just(loggedIn);
                }
            };

    return service.login(UrlManager.loginURL() , new User(username , password))
            .flatMap(parseUser)
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Groups the integers 0..7 by parity and subscribes an observer to each group.
 * <p>
 * The group keys are the strings "偶数" (even) and "奇数" (odd).
 */
private void doSomeWork() {
    Observable.range(0, 8)
            .groupBy(new Function<Integer, String>() {
                @Override
                public String apply(@NonNull Integer integer) throws Exception {
                    return integer % 2 == 0 ? "偶数" : "奇数";
                }
            })
            .subscribe(new Consumer<GroupedObservable<String, Integer>>() {
                @Override
                public void accept(@NonNull GroupedObservable<String, Integer> group) throws Exception {
                    String key = group.getKey();
                    Log.i(TAG, "accept: key=" + key);
                    // Both groups are handled the same way; the former if/else on
                    // the key had identical branches.
                    group.subscribe(getObserver(key));
                }
            });
}
/**
 * Resolves every name in {@code R.array.whatsnew} to a drawable id and
 * delivers the resulting beans to the view on the Android main thread.
 *
 * @return the disposable of the running subscription
 */
public Disposable getWhatsNewIcons() {
    final String[] iconNames = mView.getResources().getStringArray(R.array.whatsnew);

    final Function<String, IconBean> toBean = new Function<String, IconBean>() {
        @Override
        public IconBean apply(@NonNull String iconName) throws Exception {
            final IconBean icon = new IconBean();
            icon.id = mView.getResources()
                    .getIdentifier(iconName, "drawable", BuildConfig.APPLICATION_ID);
            icon.name = iconName;
            return icon;
        }
    };
    final Consumer<List<IconBean>> deliver = new Consumer<List<IconBean>>() {
        @Override
        public void accept(List<IconBean> icons) throws Exception {
            mView.onLoadData(icons);
        }
    };

    return Observable.fromArray(iconNames)
            .map(toBean)
            .toList()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(deliver);
}
@Test public void onErrorMapWithNoErrorThenReturnEmptyOptional() { String result = Chain.let(0) .guard(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { // do nothing } }) .onErrorMap(new Function<Throwable, String>() { @Override public String apply(Throwable throwable) throws Exception { return "!"; } }) .defaultIfEmpty("") .call(); assertEquals("", result); }
/**
 * Creates a transformer that lets successful pay results through and turns
 * unsuccessful ones into a {@code PayFailedException} carrying the error info.
 *
 * @return the validating transformer
 */
public static ObservableTransformer<PayResult, PayResult> checkAliPayResult() {
    final Function<PayResult, PayResult> requireSuccess = new Function<PayResult, PayResult>() {
        @Override
        public PayResult apply(PayResult payResult) throws Exception {
            if (payResult.isSucceed()) {
                return payResult;
            }
            throw new PayFailedException(payResult.getErrInfo());
        }
    };

    return new ObservableTransformer<PayResult, PayResult>() {
        @Override
        public ObservableSource<PayResult> apply(Observable<PayResult> upstream) {
            return upstream.map(requireSuccess);
        }
    };
}
/**
 * Creates a mapper that unwraps an {@code HttpResponseResult}: a successful
 * response emits its result and completes, an unsuccessful one signals an
 * {@code HttpResponseException} built from the message and state.
 * <p>
 * NOTE(review): the hand-written Observable below never calls
 * {@code observer.onSubscribe(...)} before emitting - presumably this only works
 * because of how the consuming operator treats it; confirm against the
 * Observable contract.
 *
 * @param <T> type of the wrapped result
 * @return the unwrapping function
 */
private static <T> Function<HttpResponseResult<T>, ObservableSource<T>> flatMap() {
    return new Function<HttpResponseResult<T>, ObservableSource<T>>() {
        @Override
        public ObservableSource<T> apply(@NonNull final HttpResponseResult<T> response) throws Exception {
            return new Observable<T>() {
                @Override
                protected void subscribeActual(Observer<? super T> observer) {
                    if (!response.isSuccess()) {
                        observer.onError(
                                new HttpResponseException(response.getMsg(), response.getState()));
                        return;
                    }
                    observer.onNext(response.getResult());
                    observer.onComplete();
                }
            };
        }
    };
}
/**
 * Loads a cached value and wraps it in a {@code CacheResult} flagged as
 * coming from the cache.
 *
 * @param rxCache   the cache to read from
 * @param type      type token of the cached value
 * @param key       cache key
 * @param time      value forwarded to {@code rxCache.load}
 * @param needEmpty when {@code true}, errors are replaced by an empty stream
 * @return the cached result, or an empty stream on error if {@code needEmpty} is set
 */
<T> Observable<CacheResult<T>> loadCache(final RxCache rxCache, Type type, final String key, final long time, final boolean needEmpty) {
    final Function<T, ObservableSource<CacheResult<T>>> wrap =
            new Function<T, ObservableSource<CacheResult<T>>>() {
                @Override
                public ObservableSource<CacheResult<T>> apply(@NonNull T cached) throws Exception {
                    if (cached == null) {
                        return Observable.error(new NullPointerException("Not find the cache!"));
                    }
                    return Observable.just(new CacheResult<T>(true, cached));
                }
            };

    final Observable<CacheResult<T>> loaded = rxCache.<T>load(type, key, time).flatMap(wrap);
    if (!needEmpty) {
        return loaded;
    }

    return loaded.onErrorResumeNext(
            new Function<Throwable, ObservableSource<? extends CacheResult<T>>>() {
                @Override
                public ObservableSource<? extends CacheResult<T>> apply(@NonNull Throwable failure) throws Exception {
                    return Observable.empty();
                }
            });
}
/**
 * Waits one second, then opens {@code MainActivity} and finishes the splash
 * screen. The subscription is tracked in {@code dLists}.
 */
private void start() {
    Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
            .take(1)
            .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Long>>() {
                @Override
                public ObservableSource<? extends Long> apply(Throwable throwable) throws Exception {
                    // Returning null here is rejected by RxJava and would reach the
                    // subscriber below, which has no error consumer. Emit a single
                    // fallback tick instead so the splash screen still advances.
                    return Observable.just(0L);
                }
            })
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.e(MainActivity.TAG, "accept: " + aLong);
                    startActivity(new Intent(SplashActivity.this, MainActivity.class));
                    finish();
                }
            });
    dLists.add(disposable);
}
/**
 * Verifies that every GPS, Wi-Fi and Bluetooth sensor type reports
 * {@code MIN_DELAY_MS} as its minimum sensor delay.
 */
@Test
public void getMinSensorDelay_forAllNonIMUSensor_returnsInMs() throws Exception {
    HashSet<SensorType> nonImuSensors = new HashSet<>();
    nonImuSensors.addAll(Arrays.asList(SensorType.gpsValues()));
    nonImuSensors.addAll(Arrays.asList(SensorType.wifiValues()));
    nonImuSensors.addAll(Arrays.asList(SensorType.bluetoothValues()));

    final Function<SensorType, Long> toMinDelay = new Function<SensorType, Long>() {
        @Override
        public Long apply(SensorType sensorType) throws Exception {
            return basicSensorConfig.getMinSensorDelay(sensorType);
        }
    };
    final Predicate<Long> isExpectedDelay = new Predicate<Long>() {
        @Override
        public boolean test(Long delay) throws Exception {
            return delay.equals(MIN_DELAY_MS);
        }
    };

    Long matching = Observable.fromIterable(nonImuSensors)
            .map(toMinDelay)
            .filter(isExpectedDelay)
            .count()
            .blockingGet();

    // Every sensor must have passed the filter.
    assertThat(matching.intValue(), equalTo(nonImuSensors.size()));
}
/**
 * Demonstrates {@code debounce} followed by {@code switchMap}: the value is
 * debounced by 1000 ms, converted to a string and printed to standard output.
 *
 * @param i the value to emit
 */
private static void debounceTest(int i) {
    final Function<Integer, ObservableSource<String>> toText =
            new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer value) throws Exception {
                    return Observable.just(String.valueOf(value));
                }
            };
    final Consumer<String> print = new Consumer<String>() {
        @Override
        public void accept(String text) throws Exception {
            System.out.println(text);
        }
    };

    Observable.just(i)
            .debounce(1000, TimeUnit.MILLISECONDS)
            // Only the data of the most recent request counts.
            .switchMap(toText)
            .subscribe(print);
}
/**
 * Strategy that ignores the cache: every item from {@code source} is wrapped
 * in a {@code CacheResult} flagged as not coming from the cache.
 *
 * @param rxCache   unused by this strategy
 * @param cacheKey  unused by this strategy
 * @param cacheTime unused by this strategy
 * @param source    the stream to wrap
 * @param type      unused by this strategy
 * @return the wrapped source
 */
@Override
public <T> Observable<CacheResult<T>> execute(RxCache rxCache, String cacheKey, long cacheTime, Observable<T> source, Type type) {
    final Function<T, CacheResult<T>> asRemoteResult = new Function<T, CacheResult<T>>() {
        @Override
        public CacheResult<T> apply(@NonNull T value) throws Exception {
            return new CacheResult<T>(false, value);
        }
    };
    return source.map(asRemoteResult);
}
@Override public Flowable<Optional<List<Post>>> getAll(Filter filter, final SortingMode sortingMode) { // this IF case is here only to demonstrate the usage of filtering and sorting mode in the UI // this logic should be on the server side and not here ! // !!!! The filter and the sort are hardcoded here (to match presenter choices). if(sortingMode != null && filter != null){ final int userIdAllowed = (int) filter.entrySet().iterator().next().getValue().value; // special return for demo return wrapOptional(apiService.getPosts() .flatMapIterable(new Function<List<Post>, Iterable<Post>>() { @Override public Iterable<Post> apply(List<Post> posts) throws Exception { Collections.sort(posts, new Comparator<Post>() { @Override public int compare(Post p0, Post p1) { return p0.userId - p1.userId; // hardcoded ordering by userId } }); return posts; } }) .filter(new Predicate<Post>() { @Override public boolean test(Post post) throws Exception { return post.userId == userIdAllowed; } }) .toList() .toFlowable() ); } // you can wrap the retrofit response directly in a // Optional object by default for more convenience return wrapOptional(apiService.getPosts()); }
/**
 * Wraps {@code source} so that a progress dialog created by {@code makeDialog}
 * exists for the lifetime of the subscription and is dismissed afterwards.
 * <p>
 * Dismissing the dialog (or cancelling it, when the builder allows that)
 * completes the returned stream.
 *
 * @param source               the stream to relay
 * @param backpressureStrategy strategy used by the bridging {@code Flowable.create}
 * @return the relayed stream, tied to the dialog
 */
private <T> Flowable<T> forFlowable(Flowable<T> source, BackpressureStrategy backpressureStrategy) {
    return Flowable.using(this::makeDialog, new Function<ProgressDialog, Publisher<? extends T>>() {
        @Override
        public Publisher<? extends T> apply(@NonNull ProgressDialog dialog) throws Exception {
            return Flowable.create(emitter -> {
                if (builder.cancelable) {
                    dialog.setOnCancelListener(dialogInterface -> emitter.onComplete());
                }
                dialog.setOnDismissListener(dialogInterface -> emitter.onComplete());
                // Bind the upstream subscription to the emitter so it is disposed when
                // the dialog completes the stream or the downstream cancels. Previously
                // the Disposable was dropped and the source kept running.
                emitter.setDisposable(
                        source.subscribe(emitter::onNext, emitter::onError, emitter::onComplete));
            }, backpressureStrategy);
        }
    }, Dialog::dismiss);
}
/**
 * Loads all {@code CollectItem}s on the I/O scheduler, then replaces the
 * contents of {@code mItemList} and refreshes the adapter on the Android main thread.
 * The empty view is shown when nothing was loaded.
 */
private void loadData() {
    Observable
            .create(new ObservableOnSubscribe<List<CollectItem>>() {
                @Override
                public void subscribe(ObservableEmitter<List<CollectItem>> e) throws Exception {
                    e.onNext(DataSupport.findAll(CollectItem.class));
                    // Single-shot query: terminate the stream once the result is delivered.
                    e.onComplete();
                }
            })
            .subscribeOn(Schedulers.io())
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    addDisposable(disposable);
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<CollectItem>>() {
                @Override
                public void accept(List<CollectItem> items) throws Exception {
                    // The list is mutated here, on the same thread that notifies the
                    // adapter, instead of on the I/O thread.
                    mItemList.clear();
                    final boolean hasItems = items != null && !items.isEmpty();
                    if (hasItems) {
                        mItemList.addAll(items);
                    }
                    showEmptyView(!hasItems);
                    mAdapter.notifyDataSetChanged();
                }
            });
}
private void doSomeWork() { getObservable() .flatMap(new Function<String, ObservableSource<String>>() { @Override public ObservableSource<String> apply(@NonNull String s) throws Exception { return Observable.just(s).delay(500, TimeUnit.MILLISECONDS); } }) .timestamp() // Run on a background thread .subscribeOn(Schedulers.io()) // Be notified on the main thread .observeOn(AndroidSchedulers.mainThread()) .subscribe(getObserver()); }