Java 类io.reactivex.ObservableSource 实例源码

项目:RxBeacon    文件:RxBeacon.java   
/**
 * Streams ranging callbacks for the region returned by {@code getRegion()}.
 *
 * <p>After {@code startup()} emits, a {@link RangeNotifier} is registered on the beacon manager
 * and ranging is started; each {@code didRangeBeaconsInRegion} callback is forwarded downstream
 * as an {@link RxBeaconRange}. The inner stream never signals completion by itself.
 *
 * <p>When the subscriber disposes, the notifier is removed again so the beacon manager no longer
 * holds on to the emitter, and ranging is stopped once no range notifier is left.
 *
 * @return an observable of ranging results
 */
public Observable<RxBeaconRange> beaconsInRegion() {
    return startup()
            .flatMap(new Function<Boolean, ObservableSource<RxBeaconRange>>() {
                @Override
                public ObservableSource<RxBeaconRange> apply(@NonNull Boolean aBoolean) throws Exception {
                    return Observable.create(new ObservableOnSubscribe<RxBeaconRange>() {
                        @Override
                        public void subscribe(@NonNull final ObservableEmitter<RxBeaconRange> objectObservableEmitter) throws Exception {
                            // NOTE(review): assumes getRegion() returns a Region (it is passed to
                            // startRangingBeaconsInRegion in the original code) - confirm.
                            final Region rangedRegion = getRegion();
                            final RangeNotifier notifier = new RangeNotifier() {
                                @Override
                                public void didRangeBeaconsInRegion(Collection<Beacon> collection, Region region) {
                                    objectObservableEmitter.onNext(new RxBeaconRange(collection, region));
                                }
                            };

                            // Register the cleanup before starting, so a dispose that races with
                            // the start-up below still unregisters the notifier.
                            objectObservableEmitter.setCancellable(new io.reactivex.functions.Cancellable() {
                                @Override
                                public void cancel() throws Exception {
                                    beaconManager.removeRangeNotifier(notifier);
                                    // Other subscribers may still be ranging through the same
                                    // manager; only stop when this was the last notifier.
                                    if (beaconManager.getRangingNotifiers().isEmpty()) {
                                        beaconManager.stopRangingBeaconsInRegion(rangedRegion);
                                    }
                                }
                            });

                            beaconManager.addRangeNotifier(notifier);
                            beaconManager.startRangingBeaconsInRegion(rangedRegion);
                        }
                    });
                }
            });
}
项目:Bing    文件:ErrorTransformer.java   
/**
 * Unwraps {@link BaseHttpResult} envelopes and funnels every failure through
 * {@code ExceptionEngine}.
 *
 * <p>A result whose status equals {@code ErrorType.SUCCESS} is replaced by its data payload; a
 * {@code null} result or any other status raises a {@link ServerException}. Every error that
 * reaches the end of the chain is printed and re-emitted as whatever
 * {@code ExceptionEngine.handleException} returns for it.
 *
 * @param upstream the raw HTTP result stream
 * @return a stream of unwrapped payloads
 */
@Override
public ObservableSource<T> apply(@NonNull Observable<BaseHttpResult<T>> upstream) {
    final Function<BaseHttpResult<T>, T> unwrapPayload = new Function<BaseHttpResult<T>, T>() {
        @Override
        public T apply(@NonNull BaseHttpResult<T> result) throws Exception {
            if (result == null) {
                throw new ServerException(ErrorType.EMPTY_BEAN, "解析对象为空");
            }
            LogUtils.e(TAG, result.toString());
            if (result.getStatus() == ErrorType.SUCCESS) {
                return result.getData();
            }
            throw new ServerException(result.getStatus(), result.getMessage());
        }
    };

    final Function<Throwable, ObservableSource<? extends T>> translateError =
            new Function<Throwable, ObservableSource<? extends T>>() {
                @Override
                public ObservableSource<? extends T> apply(@NonNull Throwable cause) throws Exception {
                    // ExceptionEngine is the component that converts the raw throwable.
                    cause.printStackTrace();
                    return Observable.error(ExceptionEngine.handleException(cause));
                }
            };

    return upstream
            .map(unwrapPayload)
            .onErrorResumeNext(translateError);
}
项目:GitHub    文件:AppDataManager.java   
/**
 * Seeds the option table from the bundled JSON asset when {@code mDbHelper} reports it empty.
 *
 * @return the result of {@code saveOptionList} when seeding ran, otherwise a single
 *         {@code false}
 */
@Override
public Observable<Boolean> seedDatabaseOptions() {
    final Gson gson = new GsonBuilder()
            .excludeFieldsWithoutExposeAnnotation()
            .create();

    final Function<Boolean, ObservableSource<? extends Boolean>> seedWhenEmpty =
            new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty) throws Exception {
                    // Nothing to do when options are already stored.
                    if (!isEmpty) {
                        return Observable.just(false);
                    }
                    Type optionListType = new TypeToken<List<Option>>() { }.getType();
                    List<Option> seedOptions = gson.fromJson(
                            CommonUtils.loadJSONFromAsset(mContext, AppConstants.SEED_DATABASE_OPTIONS),
                            optionListType);
                    return saveOptionList(seedOptions);
                }
            };

    return mDbHelper.isOptionEmpty().concatMap(seedWhenEmpty);
}
项目:GitHub    文件:DownloadHelper.java   
/**
 * Determines the download type for a file that already exists.
 *
 * <p>The pipeline reads the stored last-modified value for {@code url} from the record table,
 * passes it to {@code checkFile}, and then emits
 * {@code recordTable.generateFileExistsType(url)} for every item {@code checkFile} produces.
 *
 * @param url file url
 * @return an observable of the resulting download type
 */
