/**
 * Emits an {@link RxBeaconRange} for every ranging callback reported by the beacon manager.
 *
 * <p>After {@code startup()} emits, a {@link RangeNotifier} is registered on
 * {@code beaconManager} and ranging is started for the region returned by {@code getRegion()}.
 * When the subscriber disposes (or the stream terminates) the notifier is removed again, so
 * the manager no longer holds on to the emitter of an abandoned stream.
 *
 * <p>NOTE(review): ranging itself is intentionally left running on dispose, because other
 * subscribers may be ranging the same region - confirm whether it should also be stopped.
 *
 * @return a stream of ranging results; it is never completed by this method itself
 */
public Observable<RxBeaconRange> beaconsInRegion() {
    return startup()
            .flatMap(new Function<Boolean, ObservableSource<RxBeaconRange>>() {
                @Override
                public ObservableSource<RxBeaconRange> apply(@NonNull Boolean aBoolean) throws Exception {
                    return Observable.create(new ObservableOnSubscribe<RxBeaconRange>() {
                        @Override
                        public void subscribe(@NonNull final ObservableEmitter<RxBeaconRange> objectObservableEmitter) throws Exception {
                            final RangeNotifier notifier = new RangeNotifier() {
                                @Override
                                public void didRangeBeaconsInRegion(Collection<Beacon> collection, Region region) {
                                    objectObservableEmitter.onNext(new RxBeaconRange(collection, region));
                                }
                            };
                            beaconManager.addRangeNotifier(notifier);
                            // Registered before ranging starts so that a failure in
                            // startRangingBeaconsInRegion also unregisters the notifier.
                            objectObservableEmitter.setCancellable(new io.reactivex.functions.Cancellable() {
                                @Override
                                public void cancel() throws Exception {
                                    beaconManager.removeRangeNotifier(notifier);
                                }
                            });
                            beaconManager.startRangingBeaconsInRegion(getRegion());
                        }
                    });
                }
            });
}
@Override public ObservableSource<T> apply(@NonNull Observable<BaseHttpResult<T>> upstream) { return upstream.map(new Function<BaseHttpResult<T>, T>() { @Override public T apply(@NonNull BaseHttpResult<T> tBaseHttpResult) throws Exception { if (tBaseHttpResult == null) { throw new ServerException(ErrorType.EMPTY_BEAN, "解析对象为空"); } LogUtils.e(TAG, tBaseHttpResult.toString()); if (tBaseHttpResult.getStatus() != ErrorType.SUCCESS) { throw new ServerException(tBaseHttpResult.getStatus(), tBaseHttpResult.getMessage()); } return tBaseHttpResult.getData(); } }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends T>>() { @Override public ObservableSource<? extends T> apply(@NonNull Throwable throwable) throws Exception { // ExceptionEngine 为处理异常的驱动器 throwable throwable.printStackTrace(); return Observable.error(ExceptionEngine.handleException(throwable)); } }); }
/**
 * Seeds the option table from the bundled JSON asset when the table is empty.
 *
 * <p>The JSON named by {@code AppConstants.SEED_DATABASE_OPTIONS} is parsed with a Gson
 * instance that only considers {@code @Expose}-annotated fields, and the resulting list is
 * handed to {@code saveOptionList}.
 *
 * @return the result of {@code saveOptionList} when seeding happened, otherwise a single
 *         {@code false}
 */
@Override
public Observable<Boolean> seedDatabaseOptions() {
    final Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();

    return mDbHelper.isOptionEmpty()
            .concatMap(new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty) throws Exception {
                    // Nothing to seed: report that no work was done.
                    if (!isEmpty) {
                        return Observable.just(false);
                    }
                    Type listOfOptions = new TypeToken<List<Option>>() {
                    }.getType();
                    List<Option> seedOptions = gson.fromJson(
                            CommonUtils.loadJSONFromAsset(mContext, AppConstants.SEED_DATABASE_OPTIONS),
                            listOfOptions);
                    return saveOptionList(seedOptions);
                }
            });
}
/**
 * Resolves the download type for a url whose file already exists.
 *
 * <p>Three steps run in sequence: read the stored last-modified value from
 * {@code recordTable}, pass it to {@code checkFile}, and finally ask {@code recordTable}
 * for the matching file-exists type.
 *
 * @param url file url
 * @return Download Type
 */
private Observable<DownloadType> existsType(final String url) {
    final Function<Integer, String> readLastModify = new Function<Integer, String>() {
        @Override
        public String apply(Integer ignored) throws Exception {
            return recordTable.readLastModify(url);
        }
    };
    final Function<String, ObservableSource<Object>> verifyFile =
            new Function<String, ObservableSource<Object>>() {
                @Override
                public ObservableSource<Object> apply(String lastModify) throws Exception {
                    return checkFile(url, lastModify);
                }
            };
    final Function<Object, ObservableSource<DownloadType>> toDownloadType =
            new Function<Object, ObservableSource<DownloadType>>() {
                @Override
                public ObservableSource<DownloadType> apply(Object ignored) throws Exception {
                    return Observable.just(recordTable.generateFileExistsType(url));
                }
            };

    // The constant 1 only serves as a trigger so the record lookup happens on subscription.
    return Observable.just(1)
            .map(readLastModify)
            .flatMap(verifyFile)
            .flatMap(toDownloadType);
}
/**
 * Checks a url through {@code downloadApi.check} and records the file information.
 *
 * <p>A successful response is passed to {@code saveFileInfo}; an unsuccessful one falls back
 * to {@code checkUrlByGet}. The whole sequence is wrapped by the
 * {@code retry(REQUEST_RETRY_HINT, maxRetryCount)} transformer.
 *
 * @param url url
 * @return the source produced by {@code saveFileInfo} or {@code checkUrlByGet}
 */
private ObservableSource<Object> checkUrl(final String url) {
    return downloadApi.check(url)
            .flatMap(new Function<Response<Void>, ObservableSource<Object>>() {
                @Override
                public ObservableSource<Object> apply(@NonNull Response<Void> response) throws Exception {
                    if (response.isSuccessful()) {
                        return saveFileInfo(url, response);
                    }
                    return checkUrlByGet(url);
                }
            })
            .compose(retry(REQUEST_RETRY_HINT, maxRetryCount));
}
/** * Update and save the packages' status by accessing the Internet. * @return The observable packages whose status are the latest. */ @Override public Observable<List<Package>> refreshPackages() { // It is necessary to build a new realm instance // in a different thread. Realm realm = Realm.getInstance(new RealmConfiguration.Builder() .deleteRealmIfMigrationNeeded() .name(DATABASE_NAME) .build()); return Observable.fromIterable(realm.copyFromRealm(realm.where(Package.class).findAll())) .subscribeOn(Schedulers.io()) .flatMap(new Function<Package, ObservableSource<Package>>() { @Override public ObservableSource<Package> apply(Package aPackage) throws Exception { // A nested request. return refreshPackage(aPackage.getNumber()); } }) .toList() .toObservable(); }
/**
 * Refresh one package.
 *
 * <p>The remote data source performs the refresh; every package it emits is additionally
 * mirrored into {@code cachedPackages} when an entry with the same number is present there
 * (its data is replaced and it is marked readable).
 *
 * @param packageId The primary key (the package number). See {@link Package#number}.
 * @return The observable package.
 */
@Override
public Observable<Package> refreshPackage(@NonNull final String packageId) {
    // Copies the refreshed data onto the cached instance, if there is one.
    final Consumer<Package> syncCachedCopy = new Consumer<Package>() {
        @Override
        public void accept(Package fresh) throws Exception {
            Package cached = cachedPackages.get(fresh.getNumber());
            if (cached == null) {
                return;
            }
            cached.setData(fresh.getData());
            cached.setReadable(true);
        }
    };

    return packagesRemoteDataSource
            .refreshPackage(packageId)
            .flatMap(new Function<Package, ObservableSource<Package>>() {
                @Override
                public ObservableSource<Package> apply(Package refreshed) throws Exception {
                    return Observable.just(refreshed).doOnNext(syncCachedCopy);
                }
            });
}
@Override public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception { return throwableObservable .flatMap(new Function<Throwable, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception { if (++retryCount <= maxRetries) { // When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed). Log.d(TAG, "Observable get error, it will try after " + retryDelaySecond + " second, retry count " + retryCount); return Observable.timer(retryDelaySecond, TimeUnit.SECONDS); } // Max retries hit. Just pass the error along. return Observable.error(throwable); } }); }
/**
 * Waits one second, then opens {@link MainActivity} and finishes the splash screen.
 *
 * <p>The error handler previously returned {@code null}, which RxJava rejects with a
 * {@code NullPointerException} that would reach a subscriber without an error consumer.
 * It now logs the failure and falls back to a single immediate tick, so the splash screen
 * still advances. The resulting {@link Disposable} is stored in {@code dLists}.
 */
private void start() {
    Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
            .take(1)
            .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Long>>() {
                @Override
                public ObservableSource<? extends Long> apply(Throwable throwable) throws Exception {
                    Log.e(MainActivity.TAG, "splash timer failed, continuing immediately", throwable);
                    // 0 is the same value the first interval tick would have delivered.
                    return Observable.just(0L);
                }
            })
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.e(MainActivity.TAG, "accept: " + aLong);
                    startActivity(new Intent(SplashActivity.this, MainActivity.class));
                    finish();
                }
            });
    dLists.add(disposable);
}
private void initView() { subject = PublishSubject.create(); subject.debounce(0, TimeUnit.MILLISECONDS) // .filter(new Predicate<Float>() { // @Override // public boolean test(Float contrast) throws Exception { // return true; // } // }) .distinctUntilChanged() .switchMap(new Function<Float, ObservableSource<ColorMatrixColorFilter>>() { @Override public ObservableSource<ColorMatrixColorFilter> apply(Float value) throws Exception { return postContrast(value); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<ColorMatrixColorFilter>() { @Override public void accept(ColorMatrixColorFilter colorMatrixColorFilter) throws Exception { setColorFilter(colorMatrixColorFilter); } }); }
/**
 * Prepares the fixture: a {@link BriteContentResolver} over the mock content resolver.
 *
 * <p>Log messages are collected into {@code logs}, and every query stream is cut off once
 * {@code killSwitch} emits. The provider under test is initialised with the context's
 * content resolver.
 */
@Override
protected void setUp() throws Exception {
    super.setUp();
    contentResolver = getMockContentResolver();

    // Records everything SqlBrite logs so tests can inspect it.
    SqlBrite.Logger recordingLogger = new SqlBrite.Logger() {
        @Override
        public void log(String message) {
            logs.add(message);
        }
    };

    // Lets a test terminate all query streams through killSwitch.
    ObservableTransformer<Query, Query> untilKilled = new ObservableTransformer<Query, Query>() {
        @Override
        public ObservableSource<Query> apply(Observable<Query> queries) {
            return queries.takeUntil(killSwitch);
        }
    };

    db = new BriteContentResolver(contentResolver, recordingLogger, scheduler, untilKilled);
    getProvider().init(getContext().getContentResolver());
}
/**
 * Collects, per identity-provider id, the latest created date seen in a stream of
 * {@code DeviceAccess} items.
 *
 * @param deviceAccessObservable the device accesses to scan
 * @return a source emitting one map from identity-provider id to its latest created date
 */
private ObservableSource<HashMap<Long, Date>> getLatestActiveDateForIdpId(Observable<DeviceAccess> deviceAccessObservable) {
    return deviceAccessObservable.collect(
            () -> new HashMap<Long, Date>(),
            (latestByIdp, access) -> {
                Long idpId = access.getIdentityProvider().getId();
                Date candidate = access.getCreatedDate();
                Date known = latestByIdp.get(idpId);
                // Keep the stored date only when it is strictly later than the candidate.
                if (known == null || !known.after(candidate)) {
                    latestByIdp.put(idpId, candidate);
                }
            })
            .toObservable();
}
/**
 * Demo: pushes a single integer through debounce and switchMap and prints it as a string.
 *
 * @param i the value to emit
 */
private static void debounceTest(int i) {
    // switchMap: only the result for the most recent request is kept.
    final Function<Integer, ObservableSource<String>> stringify =
            new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer value) throws Exception {
                    return Observable.just(String.valueOf(value));
                }
            };
    final Consumer<String> printer = new Consumer<String>() {
        @Override
        public void accept(String text) throws Exception {
            System.out.println(text);
        }
    };

    Observable.just(i)
            .debounce(1000, TimeUnit.MILLISECONDS)
            .switchMap(stringify)
            .subscribe(printer);
}
public void getGankioData(GankioType type, int count, int page, boolean isProgress) { if (isProgress) { view.showProgress(""); } wrap(gankioRepository.getAllGankioData(type, count, page)).flatMap( new Function<BaseResult<GankioData>, ObservableSource<BaseResult<GankioData>>>() { @Override public ObservableSource<BaseResult<GankioData>> apply( @NonNull BaseResult<GankioData> gankioDataBaseResult) throws Exception { //List<GankioData> results = ; for (GankioData gankioData : gankioDataBaseResult.getResults()) { boolean b = dbRepository.queryBrowseHistory(gankioData.get_id()); gankioData.setBrowseHistory(b); } return Observable.just(gankioDataBaseResult); } }).subscribe(new ViewObserver<GankioData>(view) { @Override protected void onSuccess(List<GankioData> t) { view.display(t); } @Override public void onError(@NonNull Throwable e) { super.onError(e); view.displayError(); } }); }
/**
 * flatMap demo: each of the integers 1, 2, 3 is expanded into two strings (its double and
 * its triple), and every signal is logged.
 */
@Override
public void test0() {
    Log.i(TAG, "test0() FlatMap simple demo, integer 1,2,3 transform to string 2,3,4,6,6,9");

    final Function<Integer, ObservableSource<String>> doubleAndTriple =
            new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(@NonNull Integer value) throws Exception {
                    return Observable.just(String.valueOf(value * 2), String.valueOf(value * 3));
                }
            };
    final Consumer<String> onItem = new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "Consumer<String> accept() s: " + s);
        }
    };
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
        }
    };
    final Action onDone = new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "Action run() for onComplete()");
        }
    };

    Observable.just(1, 2, 3)
            .flatMap(doubleAndTriple)
            .subscribe(onItem, onFailure, onDone);
}
/**
 * Emits the translation list stored in the cache file, if it may be used.
 *
 * <p>Nothing is emitted when {@code forceDownload} is set, when the last update is older
 * than {@code Constants.MIN_TRANSLATION_REFRESH_TIME}, when the cache file does not exist,
 * or when reading it fails (the failure is reported to Crashlytics). The cache file's
 * source is now closed after parsing instead of being left open.
 *
 * @param forceDownload {@code true} to ignore the cache altogether
 * @return a deferred stream with at most one cached {@link TranslationList}
 */
