Java 类io.reactivex.ObservableSource 实例源码
项目:RxBeacon
文件:RxBeacon.java
/**
 * Emits a {@link RxBeaconRange} for every ranging callback reported by the beacon manager.
 * <p>
 * Each value emitted by {@code startup()} is mapped to an observable that registers a
 * {@link RangeNotifier} on {@code beaconManager} and then starts ranging in
 * {@code getRegion()}.
 * <p>
 * NOTE(review): nothing in this method removes the notifier or stops ranging when the
 * subscriber disposes - confirm whether that is handled elsewhere.
 *
 * @return an observable of ranging results
 */
public Observable<RxBeaconRange> beaconsInRegion() {
    final ObservableOnSubscribe<RxBeaconRange> rangingSource =
            new ObservableOnSubscribe<RxBeaconRange>() {
                @Override
                public void subscribe(@NonNull final ObservableEmitter<RxBeaconRange> emitter) throws Exception {
                    // Forward every ranging callback downstream as one value object.
                    RangeNotifier forwarder = new RangeNotifier() {
                        @Override
                        public void didRangeBeaconsInRegion(Collection<Beacon> beacons, Region region) {
                            emitter.onNext(new RxBeaconRange(beacons, region));
                        }
                    };
                    beaconManager.addRangeNotifier(forwarder);
                    beaconManager.startRangingBeaconsInRegion(getRegion());
                }
            };

    return startup().flatMap(new Function<Boolean, ObservableSource<RxBeaconRange>>() {
        @Override
        public ObservableSource<RxBeaconRange> apply(@NonNull Boolean started) throws Exception {
            return Observable.create(rangingSource);
        }
    });
}
项目:Bing
文件:ErrorTransformer.java
/**
 * Unwraps {@link BaseHttpResult} envelopes and normalises errors.
 * <p>
 * A null envelope or a status other than {@code ErrorType.SUCCESS} is turned into a
 * {@link ServerException}; any error is then passed through
 * {@code ExceptionEngine.handleException} before being re-emitted.
 *
 * @param upstream the stream of raw HTTP result envelopes
 * @return a stream of the unwrapped payloads
 */
@Override
public ObservableSource<T> apply(@NonNull Observable<BaseHttpResult<T>> upstream) {
    Function<BaseHttpResult<T>, T> unwrap = new Function<BaseHttpResult<T>, T>() {
        @Override
        public T apply(@NonNull BaseHttpResult<T> result) throws Exception {
            if (result == null) {
                throw new ServerException(ErrorType.EMPTY_BEAN, "解析对象为空");
            }
            LogUtils.e(TAG, result.toString());
            if (result.getStatus() == ErrorType.SUCCESS) {
                return result.getData();
            }
            throw new ServerException(result.getStatus(), result.getMessage());
        }
    };

    Function<Throwable, ObservableSource<? extends T>> translateError =
            new Function<Throwable, ObservableSource<? extends T>>() {
                @Override
                public ObservableSource<? extends T> apply(@NonNull Throwable throwable) throws Exception {
                    // ExceptionEngine is the component that converts the raw throwable.
                    throwable.printStackTrace();
                    return Observable.error(ExceptionEngine.handleException(throwable));
                }
            };

    return upstream.map(unwrap).onErrorResumeNext(translateError);
}
项目:GitHub
文件:AppDataManager.java
/**
 * Seeds the option table from the bundled JSON asset when the table is empty.
 *
 * @return the result of {@code saveOptionList} when seeding happened, otherwise an
 *         observable emitting {@code false}
 */
@Override
public Observable<Boolean> seedDatabaseOptions() {
    final Gson gson = new GsonBuilder()
            .excludeFieldsWithoutExposeAnnotation()
            .create();

    Function<Boolean, ObservableSource<? extends Boolean>> seedIfEmpty =
            new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty) throws Exception {
                    // Nothing to seed when options are already stored.
                    if (!isEmpty) {
                        return Observable.just(false);
                    }
                    Type optionListType = new TypeToken<List<Option>>() { }.getType();
                    List<Option> optionList = gson.fromJson(
                            CommonUtils.loadJSONFromAsset(mContext, AppConstants.SEED_DATABASE_OPTIONS),
                            optionListType);
                    return saveOptionList(optionList);
                }
            };

    return mDbHelper.isOptionEmpty().concatMap(seedIfEmpty);
}
项目:GitHub
文件:DownloadHelper.java
/**
 * Gets the download type for a file that already exists.
 * <p>
 * Reads the stored last-modified value for {@code url} from the record table, hands it to
 * {@code checkFile}, and for every item that check emits asks the record table for the
 * "file exists" download type.
 *
 * @param url file url
 * @return an observable emitting the download type
 */
