Java 类io.reactivex.ObservableTransformer 实例源码
项目:RxPay
文件:RxPayUtils.java
public static ObservableTransformer<WxPayResult, WxPayResult> checkWechatResult() {
return new ObservableTransformer<WxPayResult, WxPayResult>() {
@Override
public ObservableSource<WxPayResult> apply(Observable<WxPayResult> payResultObservable) {
return payResultObservable.map(new Function<WxPayResult, WxPayResult>() {
@Override
public WxPayResult apply(WxPayResult wxPayResult) {
if (!wxPayResult.isSucceed()) {
throw new PayFailedException(wxPayResult.getErrInfo());
}
return wxPayResult;
}
});
}
};
}
项目:richeditor
文件:RxSchedulers.java
public static <T> ObservableTransformer<T, T> compose() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> observable) {
return observable
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
if (!Utils.isNetworkAvailable()) {
//disposable.dispose();
//throw new NetworkNoAvailableException("no network,please check and retry");
//Utils.MakeShortToast("no network,please check and retry");
//throw new Exception();
}
}
})
.observeOn(AndroidSchedulers.mainThread());
}
};
}
项目:cleanarchitecture-unidirectional
文件:EmailLoginPresenter.java
@Override
protected ObservableTransformer<Action, Result> actionsToResults() {
return upstream -> {
final Observable<EmailLoginActions.LoginAction> loginActionObservable = upstream
.ofType(EmailLoginActions.LoginAction.class);
return loginActionObservable
.flatMap(loginAction -> {
if (!Patterns.EMAIL_ADDRESS.matcher(loginAction.getEmail()).matches()) {
return Observable.just(Result.<Boolean, EmailLoginActions.LoginAction>error(loginAction, new FormValidationException("Must enter a valid email address!")));
}
if (!Pattern.compile("[0-9a-zA-Z]{6,}").matcher(loginAction.getPassword()).matches()) {
return Observable.just(Result.<Boolean, EmailLoginActions.LoginAction>error(loginAction, new FormValidationException("Password must be at least 6 characters long!")));
}
return useCase.performAction(loginAction)
.onErrorReturn(throwable -> Result.error(loginAction, throwable))
.startWith(Result.<Boolean, EmailLoginActions.LoginAction>loading(loginAction));
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io());
};
}
项目:RxLog
文件:RxLog.java
/**
* Creates transform operator, which logs defined events in observable's lifecycle
* @param msg message
* @param bitMask bitmask of events which you want to log
* @param <T> type
* @return transformer
*/
public static <T> ObservableTransformer<T, T> logObservable(final String msg, final int bitMask) {
return upstream -> {
if ((bitMask & LOG_SUBSCRIBE) > 0) {
upstream = upstream.compose(oLogSubscribe(msg));
}
if ((bitMask & LOG_TERMINATE) > 0) {
upstream = upstream.compose(oLogTerminate(msg));
}
if ((bitMask & LOG_ERROR) > 0) {
upstream = upstream.compose(oLogError(msg));
}
if ((bitMask & LOG_COMPLETE) > 0) {
upstream = upstream.compose(oLogComplete(msg));
}
if ((bitMask & LOG_NEXT_DATA) > 0) {
upstream = upstream.compose(oLogNext(msg));
} else if ((bitMask & LOG_NEXT_EVENT) > 0) {
upstream = upstream.compose(oLogNextEvent(msg));
}
if ((bitMask & LOG_DISPOSE) > 0) {
upstream = upstream.compose(oLogDispose(msg));
}
return upstream;
};
}
项目:NeiHanDuanZiTV
文件:RxUtils.java
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
return new ObservableTransformer<T, T>() {
@Override
public Observable<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
view.showLoading();//显示进度条
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doAfterTerminate(new Action() {
@Override
public void run() {
view.hideLoading();//隐藏进度条
}
}).compose(RxUtils.bindToLifecycle(view));
}
};
}
项目:RetrofitSample
文件:NetUtil.java
public static <T> ObservableTransformer<BaseResponse<T>, T> handleResult() {
try {
return baseResponseObservable -> baseResponseObservable
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(baseResponse -> {
if (baseResponse.getErrorCode() == 0) {
if (baseResponse.getData() != null) {
return Observable.just(baseResponse.getData());
} else {
//这种情况是没有data的情况下需要走onComplete来进行处理
return Observable.empty();
}
} else {
return Observable.error(new DlException(baseResponse.getErrorCode(), baseResponse.getMsg()));
}
});
} catch (Exception e) {
e.printStackTrace();
return baseResponseObservable -> baseResponseObservable
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(baseResponse -> Observable.error(new Throwable("服务器错误")));
}
}
项目:sqlbrite-sqlcipher
文件:BriteDatabaseTest.java
@Before public void setUp() throws IOException {
helper = new TestDb(InstrumentationRegistry.getContext(), dbFolder.newFile().getPath());
real = helper.getWritableDatabase();
SqlBrite.Logger logger = new SqlBrite.Logger() {
@Override public void log(String message) {
logs.add(message);
}
};
ObservableTransformer<Query, Query> queryTransformer =
new ObservableTransformer<Query, Query>() {
@Override public ObservableSource<Query> apply(Observable<Query> upstream) {
return upstream.takeUntil(killSwitch);
}
};
PublishSubject<Set<String>> triggers = PublishSubject.create();
db = new BriteDatabase(helper, logger, triggers, triggers, scheduler, queryTransformer);
}
项目:MoligyMvpArms
文件:RxUtils.java
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
return new ObservableTransformer<T, T>() {
@Override
public Observable<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
view.showLoading();//显示进度条
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doFinally(new Action() {
@Override
public void run() {
view.hideLoading();//隐藏进度条
}
}).compose(RxLifecycleUtils.bindToLifecycle(view));
}
};
}
项目:RxGps
文件:StatusExceptionResumeNextTransformer.java
public static <R extends Result> ObservableTransformer<R, R> forObservable() {
return upstream -> upstream.onErrorResumeNext(throwable -> {
if(throwable instanceof StatusException) {
StatusException statusException = (StatusException) throwable;
if(statusException.getStatus().hasResolution()) {
return Observable.just((R) statusException.getResult());
} else {
return Observable.error(throwable);
}
} else {
return Observable.error(throwable);
}
});
}
项目:cleanarchitecture-unidirectional
文件:TimezonePresenter.java
@Override
protected ObservableTransformer<Action, Result> actionsToResults() {
return upstream -> Observable.merge(
upstream.ofType(TimezonesUiActions.RefreshAction.class)
.flatMap(refreshAction ->
listUseCase.performAction(refreshAction)
.onErrorReturn(t -> Result.error(refreshAction, t))
.startWith(Result.loading(refreshAction))),
upstream.ofType(TimezonesUiActions.LoadMoreAction.class)
.flatMap(loadMoreAction ->
listUseCase.performAction(loadMoreAction)
.onErrorReturn(t -> Result.error(loadMoreAction, t))
.startWith(Result.loading(loadMoreAction))),
upstream.ofType(DeleteTimezoneAction.class)
.flatMap(action ->
deleteUseCase.performAction(action)
.onErrorReturn(t -> Result.error(action, t))
.startWith(Result.loading(action)))
);
}
项目:RxEasyHttp
文件:RxUtil.java
public static <T> ObservableTransformer<T, T> io_main() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed());
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
HttpLog.i("+++doFinally+++");
}
})
.observeOn(AndroidSchedulers.mainThread());
}
};
}
项目:RxPay
文件:RxPayUtils.java
public static ObservableTransformer<PayResult, PayResult> checkAliPayResult() {
return new ObservableTransformer<PayResult, PayResult>() {
@Override
public ObservableSource<PayResult> apply(Observable<PayResult> upstream) {
return upstream.map(new Function<PayResult, PayResult>() {
@Override
public PayResult apply(PayResult payResult) throws Exception {
if (!payResult.isSucceed()) {
throw new PayFailedException(payResult.getErrInfo());
}
return payResult;
}
});
}
};
}
项目:Aurora
文件:RxUtils.java
public static <T> ObservableTransformer<T, T> applySchedulersWithLifeCycle(IView view) {
return new ObservableTransformer<T, T>() {
@Override
public Observable<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doAfterTerminate(new Action() {
@Override
public void run() {
view.hideLoading();//隐藏进度条
}
})
.compose(RxLifecycleUtils.bindToLifecycle(view));
}
};
}
项目:Bing
文件:RxSchedulers.java
public static <T> ObservableTransformer<T, T> compose() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> observable) {
return observable
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
// if (!NetworkUtils.isConnected()) {
// Toast.makeText(Bing.getApplicationContext(), R.string.toast_network_error, Toast.LENGTH_SHORT).show();
// }
}
})
.observeOn(AndroidSchedulers.mainThread());
}
};
}
项目:MVPArmsTest1
文件:RxUtils.java
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
return new ObservableTransformer<T, T>() {
@Override
public Observable<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
view.showLoading();//显示进度条
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doAfterTerminate(new Action() {
@Override
public void run() {
view.hideLoading();//隐藏进度条
}
}).compose(RxUtils.bindToLifecycle(view));
}
};
}
项目:gigreminder
文件:ConcertDetailsPresenter.java
@Override
protected Observable<Result> handleEvents(Observable<UiEvent> events) {
ObservableTransformer<LoadConcertEvent, Result> loadConcert = loadEvents ->
loadEvents.flatMap(event -> {
return getRepository().getConcert(concertId)
.map(LoadConcertResult::success)
.onErrorReturn(LoadConcertResult::error)
.toObservable()
.startWith(LoadConcertResult.IN_PROGRESS);
});
return events.publish(sharedEvents -> Observable.merge(
handleEventsOfClass(sharedEvents, LoadConcertEvent.class, loadConcert),
sharedEvents.ofType(Result.class)
));
}
项目:cleanarchitecture-unidirectional
文件:CheckUserPresenter.java
@Override
protected ObservableTransformer<UiEvent, Action> eventsToActions() {
return upstream -> upstream
.map(event -> getLastState().getData() == null || getLastState().getData().getPhase() == CheckPhases.STATE_START
? CheckUserActions.exists(userManager.getUserId())
: CheckUserActions.createRecord(userManager.getUserId(), userManager.getUserName(), userManager.getUserEmail(), userManager.getAvatar()));
}
项目:RxJava2-Android-Sample
文件:RxSchedulers.java
public <T> ObservableTransformer<T, T> applyObservableAsync() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
项目:GitHub
文件:RxSchedulers.java
public <T> ObservableTransformer<T, T> applyObservableAsync() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
项目:GitHub
文件:RxSchedulers.java
public <T> ObservableTransformer<T, T> applyObservableCompute() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
项目:sqlbrite-sqlcipher
文件:BriteDatabase.java
BriteDatabase(Context context,SQLiteOpenHelper helper, @NonNull char[] password, Logger logger, Observable<Set<String>> triggerSource,
Observer<Set<String>> triggerSink, Scheduler scheduler,
ObservableTransformer<Query, Query> queryTransformer) {
SQLiteDatabase.loadLibs(context);
this.helper = helper;
this.password = password;
this.logger = logger;
this.triggerSource = triggerSource;
this.triggerSink = triggerSink;
this.scheduler = scheduler;
this.queryTransformer = queryTransformer;
}
项目:GitHub
文件:Utils.java
public static <U> ObservableTransformer<U, U> retry(final String hint, final int retryCount) {
return new ObservableTransformer<U, U>() {
@Override
public ObservableSource<U> apply(Observable<U> upstream) {
return upstream.retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(Integer integer, Throwable throwable) throws Exception {
return retry(hint, retryCount, integer, throwable);
}
});
}
};
}
项目:cleanarchitecture-unidirectional
文件:TimezoneEditPresenter.java
@Override
protected ObservableTransformer<Action, Result> actionsToResults() {
return upstream -> Observable.merge(
upstream.ofType(TimezoneEditUiActions.GetTimezoneAction.class)
.flatMap(action -> getUseCase.performAction(action)
.onErrorReturn(t -> Result.error(action, t))
.startWith(Result.loading(action))
),
upstream.ofType(TimezoneEditUiActions.SaveTimezoneAction.class)
.flatMap(action -> {
if (action.getName() == null || action.getName().isEmpty()) {
return Observable.just(Result.error(action, new FormValidationException("Name cannot be empty!")));
} else if (action.getCity() == null || action.getCity().isEmpty()) {
return Observable.just(Result.error(action, new FormValidationException("City cannot be empty!")));
} else if (action.getDifference() < -12 || action.getDifference() > 12) {
return Observable.just(Result.error(action, new FormValidationException("Difference must be an integer between -12 and +12!")));
}
final Observable<Result> result;
if (action.getId() == null) {
result = saveUseCase.performAction(action);
} else {
result = updateUseCase.performAction(action);
}
return result
.onErrorReturn(t -> Result.error(action, t))
.startWith(Result.loading(action));
}),
upstream.ofType(DeleteTimezoneAction.class)
.flatMap(action -> deleteUseCase.performAction(action)
.onErrorReturn(t -> Result.error(action, t))
.startWith(Result.loading(action))
)
);
}
项目:RxPermission
文件:RealRxPermission.java
/**
* Map emitted items from the source observable into {@link Permission} objects for each
* permission in parameters.
* <p>
* If one or several permissions have never been requested, invoke the related framework method
* to ask the user if he allows the permissions.
*/
@NonNull @CheckReturnValue private <T> ObservableTransformer<T, Permission> ensureEach(@NonNull final String... permissions) {
checkPermissions(permissions);
return new ObservableTransformer<T, Permission>() {
@Override @NonNull @CheckReturnValue public ObservableSource<Permission> apply(final Observable<T> o) {
return request(o, permissions);
}
};
}
项目:GitHub
文件:ObservableUseCase.java
public ObservableUseCase(final UseCaseExecutor useCaseExecutor,
final PostExecutionThread postExecutionThread) {
super(useCaseExecutor, postExecutionThread);
schedulersTransformer = new ObservableTransformer<R, R>() {
@Override
public Observable<R> apply(Observable<R> rObservable) {
return rObservable.subscribeOn(useCaseExecutor.getScheduler())
.observeOn(postExecutionThread.getScheduler());
}
};
}
项目:XDroid-Databinding
文件:XApi.java
public static <T> ObservableTransformer<T, T> getObservableScheduler(final Function<? super Observable<Throwable>, ? extends ObservableSource<?>> retryWhenHandler) {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
return upstream
.retryWhen(retryWhenHandler)
.onErrorResumeNext(new ServerResultErrorFunc<T>())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
项目:redux-observable
文件:SchedulerObservableTransformer.java
@SuppressWarnings("unchecked")
@Override public <T> ObservableTransformer<T, T> transformer() {
return (ObservableTransformer<T, T>) new ObservableTransformer() {
@Override public ObservableSource apply(Observable upstream) {
return upstream.subscribeOn(Schedulers.trampoline())
.observeOn(Schedulers.trampoline());
}
};
}
项目:SuperHttp
文件:ApiCache.java
public <T> ObservableTransformer<T, CacheResult<T>> transformer(CacheMode cacheMode, final Type type) {
final ICacheStrategy strategy = loadStrategy(cacheMode);//获取缓存策略
return new ObservableTransformer<T, CacheResult<T>>() {
@Override
public ObservableSource<CacheResult<T>> apply(Observable<T> apiResultObservable) {
return strategy.execute(ApiCache.this, ApiCache.this.cacheKey, apiResultObservable, type);
}
};
}
项目:cleanarchitecture-unidirectional
文件:EmailLoginPresenter.java
@Override
protected ObservableTransformer<UiEvent, Action> eventsToActions() {
return upstream -> upstream
.ofType(EmailLoginUiEvents.LoginClicked.class)
.map((Function<EmailLoginUiEvents.LoginClicked, Action>) loginClicked -> EmailLoginActions.login(loginClicked.getEmail(), loginClicked.getPassword()))
.observeOn(AndroidSchedulers.mainThread());
}
项目:Ec2m
文件:TransformUtils.java
public static <T> ObservableTransformer<T, T> all_io() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.observeOn(Schedulers.io()).subscribeOn(Schedulers.io());
}
};
}
项目:gigreminder
文件:ArtistsPresenter.java
@Override
protected Observable<Result> handleEvents(Observable<UiEvent> events) {
ObservableTransformer<LoadArtistsEvent, Result> loadArtists = loadEvents ->
loadEvents.flatMap(event ->
getRepository().getArtists()
.map(LoadArtistsResult::success)
.onErrorReturn(LoadArtistsResult::error)
.startWith(LoadArtistsResult.IN_PROGRESS)
);
ObservableTransformer<DeleteArtistsEvent, Result> deleteArtists = deleteEvents ->
deleteEvents.flatMap(event -> {
List<Artist> artists = event.getUiModel().getSelectedArtists();
return getRepository().deleteArtists(artists)
.toSingleDefault(DeleteArtistsResult.SUCCESS)
.onErrorReturn(DeleteArtistsResult::error)
.toObservable()
.startWith(DeleteArtistsResult.IN_PROGRESS);
});
return events.publish(sharedEvents -> Observable.merge(
handleEventsOfClass(sharedEvents, LoadArtistsEvent.class, loadArtists),
handleEventsOfClass(sharedEvents, DeleteArtistsEvent.class,
deleteArtists),
sharedEvents.ofType(Result.class))
);
}
项目:cleanarchitecture-unidirectional
文件:ForgotPwPresenter.java
@Override
protected ObservableTransformer<Action, Result> actionsToResults() {
return upstream -> upstream.ofType(ForgotPwActions.ForgotPwSubmit.class)
.flatMap(action -> {
if (!Patterns.EMAIL_ADDRESS.matcher(action.getEmail()).matches()) {
return Observable.just(Result.<Boolean, ForgotPwActions.ForgotPwSubmit>error(action, new FormValidationException("Must enter a valid email address!")));
}
return useCase.performAction(action)
.onErrorReturn(throwable -> Result.error(action, throwable))
.startWith(Result.<Boolean, ForgotPwActions.ForgotPwSubmit>loading(action));
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io());
}
项目:FastLib
文件:FastTransformer.java
/**
* 线程切换
*
* @param <T>
* @return
*/
public static <T> ObservableTransformer<T, T> switchSchedulers() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
return switchSchedulers(upstream);
}
};
}
项目:RxTransfer
文件:RxDownload.java
public static <T> ObservableTransformer<T, T> showDialog(final Activity activity, int messageRes) {
return observable -> Observable.fromCallable(() -> {
showProgressDialog(activity, messageRes);
return true;
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(t -> observable)
.doOnTerminate(() -> dismissProgressDialog());
}
项目:RetrofitCache
文件:CacheTransformer.java
public static <T> ObservableTransformer<T, T> emptyTransformer(){
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(io.reactivex.Observable<T> upstream) {
String name = upstream.getClass().getName();
if (name.equals(CLASS_NAME1)||name.equals(CLASS_NAME2)){
observable(upstream);
}
return upstream;
}
};
}
项目:RxJava2-Android-Sample
文件:RxSchedulers.java
public <T> ObservableTransformer<T, T> applyObservableCompute() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
项目:POCenter
文件:RxUtils.java
/**
* If the publish is empty, it wll run onError and throw a NoSuchElementException
* @param <T>
* @return
*/
public static <T> ObservableTransformer<T, T> notEmptyOrError() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.switchIfEmpty(new Observable<T>() {
@Override
protected void subscribeActual(Observer<? super T> observer) {
observer.onError(new NoSuchElementException());
}
});
}
};
}
项目:rxjava2_retrofit2
文件:ResultTransformer.java
public static <T> ObservableTransformer<HttpResponseResult<T>, T> transformer() {
return new ObservableTransformer<HttpResponseResult<T>, T>() {
@Override
public ObservableSource<T> apply(Observable<HttpResponseResult<T>> upstream) {
return upstream
.flatMap(ResultTransformer.<T>flatMap())
.compose(SchedulerTransformer.<T>transformer());
}
};
}
项目:rxjava2_retrofit2
文件:SchedulerTransformer.java
public static <T> ObservableTransformer<T, T> transformer() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
项目:rxjava2_retrofit2
文件:DialogTransformer.java
public <T> ObservableTransformer<T, T> transformer() {
return new ObservableTransformer<T, T>() {
private ProgressDialog progressDialog;
@Override
public ObservableSource<T> apply(final Observable<T> upstream) {
return upstream.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull final Disposable disposable) throws Exception {
progressDialog = ProgressDialog.show(activity, null, msg, true, cancelable);
if (cancelable) {
progressDialog.setOnCancelListener(new DialogInterface.OnCancelListener() {
@Override
public void onCancel(DialogInterface dialog) {
disposable.dispose();
}
});
}
}
}).doOnTerminate(new Action() {
@Override
public void run() throws Exception {
if (progressDialog.isShowing()) {
progressDialog.cancel();
}
}
});
}
};
}