Observable<TranslationList> getCachedTranslationListObservable(final boolean forceDownload) {
    return Observable.defer(new Callable<ObservableSource<? extends TranslationList>>() {
        @Override
        public ObservableSource<TranslationList> call() throws Exception {
            boolean isCacheStale = System.currentTimeMillis() -
                    quranSettings.getLastUpdatedTranslationDate() > Constants.MIN_TRANSLATION_REFRESH_TIME;
            if (forceDownload || isCacheStale) {
                return Observable.empty();
            }

            try {
                File cachedFile = getCachedFile();
                if (cachedFile.exists()) {
                    Moshi moshi = new Moshi.Builder().build();
                    JsonAdapter<TranslationList> jsonAdapter = moshi.adapter(TranslationList.class);
                    okio.BufferedSource cachedSource = Okio.buffer(Okio.source(cachedFile));
                    try {
                        return Observable.just(jsonAdapter.fromJson(cachedSource));
                    } finally {
                        // Release the file handle whether or not parsing succeeded.
                        cachedSource.close();
                    }
                }
            } catch (Exception e) {
                Crashlytics.logException(e);
            }
            return Observable.empty();
        }
    });
}
/**
 * Builds the download information for every given qari.
 *
 * <p>Entries already present in {@code sCache} are reused. Otherwise a qari whose directory
 * under {@code basePath} does not exist gets a plain {@link QariDownloadInfo}, and existing
 * directories are inspected through the gapless or gapped helper depending on
 * {@code isGapless()}. Every emitted info is stored in {@code sCache}. Subscription happens
 * on the io scheduler.
 *
 * @param basePath  directory that contains the per-qari directories
 * @param qariItems qaris to inspect
 * @return a single emitting the list of download infos
 */