private Observable<DownloadType> existsType(final String url) {
    Function<Integer, String> readLastModify = new Function<Integer, String>() {
        @Override
        public String apply(Integer ignored) throws Exception {
            return recordTable.readLastModify(url);
        }
    };
    Function<String, ObservableSource<Object>> verifyFile =
            new Function<String, ObservableSource<Object>>() {
                @Override
                public ObservableSource<Object> apply(String lastModify) throws Exception {
                    return checkFile(url, lastModify);
                }
            };
    Function<Object, ObservableSource<DownloadType>> toExistsType =
            new Function<Object, ObservableSource<DownloadType>>() {
                @Override
                public ObservableSource<DownloadType> apply(Object ignored) throws Exception {
                    return Observable.just(recordTable.generateFileExistsType(url));
                }
            };

    // The constant 1 is only a trigger; the real work happens in the mapped steps.
    return Observable.just(1)
            .map(readLastModify)
            .flatMap(verifyFile)
            .flatMap(toExistsType);
}
项目:GitHub
文件:DownloadHelper.java
/**
 * Checks the given url through {@code downloadApi.check}.
 * <p>
 * A successful response is passed to {@code saveFileInfo}; an unsuccessful one falls back to
 * {@code checkUrlByGet}. The whole pipeline is composed with the retry transformer.
 *
 * @param url url
 * @return the composed source (described as "empty" by the original documentation)
 */
private ObservableSource<Object> checkUrl(final String url) {
    Function<Response<Void>, ObservableSource<Object>> handleResponse =
            new Function<Response<Void>, ObservableSource<Object>>() {
                @Override
                public ObservableSource<Object> apply(@NonNull Response<Void> resp) throws Exception {
                    if (resp.isSuccessful()) {
                        return saveFileInfo(url, resp);
                    }
                    return checkUrlByGet(url);
                }
            };

    return downloadApi.check(url)
            .flatMap(handleResponse)
            .compose(retry(REQUEST_RETRY_HINT, maxRetryCount));
}
项目:Espresso
文件:PackagesRemoteDataSource.java
/**
 * Update and save the packages' status by accessing the Internet.
 * <p>
 * All stored packages are copied out of Realm as detached objects, the Realm instance is
 * closed again, and every package is then refreshed through {@code refreshPackage}.
 *
 * @return The observable packages whose status are the latest.
 */
@Override
public Observable<List<Package>> refreshPackages() {
    // A dedicated Realm instance is opened here instead of reusing a shared one.
    Realm realm = Realm.getInstance(new RealmConfiguration.Builder()
            .deleteRealmIfMigrationNeeded()
            .name(DATABASE_NAME)
            .build());

    final List<Package> storedPackages;
    try {
        // copyFromRealm detaches the objects, so they stay usable after close().
        storedPackages = realm.copyFromRealm(realm.where(Package.class).findAll());
    } finally {
        // Every Realm.getInstance() must be paired with close(); previously it leaked.
        realm.close();
    }

    return Observable.fromIterable(storedPackages)
            .subscribeOn(Schedulers.io())
            .flatMap(new Function<Package, ObservableSource<Package>>() {
                @Override
                public ObservableSource<Package> apply(Package aPackage) throws Exception {
                    // A nested request.
                    return refreshPackage(aPackage.getNumber());
                }
            })
            .toList()
            .toObservable();
}
项目:Espresso
文件:PackagesRepository.java
/**
 * Refresh one package.
 * <p>
 * Delegates to the remote data source and, for each refreshed package, copies its data into
 * the matching entry of {@code cachedPackages} (if one exists) and marks that entry readable.
 *
 * @param packageId The primary key (the package number).
 *                  See {@link Package#number}.
 * @return The observable package.
 */
@Override
public Observable<Package> refreshPackage(@NonNull final String packageId) {
    final Consumer<Package> syncCache = new Consumer<Package>() {
        @Override
        public void accept(Package refreshed) throws Exception {
            Package cached = cachedPackages.get(refreshed.getNumber());
            if (cached == null) {
                return;
            }
            cached.setData(refreshed.getData());
            cached.setReadable(true);
        }
    };

    return packagesRemoteDataSource
            .refreshPackage(packageId)
            .flatMap(new Function<Package, ObservableSource<Package>>() {
                @Override
                public ObservableSource<Package> apply(Package p) throws Exception {
                    return Observable.just(p).doOnNext(syncCache);
                }
            });
}
项目:GmArchMvvm
文件:RetryWithDelay.java
/**
 * Retry policy: re-subscribes after a fixed delay until {@code maxRetries} is exceeded.
 *
 * @param throwableObservable the stream of errors raised by the original observable
 * @return a stream whose emissions trigger a retry and whose error ends the retrying
 */
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
    Function<Throwable, ObservableSource<?>> decide = new Function<Throwable, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
            if (++retryCount > maxRetries) {
                // Max retries hit. Just pass the error along.
                return Observable.error(throwable);
            }
            // Emitting a value here makes the original Observable re-subscribe (i.e. retry).
            Log.d(TAG, "Observable get error, it will try after " + retryDelaySecond
                    + " second, retry count " + retryCount);
            return Observable.timer(retryDelaySecond, TimeUnit.SECONDS);
        }
    };
    return throwableObservable.flatMap(decide);
}
项目:Rxjava2.0Demo
文件:SplashActivity.java
/**
 * Waits one second, then opens {@link MainActivity} and finishes the splash screen.
 * <p>
 * The resulting {@link Disposable} is stored in {@code dLists}.
 */