private Observable<DownloadType> existsType(final String url) {
    final Function<Integer, String> readLastModify = new Function<Integer, String>() {
        @Override
        public String apply(Integer ignored) throws Exception {
            return recordTable.readLastModify(url);
        }
    };

    final Function<String, ObservableSource<Object>> verifyFile =
            new Function<String, ObservableSource<Object>>() {
                @Override
                public ObservableSource<Object> apply(String lastModify) throws Exception {
                    return checkFile(url, lastModify);
                }
            };

    final Function<Object, ObservableSource<DownloadType>> toExistsType =
            new Function<Object, ObservableSource<DownloadType>>() {
                @Override
                public ObservableSource<DownloadType> apply(Object ignored) throws Exception {
                    return Observable.just(recordTable.generateFileExistsType(url));
                }
            };

    // The constant 1 only serves as a trigger so the record lookup runs on subscription.
    return Observable.just(1)
            .map(readLastModify)
            .flatMap(verifyFile)
            .flatMap(toExistsType);
}
项目:GitHub    文件:DownloadHelper.java   
/**
 * Checks a url through {@code downloadApi.check}.
 *
 * <p>A successful response is handed to {@code saveFileInfo}; an unsuccessful one falls back to
 * {@code checkUrlByGet}. The whole chain is composed with the retry transformer built from
 * {@code REQUEST_RETRY_HINT} and {@code maxRetryCount}.
 *
 * @param url url
 * @return the source produced by whichever branch handled the response
 */
private ObservableSource<Object> checkUrl(final String url) {
    final Function<Response<Void>, ObservableSource<Object>> handleResponse =
            new Function<Response<Void>, ObservableSource<Object>>() {
                @Override
                public ObservableSource<Object> apply(@NonNull Response<Void> response)
                        throws Exception {
                    if (response.isSuccessful()) {
                        return saveFileInfo(url, response);
                    }
                    return checkUrlByGet(url);
                }
            };

    return downloadApi.check(url)
            .flatMap(handleResponse)
            .compose(retry(REQUEST_RETRY_HINT, maxRetryCount));
}
项目:Espresso    文件:PackagesRemoteDataSource.java   
/**
 * Update and save the packages' status by accessing the Internet.
 *
 * <p>All stored packages are first copied out of Realm as unmanaged objects; the Realm instance
 * is closed immediately afterwards so it is not leaked. Each copied package is then refreshed
 * through {@code refreshPackage} and the results are collected into a single list.
 *
 * @return The observable packages whose status are the latest.
 */
@Override
public Observable<List<Package>> refreshPackages() {
    // A dedicated Realm instance is opened for this call and released in the finally block;
    // the copies returned by copyFromRealm stay usable after the instance is closed.
    final List<Package> storedPackages;
    Realm realm = Realm.getInstance(new RealmConfiguration.Builder()
            .deleteRealmIfMigrationNeeded()
            .name(DATABASE_NAME)
            .build());
    try {
        storedPackages = realm.copyFromRealm(realm.where(Package.class).findAll());
    } finally {
        realm.close();
    }

    return Observable.fromIterable(storedPackages)
            .subscribeOn(Schedulers.io())
            .flatMap(new Function<Package, ObservableSource<Package>>() {
                @Override
                public ObservableSource<Package> apply(Package aPackage) throws Exception {
                    // A nested request.
                    return refreshPackage(aPackage.getNumber());
                }
            })
            .toList()
            .toObservable();
}
项目:Espresso    文件:PackagesRepository.java   
/**
 * Refreshes a single package through the remote data source and mirrors the result into the
 * in-memory cache.
 *
 * <p>If {@code cachedPackages} holds an entry for the refreshed package's number, that entry
 * receives the fresh data and is flagged readable; otherwise the cache is left alone.
 *
 * @param packageId The primary key(The package number).
 *                  See more @{@link Package#number}.
 * @return The observable package.
 */
@Override
public Observable<Package> refreshPackage(@NonNull final String packageId) {
    final Consumer<Package> syncCachedCopy = new Consumer<Package>() {
        @Override
        public void accept(Package fresh) throws Exception {
            Package cached = cachedPackages.get(fresh.getNumber());
            if (cached == null) {
                return;
            }
            cached.setData(fresh.getData());
            cached.setReadable(true);
        }
    };

    final Function<Package, ObservableSource<Package>> updateCache =
            new Function<Package, ObservableSource<Package>>() {
                @Override
                public ObservableSource<Package> apply(Package refreshed) throws Exception {
                    return Observable.just(refreshed).doOnNext(syncCachedCopy);
                }
            };

    return packagesRemoteDataSource
            .refreshPackage(packageId)
            .flatMap(updateCache);
}
项目:GmArchMvvm    文件:RetryWithDelay.java   
/**
 * Turns each upstream error into either a delayed retry signal or a terminal error.
 *
 * <p>{@code retryCount} is incremented for every error. While it does not exceed
 * {@code maxRetries}, a timer of {@code retryDelaySecond} seconds is returned; afterwards the
 * original throwable is passed along.
 *
 * @param throwableObservable the stream of errors raised by the source
 * @return the notification stream that drives the retry
 */
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
    final Function<Throwable, ObservableSource<?>> retryOrFail =
            new Function<Throwable, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(@NonNull Throwable error) throws Exception {
                    if (++retryCount > maxRetries) {
                        // Max retries hit. Just pass the error along.
                        return Observable.error(error);
                    }
                    // When this Observable calls onNext, the original Observable will be
                    // retried (i.e. re-subscribed).
                    Log.d(TAG, "Observable get error, it will try after " + retryDelaySecond
                            + " second, retry count " + retryCount);
                    return Observable.timer(retryDelaySecond, TimeUnit.SECONDS);
                }
            };

    return throwableObservable.flatMap(retryOrFail);
}
项目:Rxjava2.0Demo    文件:SplashActivity.java   
/**
 * Waits for the first one-second interval tick, then opens {@link MainActivity} and finishes
 * the splash screen. The subscription is stored in {@code dLists}.
 */