@NonNull
public static Single<List<QariDownloadInfo>> shuyookhDownloadObservable(
        final String basePath, List<QariItem> qariItems) {
    return Observable.fromIterable(qariItems)
            .flatMap(new Function<QariItem, ObservableSource<QariDownloadInfo>>() {
                @Override
                public ObservableSource<QariDownloadInfo> apply(QariItem qari) throws Exception {
                    final QariDownloadInfo known = sCache.get(qari);
                    if (known != null) {
                        return Observable.just(known);
                    }

                    final File qariDirectory = new File(basePath, qari.getPath());
                    if (!qariDirectory.exists()) {
                        return Observable.just(new QariDownloadInfo(qari));
                    }
                    if (qari.isGapless()) {
                        return getGaplessSheikhObservable(qariDirectory, qari).toObservable();
                    }
                    return getGappedSheikhObservable(qariDirectory, qari).toObservable();
                }
            })
            .doOnNext(info -> sCache.put(info.qariItem, info))
            .toList()
            .subscribeOn(Schedulers.io());
}
/**
 * Creates the function that converts an {@link HttpResponseResult} into a stream.
 *
 * <p>A successful result yields its {@code getResult()} value followed by completion; any
 * other result yields an {@link HttpResponseException} built from the result's message and
 * state. The returned observable now calls {@code onSubscribe} before any other signal, as
 * the Observer protocol requires.
 *
 * @param <T> payload type of the response
 * @return the mapping function, suitable for {@code flatMap}
 */