private void start() {
    Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
            .take(1)
            .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Long>>() {
                @Override
                public ObservableSource<? extends Long> apply(Throwable throwable) throws Exception {
                    // Returning null here is illegal in RxJava 2: it is converted into a
                    // NullPointerException that reaches a subscriber without an error handler.
                    // Swallow the error by completing with an empty stream instead.
                    return Observable.<Long>empty();
                }
            })
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.e(MainActivity.TAG, "accept: " + aLong);
                    startActivity(new Intent(SplashActivity.this, MainActivity.class));
                    finish();
                }
            });
    dLists.add(disposable);
}
项目:EditPhoto
文件:ContrastView.java
/**
 * Creates the contrast subject and wires it to the view's colour filter.
 * <p>
 * Distinct consecutive contrast values are turned into a colour filter via
 * {@code postContrast} (only the latest request is kept by {@code switchMap}) and the
 * result is applied on the main thread.
 */
private void initView() {
    subject = PublishSubject.create();

    Function<Float, ObservableSource<ColorMatrixColorFilter>> toFilter =
            new Function<Float, ObservableSource<ColorMatrixColorFilter>>() {
                @Override
                public ObservableSource<ColorMatrixColorFilter> apply(Float value) throws Exception {
                    return postContrast(value);
                }
            };
    Consumer<ColorMatrixColorFilter> applyFilter = new Consumer<ColorMatrixColorFilter>() {
        @Override
        public void accept(ColorMatrixColorFilter colorMatrixColorFilter) throws Exception {
            setColorFilter(colorMatrixColorFilter);
        }
    };

    subject.debounce(0, TimeUnit.MILLISECONDS)
            .distinctUntilChanged()
            .switchMap(toFilter)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(applyFilter);
}
项目:sqlbrite-sqlcipher
文件:BriteContentResolverTest.java
/**
 * Builds the {@link BriteContentResolver} under test with a recording logger and a query
 * transformer that stops every query stream when {@code killSwitch} fires.
 */
@Override protected void setUp() throws Exception {
    super.setUp();
    contentResolver = getMockContentResolver();

    ObservableTransformer<Query, Query> stopOnKillSwitch =
            new ObservableTransformer<Query, Query>() {
                @Override public ObservableSource<Query> apply(Observable<Query> upstream) {
                    return upstream.takeUntil(killSwitch);
                }
            };
    // Collect every log line so tests can assert on it.
    SqlBrite.Logger recordingLogger = new SqlBrite.Logger() {
        @Override public void log(String message) {
            logs.add(message);
        }
    };

    db = new BriteContentResolver(contentResolver, recordingLogger, scheduler, stopOnKillSwitch);
    getProvider().init(getContext().getContentResolver());
}
项目:wayf-cloud
文件:IdentityProviderUsageFacadeImpl.java
/**
 * Given an Observable stream of DeviceAccess, collects the unique IdentityProvider ids in the
 * stream together with the latest created date seen for each of them.
 *
 * @param deviceAccessObservable the device accesses to scan
 * @return a source emitting one map from identity provider id to its latest created date
 */
private ObservableSource<HashMap<Long, Date>> getLatestActiveDateForIdpId(Observable<DeviceAccess> deviceAccessObservable) {
    return deviceAccessObservable.collect(
            () -> new HashMap<Long, Date>(),
            (latestByIdp, access) -> {
                Long idpId = access.getIdentityProvider().getId();
                Date seenAt = access.getCreatedDate();
                Date known = latestByIdp.get(idpId);

                // Keep the stored date only when it is strictly later than the incoming one.
                if (known == null || !known.after(seenAt)) {
                    latestByIdp.put(idpId, seenAt);
                }
            })
            .toObservable();
}
项目:Android-Code-Demos
文件:OperatorTest.java
/**
 * Demo: debounces a single value for one second, converts it to a string and prints it.
 *
 * @param i the value to push through the pipeline
 */
private static void debounceTest(int i) {
    Function<Integer, ObservableSource<String>> toText =
            new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer value) throws Exception {
                    return Observable.just(String.valueOf(value));
                }
            };
    Consumer<String> printer = new Consumer<String>() {
        @Override
        public void accept(String text) throws Exception {
            System.out.println(text);
        }
    };

    Observable.just(i)
            .debounce(1000, TimeUnit.MILLISECONDS)
            // switchMap: only the most recent request's data is used.
            .switchMap(toText)
            .subscribe(printer);
}
项目:BrotherWeather
文件:GankioAllPresenter.java
/**
 * Loads one page of Gank.io data, flags every item with its browse-history state from the
 * local database and hands the list to the view.
 *
 * @param type       the category to load
 * @param count      number of items per page
 * @param page       page index
 * @param isProgress whether to show the progress indicator before loading
 */
public void getGankioData(GankioType type, int count, int page, boolean isProgress) {
    if (isProgress) {
        view.showProgress("");
    }

    wrap(gankioRepository.getAllGankioData(type, count, page))
            .flatMap(new Function<BaseResult<GankioData>, ObservableSource<BaseResult<GankioData>>>() {
                @Override public ObservableSource<BaseResult<GankioData>> apply(
                        @NonNull BaseResult<GankioData> page) throws Exception {
                    // Annotate each item in place before passing the same result on.
                    for (GankioData item : page.getResults()) {
                        boolean visited = dbRepository.queryBrowseHistory(item.get_id());
                        item.setBrowseHistory(visited);
                    }
                    return Observable.just(page);
                }
            })
            .subscribe(new ViewObserver<GankioData>(view) {
                @Override protected void onSuccess(List<GankioData> t) {
                    view.display(t);
                }

                @Override public void onError(@NonNull Throwable e) {
                    super.onError(e);
                    view.displayError();
                }
            });
}
项目:RxJava4AndroidDemos
文件:FlatMap.java
/**
 * Simple flatMap demo: each of the integers 1, 2, 3 is expanded into two strings
 * (the value times two and times three) and every signal is logged.
 */
@Override
public void test0() {
    Log.i(TAG, "test0() FlatMap simple demo, integer 1,2,3 transform to string 2,3,4,6,6,9");

    Function<Integer, ObservableSource<String>> expand =
            new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                    return Observable.just(integer * 2 + "", integer * 3 + "");
                }
            };
    Consumer<String> onValue = new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "Consumer<String> accept() s: " + s);
        }
    };
    Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
        }
    };
    Action onDone = new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "Action run() for onComplete()");
        }
    };

    Observable.just(1, 2, 3)
            .flatMap(expand)
            .subscribe(onValue, onFailure, onDone);
}
项目:Quran
文件:TranslationManagerPresenter.java
/**
 * Returns the cached translation list, if one may be used.
 * <p>
 * The returned observable is empty when a download is forced, when the cache is older than
 * {@code Constants.MIN_TRANSLATION_REFRESH_TIME}, when no cache file exists, or when reading
 * the cache fails (the failure is reported to Crashlytics).
 *
 * @param forceDownload {@code true} to ignore the cache entirely
 * @return an observable emitting the cached list, or an empty observable
 */
Observable<TranslationList> getCachedTranslationListObservable(final boolean forceDownload) {
    return Observable.defer(new Callable<ObservableSource<? extends TranslationList>>() {
        @Override
        public ObservableSource<TranslationList> call() throws Exception {
            boolean isCacheStale = System.currentTimeMillis() -
                    quranSettings.getLastUpdatedTranslationDate() > Constants.MIN_TRANSLATION_REFRESH_TIME;
            if (forceDownload || isCacheStale) {
                return Observable.empty();
            }

            try {
                File cachedFile = getCachedFile();
                if (cachedFile.exists()) {
                    Moshi moshi = new Moshi.Builder().build();
                    JsonAdapter<TranslationList> jsonAdapter = moshi.adapter(TranslationList.class);
                    okio.BufferedSource source = Okio.buffer(Okio.source(cachedFile));
                    try {
                        return Observable.just(jsonAdapter.fromJson(source));
                    } finally {
                        // Release the file handle; the source was previously never closed.
                        source.close();
                    }
                }
            } catch (Exception e) {
                // Any read/parse/close failure is treated as "no usable cache".
                Crashlytics.logException(e);
            }
            return Observable.empty();
        }
    });
}
项目:Quran
文件:AudioManagerUtils.java
/**
 * Builds the download info for every given qari.
 * <p>
 * Cached entries from {@code sCache} are reused; otherwise the info is derived from the
 * qari's directory under {@code basePath}. Every emitted info is stored in {@code sCache}.
 *
 * @param basePath  directory that contains one sub-directory per qari
 * @param qariItems the qaris to inspect
 * @return a single emitting the list of download infos, subscribed on the io scheduler
 */
@NonNull
public static Single<List<QariDownloadInfo>> shuyookhDownloadObservable(
        final String basePath, List<QariItem> qariItems) {
    return Observable.fromIterable(qariItems)
            .flatMap(new Function<QariItem, ObservableSource<QariDownloadInfo>>() {
                @Override
                public ObservableSource<QariDownloadInfo> apply(QariItem item) throws Exception {
                    QariDownloadInfo cached = sCache.get(item);
                    if (cached != null) {
                        return Observable.just(cached);
                    }

                    File baseFile = new File(basePath, item.getPath());
                    if (!baseFile.exists()) {
                        // Nothing on disk for this qari yet.
                        return Observable.just(new QariDownloadInfo(item));
                    }
                    if (item.isGapless()) {
                        return getGaplessSheikhObservable(baseFile, item).toObservable();
                    }
                    return getGappedSheikhObservable(baseFile, item).toObservable();
                }
            })
            .doOnNext(info -> sCache.put(info.qariItem, info))
            .toList()
            .subscribeOn(Schedulers.io());
}
项目:rxjava2_retrofit2
文件:ResultTransformer.java
/**
 * Creates the mapper that turns an {@link HttpResponseResult} envelope into a stream.
 * <p>
 * A successful envelope becomes a one-item stream of its result; an unsuccessful one becomes
 * an error stream carrying an {@link HttpResponseException} built from the envelope's
 * message and state.
 * <p>
 * The standard factories are used instead of a hand-written {@code Observable} subclass,
 * because the latter never called {@code onSubscribe} on its observer as the Observable
 * protocol requires. Note that RxJava 2 streams cannot carry {@code null}: a successful
 * envelope with a null result surfaces as a {@link NullPointerException} error.
 *
 * @param <T> the payload type
 * @return the mapping function, suitable for {@code Observable.flatMap}
 */
private static <T> Function<HttpResponseResult<T>, ObservableSource<T>> flatMap() {
    return new Function<HttpResponseResult<T>, ObservableSource<T>>() {
        @Override
        public ObservableSource<T> apply(@NonNull final HttpResponseResult<T> tHttpResponseResult) throws Exception {
            if (tHttpResponseResult.isSuccess()) {
                return Observable.<T>just(tHttpResponseResult.getResult());
            }
            return Observable.<T>error(new HttpResponseException(
                    tHttpResponseResult.getMsg(), tHttpResponseResult.getState()));
        }
    };
}
项目:Reactive-Android-Programming
文件:TimingObservableTransformer.java
/**
 * Reports, for every upstream item, the whole seconds elapsed since a reference timestamp.
 * <p>
 * The reference {@link Date} is created when this method runs and is paired with each
 * upstream item; the elapsed seconds are passed to {@code timerAction} and the item is
 * forwarded unchanged.
 *
 * @param upstream the stream to time
 * @return the same items as {@code upstream}
 */
@Override
public ObservableSource<R> apply(Observable<R> upstream) {
    return Observable.combineLatest(Observable.just(new Date()), upstream, Pair::create)
            .doOnNext(stamped -> {
                long elapsedMillis = new Date().getTime() - stamped.first.getTime();
                timerAction.accept(elapsedMillis / 1000);
            })
            .map(stamped -> stamped.second);
}
项目:JBase
文件:RxSchedulers.java
/**
 * Creates a transformer that subscribes upstream on a new thread, optionally shows the
 * loading dialog on subscription, binds the stream to {@code lifecycle} and delivers
 * results on the Android main thread.
 *
 * @param <T> the item type
 * @return the scheduling transformer
 */
public <T> ObservableTransformer<T, T> io_main() {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            Consumer<Disposable> maybeShowLoading = new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    // Decide whether the loading dialog should be shown.
                    if (showLoading) {
                        LoadingDialog.show();
                    }
                }
            };

            return upstream.subscribeOn(Schedulers.newThread())
                    .doOnSubscribe(maybeShowLoading)
                    // Bind to the lifecycle (per the original note: avoids leaking network requests).
                    .compose(lifecycle)
                    // Use the main thread for the subscription side effect above.
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
项目:android-mvp-architecture
文件:AppDataManager.java
/**
 * Populates the option table from the JSON asset if, and only if, it is currently empty.
 *
 * @return whatever {@code saveOptionList} emits when seeding ran, otherwise {@code false}
 */
@Override
public Observable<Boolean> seedDatabaseOptions() {
    final Gson gson = new GsonBuilder()
            .excludeFieldsWithoutExposeAnnotation()
            .create();

    return mDbHelper.isOptionEmpty()
            .concatMap(new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty) throws Exception {
                    if (!isEmpty) {
                        // Options already present: report that nothing was seeded.
                        return Observable.just(false);
                    }
                    Type listOfOptions = new TypeToken<List<Option>>() { }.getType();
                    List<Option> parsed = gson.fromJson(
                            CommonUtils.loadJSONFromAsset(mContext, AppConstants.SEED_DATABASE_OPTIONS),
                            listOfOptions);
                    return saveOptionList(parsed);
                }
            });
}
项目:MVPArmsTest1
文件:UserModel.java
/**
 * Loads one page of users through the RxCache layer.
 *
 * @param lastIdQueried id after which users are requested; also used as the cache key
 * @param update        passed to {@code EvictDynamicKey}; presumably {@code true} evicts the
 *                      cached page - confirm against RxCache
 * @return an observable emitting the list of users taken from the cache reply
 */
@Override
public Observable<List<User>> getUsers(int lastIdQueried, boolean update) {
    Observable<List<User>> remoteUsers = mRepositoryManager
            .obtainRetrofitService(UserService.class)
            .getUsers(lastIdQueried, USERS_PER_PAGE);

    // RxCache is used here: a refresh skips the cache, loading more pages reads from it.
    Function<Reply<List<User>>, ObservableSource<List<User>>> unwrapReply =
            new Function<Reply<List<User>>, ObservableSource<List<User>>>() {
                @Override
                public ObservableSource<List<User>> apply(@NonNull Reply<List<User>> reply) throws Exception {
                    return Observable.just(reply.getData());
                }
            };

    return mRepositoryManager.obtainCacheService(CommonCache.class)
            .getUsers(remoteUsers, new DynamicKey(lastIdQueried), new EvictDynamicKey(update))
            .flatMap(unwrapReply);
}
项目:android-mvvm-architecture
文件:AppDataManager.java
/**
 * Seeds the question table from the bundled JSON asset when the table is empty.
 *
 * @return the result of {@code saveQuestionList} when seeding happened, otherwise an
 *         observable emitting {@code false}
 */
@Override
public Observable<Boolean> seedDatabaseQuestions() {
    final Gson gson = new GsonBuilder()
            .excludeFieldsWithoutExposeAnnotation()
            .create();

    Function<Boolean, ObservableSource<? extends Boolean>> seedIfEmpty =
            new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty) throws Exception {
                    if (!isEmpty) {
                        return Observable.just(false);
                    }
                    // Describes List<Question> so Gson can deserialize the generic list.
                    Type questionListType = $Gson$Types
                            .newParameterizedTypeWithOwner(null, List.class, Question.class);
                    List<Question> questionList = gson.fromJson(
                            CommonUtils.loadJSONFromAsset(mContext, AppConstants.SEED_DATABASE_QUESTIONS),
                            questionListType);
                    return saveQuestionList(questionList);
                }
            };

    return mDbHelper.isQuestionEmpty().concatMap(seedIfEmpty);
}
项目:Aurora
文件:UserModel.java
/**
 * Fetches a page of users, going through the RxCache provider.
 *
 * @param lastIdQueried id after which users are requested; doubles as the dynamic cache key
 * @param update        forwarded to {@code EvictDynamicKey}; presumably forces a cache
 *                      eviction when {@code true} - confirm against RxCache
 * @return an observable emitting the users contained in the cache reply
 */
@Override
public Observable<List<User>> getUsers(int lastIdQueried, boolean update) {
    Observable<List<User>> fromNetwork = mRepositoryManager
            .obtainRetrofitService(UserService.class)
            .getUsers(lastIdQueried, USERS_PER_PAGE);

    // RxCache: refreshing does not read the cache, loading more does.
    return mRepositoryManager
            .obtainCacheService(CommonCache.class)
            .getUsers(fromNetwork, new DynamicKey(lastIdQueried), new EvictDynamicKey(update))
            .flatMap(new Function<Reply<List<User>>, ObservableSource<List<User>>>() {
                @Override
                public ObservableSource<List<User>> apply(@NonNull Reply<List<User>> cached) throws Exception {
                    return Observable.just(cached.getData());
                }
            });
}
项目:RxPay
文件:RxPayUtils.java
/**
 * Creates a transformer that lets successful pay results through and converts failed ones
 * into a {@link PayFailedException} carrying the result's error info.
 *
 * @return the validating transformer
 */
public static ObservableTransformer<PayResult, PayResult> checkAliPayResult() {
    final Function<PayResult, PayResult> requireSuccess = new Function<PayResult, PayResult>() {
        @Override
        public PayResult apply(PayResult payResult) throws Exception {
            if (payResult.isSucceed()) {
                return payResult;
            }
            throw new PayFailedException(payResult.getErrInfo());
        }
    };

    return new ObservableTransformer<PayResult, PayResult>() {
        @Override
        public ObservableSource<PayResult> apply(Observable<PayResult> upstream) {
            return upstream.map(requireSuccess);
        }
    };
}
项目:android-mvp-interactor-architecture
文件:SplashInteractor.java
/**
 * Seeds the option repository from the bundled JSON asset when it holds no options.
 *
 * @return the result of {@code mOptionRepository.saveOptionList} when seeding happened,
 *         otherwise an observable emitting {@code false}
 */
@Override
public Observable<Boolean> seedDatabaseOptions() {
    final Gson gson = new GsonBuilder()
            .excludeFieldsWithoutExposeAnnotation()
            .create();

    Function<Boolean, ObservableSource<? extends Boolean>> seedIfEmpty =
            new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty) throws Exception {
                    if (!isEmpty) {
                        return Observable.just(false);
                    }
                    Type optionListType = new TypeToken<List<Option>>() { }.getType();
                    List<Option> optionList = gson.fromJson(
                            FileUtils.loadJSONFromAsset(mContext, AppConstants.SEED_DATABASE_OPTIONS),
                            optionListType);
                    return mOptionRepository.saveOptionList(optionList);
                }
            };

    return mOptionRepository.isOptionEmpty().concatMap(seedIfEmpty);
}
项目:RxEasyHttp
文件:RxUtil.java
/**
 * Creates a transformer that subscribes and unsubscribes on the io scheduler, logs the
 * subscribe and finally events, and observes on the Android main thread.
 *
 * @param <T> the item type
 * @return the scheduling transformer
 */
public static <T> ObservableTransformer<T, T> io_main() {
    final Consumer<Disposable> logSubscribe = new Consumer<Disposable>() {
        @Override
        public void accept(@NonNull Disposable disposable) throws Exception {
            HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed());
        }
    };
    final Action logFinally = new Action() {
        @Override
        public void run() throws Exception {
            HttpLog.i("+++doFinally+++");
        }
    };

    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
            return upstream
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io())
                    .doOnSubscribe(logSubscribe)
                    .doFinally(logFinally)
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
项目:RxJava2-Android-Sample
文件:FlatMapExampleActivity.java
/**
 * Demo of {@code flatMap} followed by {@code buffer(3, 1)}.
 * <p>
 * 3 means each list takes at most three items from its start index; 1 means the start
 * index advances by one each time. For the inputs one..five this yields:
 * <pre>
 * 1 - flatMap func-->one, flatMap func-->two, flatMap func-->three
 * 2 - flatMap func-->two, flatMap func-->three, flatMap func-->four
 * 3 - flatMap func-->three, flatMap func-->four, flatMap func-->five
 * 4 - flatMap func-->four, flatMap func-->five
 * 5 - flatMap func-->five
 * </pre>
 */
private void doSomeWork() {
    Function<String, ObservableSource<String>> tagWithPrefix =
            new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(@NonNull String value) throws Exception {
                    return Observable.just("flatMap func-->" + value);
                }
            };

    getObservable()
            .flatMap(tagWithPrefix)
            .buffer(3, 1)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(getObserver());
}
项目:android-mvvm-architecture
文件:AppDataManager.java
/**
 * Fills the option table from the seed JSON asset when no options are stored yet.
 *
 * @return the outcome of {@code saveOptionList} if seeding ran, otherwise {@code false}
 */
@Override
public Observable<Boolean> seedDatabaseOptions() {
    final Gson gson = new GsonBuilder()
            .excludeFieldsWithoutExposeAnnotation()
            .create();

    Function<Boolean, ObservableSource<? extends Boolean>> onEmptinessKnown =
            new Function<Boolean, ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> apply(Boolean isEmpty) throws Exception {
                    if (!isEmpty) {
                        return Observable.just(false);
                    }
                    // TypeToken captures the generic List<Option> type for Gson.
                    Type targetType = new TypeToken<List<Option>>() { }.getType();
                    List<Option> seedOptions = gson.fromJson(
                            CommonUtils.loadJSONFromAsset(mContext, AppConstants.SEED_DATABASE_OPTIONS),
                            targetType);
                    return saveOptionList(seedOptions);
                }
            };

    return mDbHelper.isOptionEmpty().concatMap(onEmptinessKnown);
}
项目:Reactive-Android-Programming
文件:TimingObservableTransformer.java
/**
 * Measures how many whole seconds separate a start timestamp from each upstream emission.
 * <p>
 * The start {@link Date} is taken when this method is invoked. For every upstream item the
 * elapsed seconds are handed to {@code timerAction}; the item itself passes through as is.
 *
 * @param upstream the stream being timed
 * @return a source emitting exactly the upstream items
 */
@Override
public ObservableSource<R> apply(Observable<R> upstream) {
    return Observable.combineLatest(Observable.just(new Date()), upstream, Pair::create)
            .doOnNext(startAndItem -> {
                long millisSinceStart = new Date().getTime() - startAndItem.first.getTime();
                timerAction.accept(millisSinceStart / 1000);
            })
            .map(startAndItem -> startAndItem.second);
}
项目:KTools
文件:RxPermissions.java
/**
 * Map emitted items from the source observable into {@link Permission} objects for each
 * permission in parameters.
 * <p>
 * If one or several permissions have never been requested, invoke the related framework
 * method to ask the user whether they allow the permissions.
 *
 * @param permissions the permissions to ensure
 * @return a transformer that delegates to {@code request} with the given permissions
 */
@SuppressWarnings("WeakerAccess")
public ObservableTransformer<Object, Permission> ensureEach(final String... permissions) {
    ObservableTransformer<Object, Permission> perPermission =
            new ObservableTransformer<Object, Permission>() {
                @Override
                public ObservableSource<Permission> apply(Observable<Object> trigger) {
                    return request(trigger, permissions);
                }
            };
    return perPermission;
}
项目:RxProgress
文件:RxProgress.java
/**
 * Wraps {@code source} so that a progress dialog exists for the lifetime of the subscription.
 * <p>
 * The dialog is created by {@code makeDialog} and dismissed when the stream terminates or is
 * disposed. Dismissing the dialog (or cancelling it, when the builder allows) completes the
 * stream.
 *
 * @param source the observable whose work the dialog represents
 * @param <T>    the item type
 * @return an observable mirroring {@code source}, bound to the dialog
 */
private <T> Observable<T> forObservable(Observable<T> source) {
    return Observable.using(this::makeDialog,
            new Function<ProgressDialog, ObservableSource<? extends T>>() {
                @Override
                public ObservableSource<? extends T> apply(@NonNull ProgressDialog dialog) throws Exception {
                    return Observable.create(emitter -> {
                        if (builder.cancelable) {
                            dialog.setOnCancelListener(dialogInterface -> emitter.onComplete());
                        }
                        dialog.setOnDismissListener(dialogInterface -> emitter.onComplete());
                        // Tie the inner subscription to the emitter so that cancelling the
                        // dialog or disposing downstream also stops the source; previously
                        // the returned Disposable was dropped and the source kept running.
                        emitter.setDisposable(
                                source.subscribe(emitter::onNext, emitter::onError, emitter::onComplete));
                    });
                }
            }, Dialog::dismiss);
}
项目:RxJava2-Android-Sample
文件:TimestampExampleActivity.java
/**
 * Demo of {@code timestamp()}: every string is delayed by 500 ms, stamped with its emission
 * time and delivered to the observer on the main thread.
 */
private void doSomeWork() {
    Function<String, ObservableSource<String>> delayEach =
            new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(@NonNull String value) throws Exception {
                    return Observable.just(value).delay(500, TimeUnit.MILLISECONDS);
                }
            };

    getObservable()
            .flatMap(delayEach)
            .timestamp()
            // Run on a background thread
            .subscribeOn(Schedulers.io())
            // Be notified on the main thread
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(getObserver());
}
项目:GitHub
文件:Car.java
/**
 * Returns an observable that reads {@code brand} at subscription time rather than when this
 * method is called.
 *
 * @return a deferred observable emitting the current brand
 */
public Observable<String> brandDeferObservable() {
    Callable<ObservableSource<? extends String>> currentBrand =
            new Callable<ObservableSource<? extends String>>() {
                @Override
                public ObservableSource<? extends String> call() throws Exception {
                    return Observable.just(brand);
                }
            };
    return Observable.defer(currentBrand);
}
项目:GitHub
文件:NetworkingActivity.java
/**
 * take Operator Example: flattens the user list and logs only the first four users.
 *
 * @param view the view that triggered this call (unused)
 */
public void take(View view) {
    // flatMap turns the single list emission into one emission per user.
    Function<List<User>, ObservableSource<User>> oneByOne =
            new Function<List<User>, ObservableSource<User>>() {
                @Override
                public ObservableSource<User> apply(List<User> usersList) throws Exception {
                    return Observable.fromIterable(usersList);
                }
            };

    getUserListObservable()
            .flatMap(oneByOne)
            .take(4) // only the first 4 users are emitted
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<User>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(User user) {
                    // Only four users arrive here, one at a time.
                    Log.d(TAG, "user : " + user.toString());
                }

                @Override
                public void onError(Throwable e) {
                    Utils.logError(TAG, e);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}
项目:GitHub
文件:RxSchedulers.java
/**
 * Creates a transformer that subscribes on the io scheduler and observes on the Android
 * main thread.
 *
 * @param <T> the item type
 * @return the scheduling transformer
 */
public <T> ObservableTransformer<T, T> applyObservableAsync() {
    ObservableTransformer<T, T> ioToMain = new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            return upstream
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
    return ioToMain;
}
项目:RxJava2-Android-Sample
文件:RxSchedulers.java
/**
 * Creates a transformer that subscribes on the computation scheduler and observes on the
 * Android main thread.
 *
 * @param <T> the item type
 * @return the scheduling transformer
 */
public <T> ObservableTransformer<T, T> applyObservableCompute() {
    ObservableTransformer<T, T> computeToMain = new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            return upstream
                    .subscribeOn(Schedulers.computation())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
    return computeToMain;
}
项目:GitHub
文件:RxSchedulers.java
/**
 * Creates a transformer that only moves observation to the Android main thread.
 *
 * @param <T> the item type
 * @return the scheduling transformer
 */
public <T> ObservableTransformer<T, T> applyObservableMainThread() {
    ObservableTransformer<T, T> toMain = new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            return upstream.observeOn(AndroidSchedulers.mainThread());
        }
    };
    return toMain;
}
项目:GitHub
文件:MainActivity.java
/**
 * Returns a deferred observable that, on each subscription, sleeps for five seconds and then
 * emits five strings.
 *
 * @return the sample observable
 */
static Observable<String> sampleObservable() {
    Callable<ObservableSource<? extends String>> slowSource =
            new Callable<ObservableSource<? extends String>>() {
                @Override public ObservableSource<? extends String> call() throws Exception {
                    // Stand-in for a long running operation.
                    SystemClock.sleep(5000);
                    return Observable.just("one", "two", "three", "four", "five");
                }
            };
    return Observable.defer(slowSource);
}
项目:GitHub
文件:Rx2OperatorExampleActivity.java
/**
 * take operator example: flattens the user list and logs the fields of the first four users.
 *
 * @param view the view that triggered this call (unused)
 */
public void take(View view) {
    // flatMap turns the single list emission into one emission per user.
    Function<List<User>, ObservableSource<User>> oneByOne =
            new Function<List<User>, ObservableSource<User>>() {
                @Override
                public ObservableSource<User> apply(List<User> usersList) throws Exception {
                    return Observable.fromIterable(usersList);
                }
            };

    getUserListObservable()
            .flatMap(oneByOne)
            .take(4) // only the first 4 users are emitted
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<User>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(User user) {
                    // Only four users arrive here, one at a time.
                    Log.d(TAG, "user id : " + user.id);
                    Log.d(TAG, "user firstname : " + user.firstname);
                    Log.d(TAG, "user lastname : " + user.lastname);
                    Log.d(TAG, "isFollowing : " + user.isFollowing);
                }

                @Override
                public void onError(Throwable e) {
                    Utils.logError(TAG, e);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}