private void start() {
    Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
            .take(1)
            .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Long>>() {
                @Override
                public ObservableSource<? extends Long> apply(Throwable throwable) throws Exception {
                    // The resume function must not return null: RxJava 2 would replace the
                    // original error with a NullPointerException, and this subscriber has no
                    // error handler. Emit a single fallback tick instead so the splash screen
                    // still moves on to the main screen.
                    return Observable.just(0L);
                }
            })
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.e(MainActivity.TAG, "accept: " + aLong);
                    startActivity(new Intent(SplashActivity.this, MainActivity.class));
                    finish();
                }
            });
    dLists.add(disposable);

}
项目:EditPhoto    文件:ContrastView.java   
/**
 * Creates the subject and wires it to the view's color filter.
 *
 * <p>Values pushed into {@code subject} are debounced (with a zero-millisecond window),
 * de-duplicated against the previous value, converted through {@code postContrast}, and the
 * resulting filter is applied on the Android main thread.
 */
private void initView() {
    subject = PublishSubject.create();

    final Function<Float, ObservableSource<ColorMatrixColorFilter>> toColorFilter =
            new Function<Float, ObservableSource<ColorMatrixColorFilter>>() {
                @Override
                public ObservableSource<ColorMatrixColorFilter> apply(Float contrast) throws Exception {
                    return postContrast(contrast);
                }
            };

    final Consumer<ColorMatrixColorFilter> applyFilter = new Consumer<ColorMatrixColorFilter>() {
        @Override
        public void accept(ColorMatrixColorFilter filter) throws Exception {
            setColorFilter(filter);
        }
    };

    subject.debounce(0, TimeUnit.MILLISECONDS)
            .distinctUntilChanged()
            // switchMap: a newer value replaces the conversion still in progress.
            .switchMap(toColorFilter)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(applyFilter);
}
项目:sqlbrite-sqlcipher    文件:BriteContentResolverTest.java   
/**
 * Builds the {@link BriteContentResolver} under test on top of the mock content resolver.
 *
 * <p>Log messages are appended to {@code logs}, and every query stream is cut off as soon as
 * {@code killSwitch} emits.
 */
@Override protected void setUp() throws Exception {
  super.setUp();
  contentResolver = getMockContentResolver();

  ObservableTransformer<Query, Query> stopOnKillSwitch =
      new ObservableTransformer<Query, Query>() {
        @Override public ObservableSource<Query> apply(Observable<Query> queries) {
          return queries.takeUntil(killSwitch);
        }
      };
  SqlBrite.Logger recordingLogger = new SqlBrite.Logger() {
    @Override public void log(String line) {
      logs.add(line);
    }
  };
  db = new BriteContentResolver(contentResolver, recordingLogger, scheduler, stopOnKillSwitch);

  getProvider().init(getContext().getContentResolver());
}
项目:wayf-cloud    文件:IdentityProviderUsageFacadeImpl.java   
/**
 * Folds a stream of DeviceAccess into a map from identity provider id to the latest created
 * date seen for that provider.
 *
 * @param deviceAccessObservable the device accesses to inspect
 * @return a source emitting the collected map
 */
private ObservableSource<HashMap<Long, Date>> getLatestActiveDateForIdpId(Observable<DeviceAccess> deviceAccessObservable) {
    return deviceAccessObservable
            .collect(
                    () -> new HashMap<Long, Date>(),
                    (latestByIdp, access) -> {
                        Long idpId = access.getIdentityProvider().getId();
                        Date accessDate = access.getCreatedDate();
                        Date knownLatest = latestByIdp.get(idpId);

                        // The stored date survives only when it is strictly later than the
                        // date of the access being folded in.
                        boolean keepKnown = knownLatest != null && knownLatest.after(accessDate);
                        latestByIdp.put(idpId, keepKnown ? knownLatest : accessDate);
                    })
            .toObservable();
}
项目:Android-Code-Demos    文件:OperatorTest.java   
/**
 * Emits {@code i}, debounces it by 1000 ms, maps it to its string form and prints it.
 *
 * @param i the value to push through the pipeline
 */
private static void debounceTest(int i) {
    final Function<Integer, ObservableSource<String>> toText =
            new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer value) throws Exception {
                    return Observable.just(String.valueOf(value));
                }
            };

    final Consumer<String> printLine = new Consumer<String>() {
        @Override
        public void accept(String text) throws Exception {
            System.out.println(text);
        }
    };

    Observable.just(i)
            .debounce(1000, TimeUnit.MILLISECONDS)
            // switchMap: the most recent request's data wins.
            .switchMap(toText)
            .subscribe(printLine);
}
项目:BrotherWeather    文件:GankioAllPresenter.java   
/**
 * Loads one page of Gank.io data and shows it in the view.
 *
 * <p>Before display, every item is flagged with the result of
 * {@code dbRepository.queryBrowseHistory} for its id. Errors are forwarded to the base observer
 * and then reported through {@code view.displayError()}.
 *
 * @param type       category to load
 * @param count      page size
 * @param page       page index
 * @param isProgress whether to show the progress indicator before loading
 */
public void getGankioData(GankioType type, int count, int page, boolean isProgress) {
  if (isProgress) {
    view.showProgress("");
  }

  final Function<BaseResult<GankioData>, ObservableSource<BaseResult<GankioData>>> markBrowsed =
      new Function<BaseResult<GankioData>, ObservableSource<BaseResult<GankioData>>>() {
        @Override public ObservableSource<BaseResult<GankioData>> apply(
            @NonNull BaseResult<GankioData> result) throws Exception {
          for (GankioData item : result.getResults()) {
            item.setBrowseHistory(dbRepository.queryBrowseHistory(item.get_id()));
          }
          return Observable.just(result);
        }
      };

  wrap(gankioRepository.getAllGankioData(type, count, page))
      .flatMap(markBrowsed)
      .subscribe(new ViewObserver<GankioData>(view) {
        @Override protected void onSuccess(List<GankioData> items) {
          view.display(items);
        }

        @Override public void onError(@NonNull Throwable error) {
          super.onError(error);
          view.displayError();
        }
      });
}
项目:RxJava4AndroidDemos    文件:FlatMap.java   
/**
 * Simple flatMap demo: each of 1, 2, 3 is expanded into two strings (the value doubled and the
 * value tripled), and every callback of the subscription is logged.
 */
@Override
public void test0() {
    Log.i(TAG, "test0() FlatMap simple demo, integer 1,2,3 transform to string 2,3,4,6,6,9");

    final Function<Integer, ObservableSource<String>> doubleAndTriple =
            new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(@NonNull Integer value) throws Exception {
                    return Observable.just(value * 2 + "", value * 3 + "");
                }
            };
    final Consumer<String> logItem = new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "Consumer<String> accept() s: " + s);
        }
    };
    final Consumer<Throwable> logError = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
        }
    };
    final Action logComplete = new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "Action run() for onComplete()");
        }
    };

    Observable.just(1, 2, 3)
            .flatMap(doubleAndTriple)
            .subscribe(logItem, logError, logComplete);
}
项目:Quran    文件:TranslationManagerPresenter.java   
/**
 * Returns the cached translation list, if a usable cache exists.
 *
 * <p>The result is empty when a download is forced, when the cache is older than
 * {@code Constants.MIN_TRANSLATION_REFRESH_TIME}, when the cache file does not exist, or when
 * reading it fails (the failure is reported to Crashlytics). The cache file is always closed
 * after it has been read.
 *
 * @param forceDownload {@code true} to skip the cache entirely
 * @return an observable emitting the cached list, or nothing
 */
Observable<TranslationList> getCachedTranslationListObservable(final boolean forceDownload) {
  return Observable.defer(new Callable<ObservableSource<? extends TranslationList>>() {
    @Override
    public ObservableSource<TranslationList> call() throws Exception {
      boolean isCacheStale = System.currentTimeMillis() -
          quranSettings.getLastUpdatedTranslationDate() > Constants.MIN_TRANSLATION_REFRESH_TIME;
      if (forceDownload || isCacheStale) {
        return Observable.empty();
      }

      try {
        File cachedFile = getCachedFile();
        if (cachedFile.exists()) {
          Moshi moshi = new Moshi.Builder().build();
          JsonAdapter<TranslationList> jsonAdapter = moshi.adapter(TranslationList.class);
          // Close the source once parsing is done; otherwise the file handle leaks.
          okio.BufferedSource cachedSource = Okio.buffer(Okio.source(cachedFile));
          try {
            return Observable.just(jsonAdapter.fromJson(cachedSource));
          } finally {
            cachedSource.close();
          }
        }
      } catch (Exception e) {
        Crashlytics.logException(e);
      }
      return Observable.empty();
    }
  });
}
项目:Quran    文件:AudioManagerUtils.java   
/**
 * Resolves the download info of every qari in {@code qariItems}.
 *
 * <p>For each item the cached info is used when present. Otherwise a fresh
 * {@link QariDownloadInfo} is emitted if the qari's directory under {@code basePath} does not
 * exist, or the gapless / gapped lookup is run depending on {@code item.isGapless()}. Every
 * emitted info is stored in {@code sCache}. Work is subscribed on the io scheduler.
 *
 * @param basePath  directory containing the per-qari folders
 * @param qariItems the qaris to inspect
 * @return a single emitting the list of download infos
 */
@NonNull
public static Single<List<QariDownloadInfo>> shuyookhDownloadObservable(
    final String basePath, List<QariItem> qariItems) {
  final Function<QariItem, ObservableSource<QariDownloadInfo>> resolveInfo =
      new Function<QariItem, ObservableSource<QariDownloadInfo>>() {
        @Override
        public ObservableSource<QariDownloadInfo> apply(QariItem item) throws Exception {
          QariDownloadInfo cachedInfo = sCache.get(item);
          if (cachedInfo != null) {
            return Observable.just(cachedInfo);
          }

          File qariDir = new File(basePath, item.getPath());
          if (!qariDir.exists()) {
            return Observable.just(new QariDownloadInfo(item));
          }
          if (item.isGapless()) {
            return getGaplessSheikhObservable(qariDir, item).toObservable();
          }
          return getGappedSheikhObservable(qariDir, item).toObservable();
        }
      };

  return Observable.fromIterable(qariItems)
      .flatMap(resolveInfo)
      .doOnNext(info -> sCache.put(info.qariItem, info))
      .toList()
      .subscribeOn(Schedulers.io());
}
项目:rxjava2_retrofit2    文件:ResultTransformer.java   
/**
 * Creates the mapper that turns an {@link HttpResponseResult} into a stream of its payload.
 *
 * <p>A successful response emits {@code getResult()} and completes; any other response signals
 * an {@link HttpResponseException} built from the response's message and state.
 *
 * @param <T> payload type
 * @return the mapping function, intended for {@code flatMap}
 */
private static <T> Function<HttpResponseResult<T>, ObservableSource<T>> flatMap() {
    return new Function<HttpResponseResult<T>, ObservableSource<T>>() {
        @Override
        public ObservableSource<T> apply(@NonNull final HttpResponseResult<T> tHttpResponseResult) throws Exception {
            return new Observable<T>() {
                @Override
                protected void subscribeActual(Observer<? super T> observer) {
                    // The Observer protocol requires onSubscribe to be delivered before any
                    // onNext/onError/onComplete signal; there is nothing to cancel here, so an
                    // empty disposable is enough.
                    observer.onSubscribe(io.reactivex.disposables.Disposables.empty());
                    if (tHttpResponseResult.isSuccess()) {
                        observer.onNext(tHttpResponseResult.getResult());
                        observer.onComplete();
                    } else {
                        observer.onError(new HttpResponseException(tHttpResponseResult.getMsg(), tHttpResponseResult.getState()));
                    }
                }
            };
        }
    };
}
项目:Reactive-Android-Programming    文件:TimingObservableTransformer.java   
/**
 * Reports, for every upstream item, how many whole seconds have passed since the start
 * {@link Date} captured when this method ran, then forwards the item unchanged.
 *
 * @param upstream the stream being timed
 * @return the upstream items, with {@code timerAction} invoked for each one
 */
@Override
public ObservableSource<R> apply(Observable<R> upstream) {
    return Observable
            .combineLatest(Observable.just(new Date()), upstream, Pair::create)
            .doOnNext(startAndItem -> {
                long elapsedMillis = new Date().getTime() - startAndItem.first.getTime();
                long elapsedSeconds = elapsedMillis / 1000;
                timerAction.accept(elapsedSeconds);
            })
            .map(startAndItem -> startAndItem.second);
}
项目:JBase    文件:RxSchedulers.java   
/**
 * Builds a transformer that subscribes upstream on a new thread, optionally shows the loading
 * dialog on subscription, binds the stream to {@code lifecycle} and delivers results on the
 * Android main thread.
 *
 * @param <T> item type
 * @return the scheduling transformer
 */
public  <T> ObservableTransformer<T, T> io_main() {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> source) {
            final Consumer<Disposable> showLoadingIfEnabled = new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    // Only open the loading dialog when it has been enabled.
                    if (showLoading) {
                        LoadingDialog.show();
                    }
                }
            };

            return source
                    .subscribeOn(Schedulers.newThread())
                    .doOnSubscribe(showLoadingIfEnabled)
                    // Bind to the lifecycle so in-flight network requests do not leak memory.
                    .compose(lifecycle)
                    // Second subscribeOn targets the main thread - presumably so the
                    // doOnSubscribe callback above runs there; confirm before changing.
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
项目:android-mvp-architecture    文件:AppDataManager.java   
/**
 * Populates the option table from the JSON asset named by
 * {@code AppConstants.SEED_DATABASE_OPTIONS}, but only if the table is currently empty.
 *
 * @return what {@code saveOptionList} emits when data was seeded, or {@code false} when the
 *         table already had content
 */
@Override
public Observable<Boolean> seedDatabaseOptions() {
    final Gson gson = new GsonBuilder()
            .excludeFieldsWithoutExposeAnnotation()
            .create();

    final Function<Boolean, ObservableSource<? extends Boolean>> populateIfEmpty =
            new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty) throws Exception {
                    if (!isEmpty) {
                        return Observable.just(false);
                    }
                    // Anonymous TypeToken subclass captures the List<Option> type for Gson.
                    Type listOfOptions = new TypeToken<List<Option>>() { }.getType();
                    List<Option> parsed = gson.fromJson(
                            CommonUtils.loadJSONFromAsset(mContext, AppConstants.SEED_DATABASE_OPTIONS),
                            listOfOptions);
                    return saveOptionList(parsed);
                }
            };

    return mDbHelper.isOptionEmpty().concatMap(populateIfEmpty);
}
项目:MVPArmsTest1    文件:UserModel.java   
/**
 * Fetches one page of users through the cache service, which wraps the Retrofit call.
 *
 * @param lastIdQueried id used both as the request cursor and as the cache key
 * @param update        passed to {@link EvictDynamicKey}; {@code true} evicts the cached entry
 * @return the user list unwrapped from the cache {@link Reply}
 */
@Override
public Observable<List<User>> getUsers(int lastIdQueried, boolean update) {
    Observable<List<User>> remoteUsers = mRepositoryManager
            .obtainRetrofitService(UserService.class)
            .getUsers(lastIdQueried, USERS_PER_PAGE);

    final Function<Reply<List<User>>, ObservableSource<List<User>>> unwrapReply =
            new Function<Reply<List<User>>, ObservableSource<List<User>>>() {
                @Override
                public ObservableSource<List<User>> apply(@NonNull Reply<List<User>> reply) throws Exception {
                    return Observable.just(reply.getData());
                }
            };

    // RxCache is used here: a refresh bypasses the cache, loading more reads from it.
    return mRepositoryManager.obtainCacheService(CommonCache.class)
            .getUsers(remoteUsers, new DynamicKey(lastIdQueried), new EvictDynamicKey(update))
            .flatMap(unwrapReply);
}
项目:android-mvvm-architecture    文件:AppDataManager.java   
/**
 * Seeds the question table from the bundled JSON asset when {@code mDbHelper} reports it empty.
 *
 * @return the result of {@code saveQuestionList} when seeding ran, otherwise a single
 *         {@code false}
 */
@Override
public Observable<Boolean> seedDatabaseQuestions() {
    final Gson gson = new GsonBuilder()
            .excludeFieldsWithoutExposeAnnotation()
            .create();

    final Function<Boolean, ObservableSource<? extends Boolean>> seedWhenEmpty =
            new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty) throws Exception {
                    if (!isEmpty) {
                        return Observable.just(false);
                    }
                    // Describes List<Question> so Gson knows the element type.
                    Type questionListType =
                            $Gson$Types.newParameterizedTypeWithOwner(null, List.class, Question.class);
                    List<Question> seedQuestions = gson.fromJson(
                            CommonUtils.loadJSONFromAsset(mContext, AppConstants.SEED_DATABASE_QUESTIONS),
                            questionListType);
                    return saveQuestionList(seedQuestions);
                }
            };

    return mDbHelper.isQuestionEmpty().concatMap(seedWhenEmpty);
}
项目:Aurora    文件:UserModel.java   
/**
 * Loads a page of users, routing the Retrofit request through the RxCache-backed service.
 *
 * @param lastIdQueried request cursor; also used to build the {@link DynamicKey}
 * @param update        forwarded to {@link EvictDynamicKey}
 * @return the users contained in the cache reply
 */
@Override
public Observable<List<User>> getUsers(int lastIdQueried, boolean update) {
    final Function<Reply<List<User>>, ObservableSource<List<User>>> extractData =
            new Function<Reply<List<User>>, ObservableSource<List<User>>>() {
                @Override
                public ObservableSource<List<User>> apply(@NonNull Reply<List<User>> cachedReply) throws Exception {
                    return Observable.just(cachedReply.getData());
                }
            };

    Observable<List<User>> networkCall = mRepositoryManager
            .obtainRetrofitService(UserService.class)
            .getUsers(lastIdQueried, USERS_PER_PAGE);

    // Cached via RxCache: refreshing skips the cache, loading more reads it.
    return mRepositoryManager
            .obtainCacheService(CommonCache.class)
            .getUsers(networkCall, new DynamicKey(lastIdQueried), new EvictDynamicKey(update))
            .flatMap(extractData);
}
项目:RxPay    文件:RxPayUtils.java   
/**
 * Builds a transformer that lets successful {@link PayResult}s through and fails the stream
 * with a {@link PayFailedException} (carrying the result's error info) for any other result.
 *
 * @return the validating transformer
 */
public static ObservableTransformer<PayResult, PayResult> checkAliPayResult() {
    return new ObservableTransformer<PayResult, PayResult>() {
        @Override
        public ObservableSource<PayResult> apply(Observable<PayResult> payResults) {
            Function<PayResult, PayResult> requireSuccess = new Function<PayResult, PayResult>() {
                @Override
                public PayResult apply(PayResult result) throws Exception {
                    if (result.isSucceed()) {
                        return result;
                    }
                    throw new PayFailedException(result.getErrInfo());
                }
            };
            return payResults.map(requireSuccess);
        }
    };
}
项目:android-mvp-interactor-architecture    文件:SplashInteractor.java   
/**
 * Seeds the option repository from the bundled JSON asset when it reports itself empty.
 *
 * @return the result of {@code mOptionRepository.saveOptionList} when seeding ran, otherwise a
 *         single {@code false}
 */
@Override
public Observable<Boolean> seedDatabaseOptions() {
    final Gson gson = new GsonBuilder()
            .excludeFieldsWithoutExposeAnnotation()
            .create();

    final Function<Boolean, ObservableSource<? extends Boolean>> seedWhenEmpty =
            new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty) throws Exception {
                    if (!isEmpty) {
                        return Observable.just(false);
                    }
                    Type optionListType = new TypeToken<List<Option>>() { }.getType();
                    List<Option> seedOptions = gson.fromJson(
                            FileUtils.loadJSONFromAsset(mContext, AppConstants.SEED_DATABASE_OPTIONS),
                            optionListType);
                    return mOptionRepository.saveOptionList(seedOptions);
                }
            };

    return mOptionRepository.isOptionEmpty().concatMap(seedWhenEmpty);
}
项目:RxEasyHttp    文件:RxUtil.java   
/**
 * Builds a transformer that subscribes and unsubscribes on the io scheduler, logs subscription
 * and termination through {@code HttpLog}, and observes on the Android main thread.
 *
 * @param <T> item type
 * @return the scheduling transformer
 */
public static <T> ObservableTransformer<T, T> io_main() {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(@NonNull Observable<T> source) {
            final Consumer<Disposable> logSubscribe = new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable d) throws Exception {
                    HttpLog.i("+++doOnSubscribe+++" + d.isDisposed());
                }
            };
            final Action logFinally = new Action() {
                @Override
                public void run() throws Exception {
                    HttpLog.i("+++doFinally+++");
                }
            };

            return source
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io())
                    .doOnSubscribe(logSubscribe)
                    .doFinally(logFinally)
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
项目:RxJava2-Android-Sample    文件:FlatMapExampleActivity.java   
/**
 * Prefixes every source string via flatMap, then groups the results with {@code buffer(3, 1)}
 * and delivers the buffers to the observer on the main thread.
 */
private void doSomeWork() {
    final Function<String, ObservableSource<String>> addPrefix =
            new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(@NonNull String item) throws Exception {
                    return Observable.just("flatMap func-->" + item);
                }
            };

    // buffer(3, 1):
    //   3 - each list holds at most three items counted from its start index
    //   1 - the start index advances by one item for every new list
    // which yields the following lists:
    //   1 - flatMap func-->one, flatMap func-->two, flatMap func-->three
    //   2 - flatMap func-->two, flatMap func-->three, flatMap func-->four
    //   3 - flatMap func-->three, flatMap func-->four, flatMap func-->five
    //   4 - flatMap func-->four, flatMap func-->five
    //   5 - flatMap func-->five
    getObservable()
            .flatMap(addPrefix)
            .buffer(3, 1)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(getObserver());
}
项目:android-mvvm-architecture    文件:AppDataManager.java   
/**
 * Loads the default options from the JSON asset into the database if no option is stored yet.
 *
 * @return the observable returned by {@code saveOptionList} when the table was empty, or an
 *         observable of {@code false} when it was not
 */
@Override
public Observable<Boolean> seedDatabaseOptions() {
    final Gson gson = new GsonBuilder()
            .excludeFieldsWithoutExposeAnnotation()
            .create();

    final Function<Boolean, ObservableSource<? extends Boolean>> loadDefaultsIfEmpty =
            new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty) throws Exception {
                    // Early exit: existing options are never overwritten.
                    if (!isEmpty) {
                        return Observable.just(false);
                    }
                    Type optionsType = new TypeToken<List<Option>>() { }.getType();
                    List<Option> defaults = gson.fromJson(
                            CommonUtils.loadJSONFromAsset(mContext, AppConstants.SEED_DATABASE_OPTIONS),
                            optionsType);
                    return saveOptionList(defaults);
                }
            };

    return mDbHelper.isOptionEmpty().concatMap(loadDefaultsIfEmpty);
}
项目:Reactive-Android-Programming    文件:TimingObservableTransformer.java   
/**
 * Pairs every upstream item with a {@link Date} created when this method is invoked, passes the
 * elapsed whole seconds to {@code timerAction} for each item, and re-emits the bare item.
 *
 * @param upstream the stream being timed
 * @return the upstream items after the timing side effect
 */
@Override
public ObservableSource<R> apply(Observable<R> upstream) {
    return Observable
            .combineLatest(Observable.just(new Date()), upstream, Pair::create)
            .doOnNext(timed -> {
                Date now = new Date();
                long millisSinceStart = now.getTime() - timed.first.getTime();
                // Integer division: fractions of a second are dropped.
                timerAction.accept(millisSinceStart / 1000);
            })
            .map(timed -> timed.second);
}
项目:KTools    文件:RxPermissions.java   
/**
 * Returns a transformer that maps items emitted by the source observable to
 * {@link Permission} objects, one for each of the given permissions.
 * <p>
 * The work is delegated to {@code request(observable, permissions)}. As documented for this
 * method, permissions that have never been requested cause the related framework method to be
 * invoked so the user can allow them.
 *
 * @param permissions the permissions to ensure
 * @return the transformer performing the request
 */
@SuppressWarnings("WeakerAccess")
public ObservableTransformer<Object, Permission> ensureEach(final String... permissions) {
    final ObservableTransformer<Object, Permission> perPermission =
            new ObservableTransformer<Object, Permission>() {
                @Override
                public ObservableSource<Permission> apply(Observable<Object> trigger) {
                    return request(trigger, permissions);
                }
            };
    return perPermission;
}
项目:RxProgress    文件:RxProgress.java   
/**
 * Wraps {@code source} so that a progress dialog, created by {@code makeDialog}, exists for the
 * lifetime of the subscription and is dismissed when the stream terminates or is disposed.
 *
 * <p>Dismissing the dialog (or cancelling it, when the builder allows cancellation) completes
 * the returned stream. The subscription to {@code source} is attached to the emitter, so it is
 * disposed as soon as the returned stream completes, errors or is disposed, instead of running
 * on unobserved.
 *
 * @param source the stream to decorate
 * @param <T>    item type
 * @return the decorated stream
 */
private <T> Observable<T> forObservable(Observable<T> source) {
    return Observable.using(this::makeDialog,
            new Function<ProgressDialog, ObservableSource<? extends T>>() {
                @Override
                public ObservableSource<? extends T> apply(@NonNull ProgressDialog dialog) throws Exception {
                    return Observable.create(emitter -> {
                        if (builder.cancelable) {
                            dialog.setOnCancelListener(dialogInterface -> emitter.onComplete());
                        }
                        dialog.setOnDismissListener(dialogInterface -> emitter.onComplete());
                        // Tie the upstream subscription to the emitter so that dispose and
                        // dialog cancel/dismiss also stop the upstream work.
                        emitter.setDisposable(
                                source.subscribe(emitter::onNext, emitter::onError, emitter::onComplete));
                    });
                }
            }, Dialog::dismiss);
}
项目:RxJava2-Android-Sample    文件:TimestampExampleActivity.java   
/**
 * Delays every source string by 500 ms, wraps each one with a timestamp, and hands the result
 * to the observer on the main thread.
 */
private void doSomeWork() {
    final Function<String, ObservableSource<String>> delayEach =
            new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(@NonNull String item) throws Exception {
                    return Observable.just(item).delay(500, TimeUnit.MILLISECONDS);
                }
            };

    getObservable()
            .flatMap(delayEach)
            .timestamp()
            // Subscribe on a background (io) thread ...
            .subscribeOn(Schedulers.io())
            // ... and deliver notifications on the main thread.
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(getObserver());
}
项目:GitHub    文件:Car.java   
/**
 * Returns an observable of {@code brand} built with {@code defer}: the field is read inside the
 * deferred callable rather than when this method is called.
 *
 * @return a deferred observable emitting the brand
 */
public Observable<String> brandDeferObservable() {
    final Callable<ObservableSource<? extends String>> brandSupplier =
            new Callable<ObservableSource<? extends String>>() {
                @Override
                public ObservableSource<? extends String> call() throws Exception {
                    return Observable.just(brand);
                }
            };
    return Observable.defer(brandSupplier);
}
项目:GitHub    文件:NetworkingActivity.java   
/**
 * take operator example: flattens the user list into individual users, keeps the first four
 * and logs each of them.
 *
 * @param view the view passed to this handler (not used by the method body)
 */

public void take(View view) {
    // flatMap - turns the emitted list into a stream that returns users one by one.
    final Function<List<User>, ObservableSource<User>> splitUsers =
            new Function<List<User>, ObservableSource<User>>() {
                @Override
                public ObservableSource<User> apply(List<User> users) throws Exception {
                    return Observable.fromIterable(users);
                }
            };

    final Observer<User> userLogger = new Observer<User>() {
        @Override
        public void onSubscribe(Disposable d) {
            // Nothing to set up.
        }

        @Override
        public void onNext(User user) {
            // Only four users arrive here, one at a time.
            Log.d(TAG, "user : " + user.toString());
        }

        @Override
        public void onError(Throwable e) {
            Utils.logError(TAG, e);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    getUserListObservable()
            .flatMap(splitUsers)
            .take(4) // emit only the first 4 users
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(userLogger);
}
项目:GitHub    文件:RxSchedulers.java   
/**
 * Creates a transformer that applies {@code subscribeOn(Schedulers.io())} followed by
 * {@code observeOn(AndroidSchedulers.mainThread())} to the observable it receives.
 *
 * @param <T> the element type, left unchanged by the transformer
 * @return the scheduler-applying transformer
 */
public <T> ObservableTransformer<T, T> applyObservableAsync() {
    final ObservableTransformer<T, T> ioToMain = new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            final Observable<T> subscribedOnIo = upstream.subscribeOn(Schedulers.io());
            return subscribedOnIo.observeOn(AndroidSchedulers.mainThread());
        }
    };
    return ioToMain;
}
项目:RxJava2-Android-Sample    文件:RxSchedulers.java   
/**
 * Creates a transformer that applies {@code subscribeOn(Schedulers.computation())} followed by
 * {@code observeOn(AndroidSchedulers.mainThread())} to the observable it receives.
 *
 * @param <T> the element type, left unchanged by the transformer
 * @return the scheduler-applying transformer
 */
public <T> ObservableTransformer<T, T> applyObservableCompute() {
    final ObservableTransformer<T, T> computeToMain = new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            final Observable<T> subscribedOnComputation =
                    upstream.subscribeOn(Schedulers.computation());
            return subscribedOnComputation.observeOn(AndroidSchedulers.mainThread());
        }
    };
    return computeToMain;
}
项目:GitHub    文件:RxSchedulers.java   
/**
 * Creates a transformer that applies {@code observeOn(AndroidSchedulers.mainThread())}
 * to the observable it receives; no {@code subscribeOn} is applied.
 *
 * @param <T> the element type, left unchanged by the transformer
 * @return the scheduler-applying transformer
 */
public <T> ObservableTransformer<T, T> applyObservableMainThread() {
    final ObservableTransformer<T, T> toMain = new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            final Observable<T> observedOnMain =
                    upstream.observeOn(AndroidSchedulers.mainThread());
            return observedOnMain;
        }
    };
    return toMain;
}
项目:GitHub    文件:MainActivity.java   
/**
 * Builds a deferred observable of five fixed words.
 *
 * <p>The callable passed to {@code Observable.defer} sleeps for 5000 ms (standing in for a
 * long-running operation) before producing the words.
 *
 * @return an observable created via {@code Observable.defer}
 */
static Observable<String> sampleObservable() {
    final Callable<ObservableSource<? extends String>> slowWords =
            new Callable<ObservableSource<? extends String>>() {
                @Override
                public ObservableSource<? extends String> call() throws Exception {
                    // Simulate a long running operation before emitting anything.
                    SystemClock.sleep(5000);
                    final Observable<String> words =
                            Observable.just("one", "two", "three", "four", "five");
                    return words;
                }
            };
    return Observable.defer(slowWords);
}
项目:GitHub    文件:Rx2OperatorExampleActivity.java   
/************************************
 * take operator start
 ************************************/

/**
 * Flattens the user list into individual users, keeps only the first four, and logs the
 * id, first name, last name and following flag of each.
 *
 * @param view the view passed by the caller; not used by this method
 */
public void take(View view) {
    // flatMap helper - turns the single list emission into one emission per user.
    final Function<List<User>, ObservableSource<User>> oneByOne =
            new Function<List<User>, ObservableSource<User>>() {
                @Override
                public ObservableSource<User> apply(List<User> users) throws Exception {
                    return Observable.fromIterable(users);
                }
            };

    // Logs the fields of every received user, the terminal event, and any error.
    final Observer<User> loggingObserver = new Observer<User>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // nothing to do on subscription
        }

        @Override
        public void onNext(User user) {
            // at most four users arrive here, one at a time
            Log.d(TAG, "user id : " + user.id);
            Log.d(TAG, "user firstname : " + user.firstname);
            Log.d(TAG, "user lastname : " + user.lastname);
            Log.d(TAG, "isFollowing : " + user.isFollowing);
        }

        @Override
        public void onError(Throwable error) {
            Utils.logError(TAG, error);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    getUserListObservable()
            .flatMap(oneByOne)
            .take(4) // only the first 4 users are passed on
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(loggingObserver);
}