private static <T> Function<HttpResponseResult<T>, ObservableSource<T>> flatMap() {
    return new Function<HttpResponseResult<T>, ObservableSource<T>>() {
        @Override
        public ObservableSource<T> apply(@NonNull final HttpResponseResult<T> tHttpResponseResult) throws Exception {
            return new Observable<T>() {
                @Override
                protected void subscribeActual(Observer<? super T> observer) {
                    // Observers must receive onSubscribe before onNext/onError/onComplete.
                    observer.onSubscribe(io.reactivex.disposables.Disposables.empty());
                    if (tHttpResponseResult.isSuccess()) {
                        observer.onNext(tHttpResponseResult.getResult());
                        observer.onComplete();
                    } else {
                        observer.onError(new HttpResponseException(
                                tHttpResponseResult.getMsg(), tHttpResponseResult.getState()));
                    }
                }
            };
        }
    };
}
/**
 * Reports, for every upstream item, how many whole seconds have passed since this
 * transformer was applied.
 *
 * <p>The reference time is a {@link Date} created when this method runs; it is paired with
 * each upstream item, the elapsed seconds are handed to {@code timerAction}, and the item
 * itself is passed on unchanged.
 *
 * @param upstream the stream to time
 * @return the upstream items
 */
@Override
public ObservableSource<R> apply(Observable<R> upstream) {
    final Observable<Date> startedAt = Observable.just(new Date());

    return Observable.combineLatest(startedAt, upstream, Pair::create)
            .doOnNext(stamped -> {
                long elapsedMillis = new Date().getTime() - stamped.first.getTime();
                timerAction.accept(elapsedMillis / 1000);
            })
            .map(stamped -> stamped.second);
}
public <T> ObservableTransformer<T, T> io_main() { return new ObservableTransformer<T, T>() { @Override public ObservableSource<T> apply(Observable<T> upstream) { return upstream.subscribeOn(Schedulers.newThread()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { //设定是否开启dialog窗 if (showLoading) LoadingDialog.show(); } }).compose(lifecycle)//绑定Lifecycle,解决网络请求内存溢出问题 .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程 .observeOn(AndroidSchedulers.mainThread()); } }; }
@Override public Observable<List<User>> getUsers(int lastIdQueried, boolean update) { Observable<List<User>> users = mRepositoryManager.obtainRetrofitService(UserService.class) .getUsers(lastIdQueried, USERS_PER_PAGE); //使用rxcache缓存,上拉刷新则不读取缓存,加载更多读取缓存 return mRepositoryManager.obtainCacheService(CommonCache.class) .getUsers(users , new DynamicKey(lastIdQueried) , new EvictDynamicKey(update)) .flatMap(new Function<Reply<List<User>>, ObservableSource<List<User>>>() { @Override public ObservableSource<List<User>> apply(@NonNull Reply<List<User>> listReply) throws Exception { return Observable.just(listReply.getData()); } }); }
/**
 * Seeds the question table from the bundled JSON asset when the table is empty.
 *
 * <p>The JSON named by {@code AppConstants.SEED_DATABASE_QUESTIONS} is parsed as a list of
 * {@link Question} with a Gson instance that only considers {@code @Expose}-annotated
 * fields, and the list is handed to {@code saveQuestionList}.
 *
 * @return the result of {@code saveQuestionList} when seeding happened, otherwise a single
 *         {@code false}
 */
@Override
public Observable<Boolean> seedDatabaseQuestions() {
    final Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();

    return mDbHelper.isQuestionEmpty()
            .concatMap(new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty) throws Exception {
                    // Table already populated: nothing to do.
                    if (!isEmpty) {
                        return Observable.just(false);
                    }
                    Type listOfQuestions = $Gson$Types
                            .newParameterizedTypeWithOwner(null, List.class, Question.class);
                    List<Question> seedQuestions = gson.fromJson(
                            CommonUtils.loadJSONFromAsset(mContext, AppConstants.SEED_DATABASE_QUESTIONS),
                            listOfQuestions);
                    return saveQuestionList(seedQuestions);
                }
            });
}
/**
 * Transformer that lets successful {@link PayResult}s through and fails the stream with a
 * {@link PayFailedException} (carrying the result's error info) for any other result.
 *
 * @return the validating transformer
 */
public static ObservableTransformer<PayResult, PayResult> checkAliPayResult() {
    final Function<PayResult, PayResult> requireSuccess = new Function<PayResult, PayResult>() {
        @Override
        public PayResult apply(PayResult result) throws Exception {
            if (result.isSucceed()) {
                return result;
            }
            throw new PayFailedException(result.getErrInfo());
        }
    };

    return new ObservableTransformer<PayResult, PayResult>() {
        @Override
        public ObservableSource<PayResult> apply(Observable<PayResult> upstream) {
            return upstream.map(requireSuccess);
        }
    };
}
/**
 * Seeds the options from the bundled JSON asset when the option repository is empty.
 *
 * <p>The JSON named by {@code AppConstants.SEED_DATABASE_OPTIONS} is parsed with a Gson
 * instance that only considers {@code @Expose}-annotated fields, and the list is saved
 * through {@code mOptionRepository}.
 *
 * @return the result of {@code saveOptionList} when seeding happened, otherwise a single
 *         {@code false}
 */
@Override
public Observable<Boolean> seedDatabaseOptions() {
    final Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();

    return mOptionRepository.isOptionEmpty()
            .concatMap(new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty) throws Exception {
                    // Repository already has options: report that nothing was seeded.
                    if (!isEmpty) {
                        return Observable.just(false);
                    }
                    Type listOfOptions = new TypeToken<List<Option>>() {
                    }.getType();
                    List<Option> seedOptions = gson.fromJson(
                            FileUtils.loadJSONFromAsset(mContext, AppConstants.SEED_DATABASE_OPTIONS),
                            listOfOptions);
                    return mOptionRepository.saveOptionList(seedOptions);
                }
            });
}
/**
 * Transformer that subscribes and unsubscribes on the io scheduler and observes on the
 * Android main thread, logging subscription and termination through {@code HttpLog}.
 *
 * @param <T> item type
 * @return the scheduling transformer
 */
public static <T> ObservableTransformer<T, T> io_main() {
    final Consumer<Disposable> logSubscribe = new Consumer<Disposable>() {
        @Override
        public void accept(@NonNull Disposable disposable) throws Exception {
            HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed());
        }
    };
    final Action logFinally = new Action() {
        @Override
        public void run() throws Exception {
            HttpLog.i("+++doFinally+++");
        }
    };

    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
            return upstream
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io())
                    .doOnSubscribe(logSubscribe)
                    .doFinally(logFinally)
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
private void doSomeWork() { getObservable().flatMap(new Function<String, ObservableSource<String>>() { @Override public ObservableSource<String> apply(@NonNull String s) throws Exception { return Observable.just("flatMap func-->" + s); } }).buffer(3, 1) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(getObserver()); // 3 means, it takes max of three from its start index and create list // 1 means, it jumps one step every time // so the it gives the following list // 1 - flatMap func-->one, flatMap func-->two, flatMap func-->three // 2 - flatMap func-->two, flatMap func-->three, flatMap func-->four // 3 - flatMap func-->three, flatMap func-->four, flatMap func-->five // 4 - flatMap func-->four, flatMap func-->five // 5 - flatMap func-->five }
/**
 * Map emitted items from the source observable into {@link Permission} objects for each
 * permission in parameters.
 * <p>
 * If one or several permissions have never been requested, invoke the related framework
 * method to ask the user if he allows the permissions.
 *
 * @param permissions the permissions to request
 * @return a transformer that delegates to {@code request(observable, permissions)}
 */
@SuppressWarnings("WeakerAccess")
public ObservableTransformer<Object, Permission> ensureEach(final String... permissions) {
    final ObservableTransformer<Object, Permission> transformer =
            new ObservableTransformer<Object, Permission>() {
                @Override
                public ObservableSource<Permission> apply(Observable<Object> trigger) {
                    return request(trigger, permissions);
                }
            };
    return transformer;
}
/**
 * Wraps {@code source} so that a progress dialog is shown for as long as it is subscribed.
 *
 * <p>The dialog is created by {@code makeDialog} and dismissed when the stream ends.
 * Cancelling the dialog (only when {@code builder.cancelable} is set) or dismissing it
 * completes the stream. The subscription to {@code source} is now attached to the emitter,
 * so completing or disposing the wrapper also disposes {@code source} instead of leaving it
 * running in the background.
 *
 * @param source the work to run behind the dialog
 * @param <T>    item type
 * @return the wrapped observable
 */
private <T> Observable<T> forObservable(Observable<T> source) {
    return Observable.using(this::makeDialog,
            new Function<ProgressDialog, ObservableSource<? extends T>>() {
                @Override
                public ObservableSource<? extends T> apply(@NonNull ProgressDialog dialog) throws Exception {
                    return Observable.create(emitter -> {
                        if (builder.cancelable) {
                            dialog.setOnCancelListener(dialogInterface -> emitter.onComplete());
                        }
                        dialog.setOnDismissListener(dialogInterface -> emitter.onComplete());
                        // Tie the inner subscription to the emitter's lifetime.
                        emitter.setDisposable(
                                source.subscribe(emitter::onNext, emitter::onError, emitter::onComplete));
                    });
                }
            }, Dialog::dismiss);
}
private void doSomeWork() { getObservable() .flatMap(new Function<String, ObservableSource<String>>() { @Override public ObservableSource<String> apply(@NonNull String s) throws Exception { return Observable.just(s).delay(500, TimeUnit.MILLISECONDS); } }) .timestamp() // Run on a background thread .subscribeOn(Schedulers.io()) // Be notified on the main thread .observeOn(AndroidSchedulers.mainThread()) .subscribe(getObserver()); }
/**
 * Returns an observable that reads {@code brand} only when it is subscribed to.
 *
 * @return a deferred observable emitting the value of {@code brand} at subscription time
 */
public Observable<String> brandDeferObservable() {
    final Callable<ObservableSource<? extends String>> currentBrand =
            new Callable<ObservableSource<? extends String>>() {
                @Override
                public ObservableSource<? extends String> call() throws Exception {
                    return Observable.just(brand);
                }
            };
    return Observable.defer(currentBrand);
}
/** * take Operator Example */ public void take(View view) { getUserListObservable() .flatMap(new Function<List<User>, ObservableSource<User>>() { // flatMap - to return users one by one @Override public ObservableSource<User> apply(List<User> usersList) throws Exception { return Observable.fromIterable(usersList); // returning user one by one from usersList. } }) .take(4) // it will only emit first 4 users out of all .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<User>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(User user) { // // only four user comes here one by one Log.d(TAG, "user : " + user.toString()); } @Override public void onError(Throwable e) { Utils.logError(TAG, e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
/**
 * Transformer that subscribes on the io scheduler and observes on the Android main thread.
 *
 * @param <T> item type
 * @return the scheduling transformer
 */
public <T> ObservableTransformer<T, T> applyObservableAsync() {
    final ObservableTransformer<T, T> ioToMain = new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            final Observable<T> onIo = upstream.subscribeOn(Schedulers.io());
            return onIo.observeOn(AndroidSchedulers.mainThread());
        }
    };
    return ioToMain;
}
/**
 * Transformer that subscribes on the computation scheduler and observes on the Android
 * main thread.
 *
 * @param <T> item type
 * @return the scheduling transformer
 */
public <T> ObservableTransformer<T, T> applyObservableCompute() {
    final ObservableTransformer<T, T> computeToMain = new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            final Observable<T> onComputation = upstream.subscribeOn(Schedulers.computation());
            return onComputation.observeOn(AndroidSchedulers.mainThread());
        }
    };
    return computeToMain;
}
/**
 * Transformer that only moves observation to the Android main thread.
 *
 * @param <T> item type
 * @return the scheduling transformer
 */
public <T> ObservableTransformer<T, T> applyObservableMainThread() {
    final ObservableTransformer<T, T> toMain = new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            return upstream.observeOn(AndroidSchedulers.mainThread());
        }
    };
    return toMain;
}
static Observable<String> sampleObservable() { return Observable.defer(new Callable<ObservableSource<? extends String>>() { @Override public ObservableSource<? extends String> call() throws Exception { // Do some long running operation SystemClock.sleep(5000); return Observable.just("one", "two", "three", "four", "five"); } }); }
/************************************ * take operator start ************************************/ public void take(View view) { getUserListObservable() .flatMap(new Function<List<User>, ObservableSource<User>>() { // flatMap - to return users one by one @Override public ObservableSource<User> apply(List<User> usersList) throws Exception { return Observable.fromIterable(usersList); // returning user one by one from usersList. } }) .take(4) // it will only emit first 4 users out of all .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<User>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(User user) { // // only four user comes here one by one Log.d(TAG, "user id : " + user.id); Log.d(TAG, "user firstname : " + user.firstname); Log.d(TAG, "user lastname : " + user.lastname); Log.d(TAG, "isFollowing : " + user.isFollowing); } @Override public void onError(Throwable e) { Utils.logError(TAG, e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }