/**
 * Observes network connectivity through a {@link ConnectivityManager} network callback.
 *
 * <p>The callback is created and registered each time the returned stream is subscribed, and
 * {@code tryToUnregisterCallback} is invoked when the stream is disposed. The stream starts with
 * the connectivity built from {@code context} when this method is called and drops consecutive
 * duplicate values.
 *
 * @param context context used to obtain the connectivity service and to build connectivity values
 * @return stream of connectivity states
 */
@Override
public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
    final ConnectivityManager manager =
            (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);

    // Registration is deferred until somebody actually subscribes.
    final ObservableOnSubscribe<Connectivity> registerOnSubscribe =
            new ObservableOnSubscribe<Connectivity>() {
                @Override
                public void subscribe(ObservableEmitter<Connectivity> subscriber) throws Exception {
                    networkCallback = createNetworkCallback(subscriber, context);
                    manager.registerNetworkCallback(
                            new NetworkRequest.Builder().build(), networkCallback);
                }
            };

    final Action unregisterOnDispose = new Action() {
        @Override
        public void run() {
            tryToUnregisterCallback(manager);
        }
    };

    final Observable<Connectivity> callbackEvents =
            Observable.create(registerOnSubscribe).doOnDispose(unregisterOnDispose);
    return callbackEvents.startWith(Connectivity.create(context)).distinctUntilChanged();
}
/**
 * Observes network connectivity by registering a network callback and an idle receiver up front
 * and exposing the values published on {@code connectivitySubject}.
 *
 * <p>The callback is registered for networks with the INTERNET and NOT_RESTRICTED capabilities.
 * When the stream is cancelled, both the callback and the receiver are unregistered. The stream
 * starts with the connectivity built from {@code context} and drops consecutive duplicates.
 *
 * @param context context used for the connectivity service, the receiver and connectivity values
 * @return stream of connectivity states
 */
@Override
public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
    final ConnectivityManager manager =
            (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
    networkCallback = createNetworkCallback(context);
    registerIdleReceiver(context);

    final NetworkRequest.Builder requestBuilder = new NetworkRequest.Builder();
    requestBuilder.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET);
    requestBuilder.addCapability(NetworkCapabilities.NET_CAPABILITY_NOT_RESTRICTED);
    manager.registerNetworkCallback(requestBuilder.build(), networkCallback);

    final Action releaseRegistrations = new Action() {
        @Override
        public void run() {
            tryToUnregisterCallback(manager);
            tryToUnregisterReceiver(context);
        }
    };

    return connectivitySubject.toFlowable(BackpressureStrategy.LATEST)
            .doOnCancel(releaseRegistrations)
            .startWith(Connectivity.create(context))
            .distinctUntilChanged()
            .toObservable();
}
/**
 * Checks that the Realm can be closed from the {@code doOnCancel} hook of a
 * {@link RealmList} flowable: after disposing the subscription the Realm must be closed.
 */
@Test
@UiThreadTest
public void realmList_closeInDoOnUnsubscribe() {
    realm.beginTransaction();
    RealmList<Dog> dogs = realm.createObject(AllTypes.class).getColumnRealmList();
    realm.commitTransaction();

    Action closeRealm = new Action() {
        @Override
        public void run() throws Exception {
            realm.close();
        }
    };
    Consumer<RealmList<Dog>> ignoreEmissions = new Consumer<RealmList<Dog>>() {
        @Override
        public void accept(RealmList<Dog> ignored) throws Exception {
        }
    };

    Flowable<RealmList<Dog>> flowable = dogs.asFlowable().doOnCancel(closeRealm);
    subscription = flowable.subscribe(ignoreEmissions);
    subscription.dispose();

    assertTrue(realm.isClosed());
}
/**
 * Checks that a {@link DynamicRealm} can be closed from the {@code doOnCancel} hook of a
 * results flowable: after disposing the subscription the dynamic Realm must be closed.
 */
@Test
@UiThreadTest
public void dynamicRealmResults_closeInDoOnUnsubscribe() {
    final DynamicRealm dynamicRealm = DynamicRealm.getInstance(realm.getConfiguration());

    Action closeDynamicRealm = new Action() {
        @Override
        public void run() throws Exception {
            dynamicRealm.close();
        }
    };
    Consumer<RealmResults<DynamicRealmObject>> ignoreEmissions =
            new Consumer<RealmResults<DynamicRealmObject>>() {
                @Override
                public void accept(RealmResults<DynamicRealmObject> ignored) throws Exception {
                }
            };

    RealmResults<DynamicRealmObject> results = dynamicRealm.where(AllTypes.CLASS_NAME).findAll();
    Flowable<RealmResults<DynamicRealmObject>> flowable =
            results.asFlowable().doOnCancel(closeDynamicRealm);
    subscription = flowable.subscribe(ignoreEmissions);
    subscription.dispose();

    assertTrue(dynamicRealm.isClosed());
}
/**
 * Checks that the Realm can be closed from the {@code doOnCancel} hook of a single-object
 * flowable: after disposing the subscription the Realm must be closed.
 */
@Test
@UiThreadTest
public void realmObject_closeInDoOnUnsubscribe() {
    realm.beginTransaction();
    realm.createObject(AllTypes.class);
    realm.commitTransaction();

    Action closeRealm = new Action() {
        @Override
        public void run() throws Exception {
            realm.close();
        }
    };
    Consumer<AllTypes> ignoreEmissions = new Consumer<AllTypes>() {
        @Override
        public void accept(AllTypes ignored) throws Exception {
        }
    };

    Flowable<AllTypes> flowable = realm.where(AllTypes.class)
            .findFirst()
            .<AllTypes>asFlowable()
            .doOnCancel(closeRealm);
    subscription = flowable.subscribe(ignoreEmissions);
    subscription.dispose();

    assertTrue(realm.isClosed());
}
/**
 * Issues the restart request on the IO scheduler and reports the outcome on the main thread.
 *
 * <p>A toast is shown for success or failure, and the "restart_webqq" preference is re-enabled
 * once the request finishes for any reason. The subscription is tracked in
 * {@code mCompositeDisposable}.
 */
private void restart() {
    final Action reenablePreference = new Action() {
        @Override
        public void run() throws Exception {
            findPreference("restart_webqq").setEnabled(true);
        }
    };
    final Consumer<FFMResult> onSuccess = new Consumer<FFMResult>() {
        @Override
        public void accept(FFMResult ffmResult) throws Exception {
            Toast.makeText(getContext(), "Succeed.", Toast.LENGTH_SHORT).show();
        }
    };
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Toast.makeText(getContext(), "Network error:\n" + throwable.getMessage(),
                    Toast.LENGTH_SHORT).show();
        }
    };

    mCompositeDisposable.add(FFMService.restart()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doFinally(reenablePreference)
            .subscribe(onSuccess, onFailure));
}
/**
 * Issues the stop request on the IO scheduler and reports the outcome on the main thread.
 *
 * <p>A toast is shown for success or failure, and the "stop_webqq" preference is re-enabled once
 * the request finishes for any reason. The subscription is tracked in
 * {@code mCompositeDisposable}.
 */
private void stop() {
    final Action reenablePreference = new Action() {
        @Override
        public void run() throws Exception {
            findPreference("stop_webqq").setEnabled(true);
        }
    };
    final Consumer<FFMResult> onSuccess = new Consumer<FFMResult>() {
        @Override
        public void accept(FFMResult ffmResult) throws Exception {
            Toast.makeText(getContext(), "Succeed.", Toast.LENGTH_SHORT).show();
        }
    };
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Toast.makeText(getContext(), "Network error:\n" + throwable.getMessage(),
                    Toast.LENGTH_SHORT).show();
        }
    };

    mCompositeDisposable.add(FFMService.stop()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doFinally(reenablePreference)
            .subscribe(onSuccess, onFailure));
}
/**
 * Initialises the Mockito mocks and builds the {@code RefreshToogle} under test around a view
 * stub that hands back the {@code disableRefresh} / {@code enableRefresh} fields.
 */
@Before
public void beforeEachTest() {
    MockitoAnnotations.initMocks(this);

    ToogleRefreshView stubView = new ToogleRefreshView() {
        @Override
        public Action enableRefresh() {
            return enableRefresh;
        }

        @Override
        public Action disableRefresh() {
            return disableRefresh;
        }
    };

    toogler = new RefreshToogle<>(stubView, uiScheduler);
}
/**
 * Posts the reply text to the chat view and synthesizes it to speech.
 *
 * <p>A {@code SYNTH_START} event is posted when the synthesizer reports {@code OnBegin} and a
 * {@code SYNTH_END} event when the stream completes; after completion the given tracks, if any,
 * are played starting at {@code finalPlayIndex}.
 *
 * @param tracks         tracks to play once speaking has completed; may be {@code null}
 * @param content        reply text to show and speak
 * @param finalPlayIndex index passed to the player together with {@code tracks}
 **/
private void synthesizeAndShowResp(final List<Track> tracks, String content, final int finalPlayIndex) {
    EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(content), null, null, null));

    final Consumer<SpeechMsg> postStartOnBegin = new Consumer<SpeechMsg>() {
        @Override
        public void accept(SpeechMsg speechMsg) throws Exception {
            if (speechMsg.state() == SpeechMsg.State.OnBegin) {
                EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
            }
        }
    };
    final Action postEndAndPlay = new Action() {
        @Override
        public void run() throws Exception {
            EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
            if (tracks != null) {
                XmlyManager.get().getPlayer().playList(tracks, finalPlayIndex);
            }
        }
    };

    SynthesizerBase.get().startSpeakAbsolute(content)
            .doOnNext(postStartOnBegin)
            .doOnComplete(postEndAndPlay)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe();
}
/**
 * Posts the reply text to the chat list, speaks it with the local engine forced on, and posts a
 * {@code START_NAVI} navigation event once speaking completes.
 *
 * @param text       reply text to show and speak
 * @param msgBuilder builder that receives the text and produces the speech message
 **/
private void speakAndAheadReturn(String text, SpeechMsgBuilder msgBuilder) {
    /* Send the reply text to the chat list. */
    EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(text), null, null, null));

    /* Synthesize the reply text. */
    msgBuilder.setText(text).setForceLocalEngine(true);

    final Action startNavigation = new Action() {
        @Override
        public void run() throws Exception {
            EventBus.getDefault().post(new NavigateEvent(NavigateEvent.START_NAVI));
        }
    };

    SynthesizerBase.get().startSpeakAbsolute(msgBuilder.build())
            .doOnComplete(startNavigation)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe();
}
/** * takeUntil与skipUntil操作符作用相反,订阅并开始发射原始Observable,它还监视你提供的第二个Observable。 * 如果第二个Observable发射了一项数据或者发射了一个终止通知( onError通知或一个onComplete通知), * TakeUntil返回的Observable会停止发射原始Observable并终止。 */ private void doSomeWork() { if (!isRunning) { Observable .interval(1, TimeUnit.SECONDS) .take(6) .takeUntil(Observable.just(10).delay(3, TimeUnit.SECONDS)) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { isRunning = true; } }) .doOnTerminate(new Action() { @Override public void run() throws Exception { isRunning = false; } }) // Run on a background thread .subscribeOn(Schedulers.io()) // Be notified on the main thread .observeOn(AndroidSchedulers.mainThread()) .subscribe(getObserver()); } }
@Override protected void onStart() { super.onStart(); Log.d(TAG, "onStart()"); // Using automatic unsubscription, this should determine that the correct time to // unsubscribe is onStop (the opposite of onStart). Observable.interval(1, TimeUnit.SECONDS) .doOnDispose(new Action() { @Override public void run() throws Exception { Log.i(TAG, "Unsubscribing subscription from onStart()"); } }) .compose(this.<Long>bindToLifecycle()) .subscribe(new Consumer<Long>() { @Override public void accept(Long num) throws Exception { Log.i(TAG, "Started in onStart(), running until in onStop(): " + num); } }); }
/**
 * Speaks the message produced by {@code msgBuilder}.
 *
 * <p>A {@code SYNTH_START} event is posted when the synthesizer reports {@code OnBegin} and
 * {@code isAnim} is set; a {@code SYNTH_END} event is always posted on completion.
 *
 * @param msgBuilder builder of the speech message to synthesize
 * @param isAnim     whether the start event should be posted
 */
private void speak(SpeechMsgBuilder msgBuilder, final boolean isAnim) {
    final Consumer<SpeechMsg> postStartOnBegin = new Consumer<SpeechMsg>() {
        @Override
        public void accept(SpeechMsg speechMsg) throws Exception {
            if (speechMsg.state() == SpeechMsg.State.OnBegin && isAnim) {
                EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
            }
        }
    };
    final Action postEnd = new Action() {
        @Override
        public void run() throws Exception {
            EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
        }
    };

    SynthesizerBase.get().startSpeakAbsolute(msgBuilder.build())
            .doOnNext(postStartOnBegin)
            .doOnComplete(postEnd)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe();
}
/**
 * Posts the reply text to the chat view and synthesizes it to speech.
 *
 * <p>A {@code SYNTH_START} event is posted when the synthesizer reports {@code OnBegin} and a
 * {@code SYNTH_END} event when the stream completes; after completion the given tracks, if any,
 * are handed to the player starting at {@code finalPlayIndex}.
 *
 * @param tracks         tracks to play once speaking has completed; may be {@code null}
 * @param content        reply text to show and speak
 * @param finalPlayIndex index passed to the player together with {@code tracks}
 **/
private void synthesizeAndShowResp(final List<Track> tracks, String content, final int finalPlayIndex) {
    EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(content), null, null, null));

    final Consumer<SpeechMsg> postStartOnBegin = new Consumer<SpeechMsg>() {
        @Override
        public void accept(SpeechMsg speechMsg) throws Exception {
            if (speechMsg.state() == SpeechMsg.State.OnBegin) {
                EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
            }
        }
    };
    final Action postEndAndPlay = new Action() {
        @Override
        public void run() throws Exception {
            EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
            if (tracks != null) {
                XmPlayerManager.getInstance(mContext).playList(tracks, finalPlayIndex);
            }
        }
    };

    IflySynthesizer.get().startSpeakAbsolute(content)
            .doOnNext(postStartOnBegin)
            .doOnComplete(postEndAndPlay)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe();
}
/**
 * Posts the builder's text to the chat view and synthesizes it to speech.
 *
 * <p>A {@code SYNTH_START} event is posted when the synthesizer reports {@code OnBegin} and a
 * {@code SYNTH_END} event when the stream completes.
 *
 * @param builder builder holding the reply text and producing the speech message
 **/
private void showAndSpeak(SpeechMsgBuilder builder) {
    EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(builder.getText()), null, null, null));

    final Consumer<SpeechMsg> postStartOnBegin = new Consumer<SpeechMsg>() {
        @Override
        public void accept(SpeechMsg speechMsg) throws Exception {
            if (speechMsg.state() == SpeechMsg.State.OnBegin) {
                EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
            }
        }
    };
    final Action postEnd = new Action() {
        @Override
        public void run() throws Exception {
            EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
        }
    };

    IflySynthesizer.getInstance().startSpeakAbsolute(builder.build())
            .doOnNext(postStartOnBegin)
            .doOnComplete(postEnd)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe();
}
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) { return new ObservableTransformer<T, T>() { @Override public Observable<T> apply(Observable<T> observable) { return observable.subscribeOn(Schedulers.io()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { view.showLoading();//显示进度条 } }) .subscribeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread()) .doAfterTerminate(new Action() { @Override public void run() { view.hideLoading();//隐藏进度条 } }).compose(RxUtils.bindToLifecycle(view)); } }; }
/**
 * flatMap demo: each of the integers 1, 2, 3 is expanded into two strings (its double and its
 * triple) and every emission, error and completion is logged.
 */
@Override
public void test0() {
    Log.i(TAG, "test0() FlatMap simple demo, integer 1,2,3 transform to string 2,3,4,6,6,9");

    final Function<Integer, ObservableSource<String>> doubleAndTriple =
            new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                    return Observable.just(String.valueOf(integer * 2), String.valueOf(integer * 3));
                }
            };
    final Consumer<String> logItem = new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "Consumer<String> accept() s: " + s);
        }
    };
    final Consumer<Throwable> logError = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
        }
    };
    final Action logComplete = new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "Action run() for onComplete()");
        }
    };

    Observable.just(1, 2, 3)
            .flatMap(doubleAndTriple)
            .subscribe(logItem, logError, logComplete);
}
/**
 * repeat demo: the strings "1", "2", "3" are emitted twice via {@code repeat(2)} and every
 * emission, error and completion is logged.
 */
@Override
public void test0() {
    Log.i(TAG, "test0() Range simple demo, repeat is 2");

    final Consumer<String> logItem = new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "Consumer<String> accept() s: " + s);
        }
    };
    final Consumer<Throwable> logError = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
        }
    };
    final Action logComplete = new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "Action run() for onComplete()");
        }
    };

    Observable.just("1", "2", "3")
            .repeat(2)
            .subscribe(logItem, logError, logComplete);
}
/** * Saves most types of POJOs or collections in {@link Book} storage. * <p/> * To deserialize correctly it is recommended to have an all-args constructor, but other types * may be available. * * @param key object key is used as part of object's file name * @param value object to save, must have no-arg constructor, can't be null. * @return this Book instance */ public <T> Completable write(final String key, final T value) { return Completable.fromAction(new Action() { @Override public void run() { book.write(key, value); } }) // FIXME in RxJava1 the error would be propagated to updates. // In RxJava2 the error happens on the Completable this method returns. // This andThen block reproduces the behavior in RxJava1. .andThen(Completable.fromAction(new Action() { @Override public void run() throws Exception { try { updates.onNext(Pair.create(key, value)); } catch (Throwable t) { updates.onError(t); } } })).subscribeOn(scheduler); }
/**
 * Builds a transformer that runs the upstream (and its unsubscription) on the IO scheduler,
 * observes on the main thread, maps each {@code ApiResult} through {@code HandleFuc}, logs
 * subscription and finalisation, and routes errors through {@code HttpResponseFunc}.
 *
 * @param <T> payload type carried by the {@code ApiResult}
 * @return the composed transformer
 */
public static <T> ObservableTransformer<ApiResult<T>, T> _io_main() {
    return new ObservableTransformer<ApiResult<T>, T>() {
        @Override
        public ObservableSource<T> apply(@NonNull Observable<ApiResult<T>> upstream) {
            final Consumer<Disposable> logSubscribe = new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed());
                }
            };
            final Action logFinally = new Action() {
                @Override
                public void run() throws Exception {
                    HttpLog.i("+++doFinally+++");
                }
            };

            final Observable<ApiResult<T>> scheduled = upstream
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
            return scheduled
                    .map(new HandleFuc<T>())
                    .doOnSubscribe(logSubscribe)
                    .doFinally(logFinally)
                    .onErrorResumeNext(new HttpResponseFunc<T>());
        }
    };
}
public static <T> ObservableTransformer<ApiResult<T>, T> _main() { return new ObservableTransformer<ApiResult<T>, T>() { @Override public ObservableSource<T> apply(@NonNull Observable<ApiResult<T>> upstream) { return upstream //.observeOn(AndroidSchedulers.mainThread()) .map(new HandleFuc<T>()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed()); } }) .doFinally(new Action() { @Override public void run() throws Exception { HttpLog.i("+++doFinally+++"); } }) .onErrorResumeNext(new HttpResponseFunc<T>()); } }; }
/**
 * Loads the hot-movie list and pushes the result into the view.
 *
 * <p>The loading state is shown first. On success the hot movies and the movie ids are added to
 * the view and, on completion, the content state is shown. On failure the error is logged and
 * shown in the view.
 *
 * @param limit maximum number of movies requested from the manager
 */
@Override
public void getHotMovieList(int limit) {
    mView.showLoading();
    mHotMovieListManager.getHotMovieList(limit)
            .subscribe(new Consumer<HotMovieBean>() {
                @Override
                public void accept(@NonNull HotMovieBean hotMovieBean) throws Exception {
                    mView.addHotMovieList(hotMovieBean.getData().getHot());
                    mView.addMovieIds(hotMovieBean.getData().getMovieIds());
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    // Throwable messages may be null; calling toString() on them used to throw a
                    // NullPointerException from inside the error handler itself.
                    Log.e(TAG, "accept: " + throwable.getLocalizedMessage());
                    String message = throwable.getMessage();
                    // Fall back to the throwable's own description when it carries no message.
                    mView.showError(message != null ? message : throwable.toString());
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    mView.showContent();
                }
            });
}
/**
 * Wraps {@code dispose} in a {@link Disposable} whose disposal runs the action on the UI thread.
 *
 * <p>When disposal happens on the main looper the action runs immediately; otherwise it is
 * scheduled on a main-thread worker, failures are reported through {@code onError}, and the
 * worker is disposed afterwards.
 *
 * @param dispose action to be executed
 * @return Disposable object
 */
private Disposable disposeInUiThread(final Action dispose) {
    return Disposables.fromAction(new Action() {
        @Override
        public void run() throws Exception {
            final boolean onMainThread = Looper.getMainLooper() == Looper.myLooper();
            if (onMainThread) {
                dispose.run();
                return;
            }

            final Scheduler.Worker inner = AndroidSchedulers.mainThread().createWorker();
            final Runnable disposeAndRelease = new Runnable() {
                @Override
                public void run() {
                    try {
                        dispose.run();
                    } catch (Exception exception) {
                        onError("Could not unregister receiver in UI Thread", exception);
                    }
                    inner.dispose();
                }
            };
            inner.schedule(disposeAndRelease);
        }
    });
}
@Test public void testErrorHandlingInValueProvider() { // setup final AtomicBoolean missHandlerCalled = new AtomicBoolean(false); TestSubscriber<Integer> testSubscriber1 = new TestSubscriber<>(); subscribe(source.get("hello"), testSubscriber1); source.onNext("hello", new Callable<Integer>() { @Override public Integer call() throws Exception { throw new RuntimeException("Boom"); } }, new Action() { @Override public void run() { missHandlerCalled.set(true); } }); testSubscriber1.assertError(RuntimeException.class); }
/**
 * map demo: the integers 1, 2, 3 are doubled and converted to strings, and every emission,
 * error and completion is logged.
 */
@Override
public void test0() {
    Log.i(TAG, "test0() Map simple demo, integer 1,2,3 transform to string 2,4,6");

    final Function<Integer, String> doubleToString = new Function<Integer, String>() {
        @Override
        public String apply(@NonNull Integer integer) throws Exception {
            return Integer.toString(integer * 2);
        }
    };
    final Consumer<String> logItem = new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "Consumer<String> accept() s: " + s);
        }
    };
    final Consumer<Throwable> logError = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
        }
    };
    final Action logComplete = new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "Action run() for onComplete()");
        }
    };

    Observable.just(1, 2, 3)
            .map(doubleToString)
            .subscribe(logItem, logError, logComplete);
}
/**
 * Builds a transformer that subscribes the source on the IO scheduler, delivers results on the
 * main thread, shows the view's loading indicator on subscription and hides it when the stream
 * terminates.
 *
 * @param view view whose loading indicator is toggled
 * @param <T>  item type of the stream
 * @return the composed transformer
 */
public static <T> FlowableTransformer<T, T> applySchedules(final IView view) {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            final Consumer<Subscription> showLoading = new Consumer<Subscription>() {
                @Override
                public void accept(Subscription subscription) throws Exception {
                    view.showLoading();
                }
            };
            final Action hideLoading = new Action() {
                @Override
                public void run() throws Exception {
                    view.hideLoading();
                }
            };

            return upstream.subscribeOn(Schedulers.io())
                    .doOnSubscribe(showLoading)
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnTerminate(hideLoading);
        }
    };
}
/**
 * buffer demo: the strings "1", "2", "3" are grouped with {@code buffer(2)} and every emitted
 * list, error and completion is logged.
 */
@Override
public void test0() {
    Log.i(TAG, "test0() Buffer simple demo, 1,2,3 buffer(2)");

    final Consumer<List<String>> logBatch = new Consumer<List<String>>() {
        @Override
        public void accept(List<String> strings) throws Exception {
            Log.d(TAG, "Consumer<String> accept() strings: " + strings);
        }
    };
    final Consumer<Throwable> logError = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
        }
    };
    final Action logComplete = new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "Action run() for onComplete()");
        }
    };

    Observable.just("1", "2", "3")
            .buffer(2)
            .subscribe(logBatch, logError, logComplete);
}
/**
 * Creates an operator (for use with {@code lift}) that runs {@code action} when the upstream
 * completes without having emitted any item.
 *
 * <p>Items and errors are forwarded unchanged. If the action throws, the exception is delivered
 * downstream through {@code onError} instead of {@code onComplete}.
 *
 * @param action callback to run when the upstream completes empty
 * @param <T>    item type of the stream
 * @return the operator
 */
public static <T> ObservableOperator<T, T> doOnEmpty(final Action action) {
    return new ObservableOperator<T, T>() {
        @Override
        public Observer<? super T> apply(final Observer<? super T> observer) throws Exception {
            return new DisposableObserver<T>() {
                boolean isEmpty = true;

                @Override
                protected void onStart() {
                    // DisposableObserver consumes the upstream onSubscribe itself; without this
                    // hand-off the downstream observer would never receive a Disposable, which
                    // violates the Observer protocol and prevents dispose() from reaching the
                    // upstream. Passing 'this' lets downstream disposal cancel the upstream.
                    observer.onSubscribe(this);
                }

                @Override
                public void onNext(T value) {
                    isEmpty = false;
                    observer.onNext(value);
                }

                @Override
                public void onError(Throwable t) {
                    observer.onError(t);
                }

                @Override
                public void onComplete() {
                    if (isEmpty) {
                        try {
                            action.run();
                        } catch (Exception e) {
                            // A failing action replaces the completion signal with an error.
                            onError(e);
                            return;
                        }
                    }
                    observer.onComplete();
                }
            };
        }
    };
}
/**
 * Disposes the item's current subscription, asks the download service to delete the record for
 * the item's URL, and removes the item from the adapter once that request finishes for any
 * reason.
 */
private void delete() {
    dispose(data.disposable);

    final Action removeFromAdapter = new Action() {
        @Override
        public void run() throws Exception {
            mAdapter.remove(getAdapterPosition());
        }
    };

    mRxDownload.deleteServiceDownload(data.record.getUrl(), true)
            .doFinally(removeFromAdapter)
            .subscribe();
}
private void uploadNotificationsToggle(final Preference preference) { final NotificationToggle newNotificationToggle = NotificationToggle.create(mFriendToggle.isChecked(), mGroupToggle.isChecked()); if (newNotificationToggle.equals(mServerNotificationToggle)) { return; } preference.setEnabled(false); mCompositeDisposable.add(FFMService .updateNotificationsToggle(NotificationToggle.create(mFriendToggle.isChecked(), mGroupToggle.isChecked())) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doFinally(new Action() { @Override public void run() throws Exception { preference.setEnabled(true); } }) .subscribe(new Consumer<FFMResult>() { @Override public void accept(FFMResult result) throws Exception { mServerNotificationToggle = newNotificationToggle; //Toast.makeText(getContext(), "Succeed.", Toast.LENGTH_SHORT).show(); Log.d("Sync", "updateNotificationsToggle success, new state: " + newNotificationToggle); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Toast.makeText(getContext(), "Network error:\n" + throwable.getMessage(), Toast.LENGTH_SHORT).show(); Log.w("Sync", "updateNotificationsToggle failed", throwable); } })); }
/**
 * Persists the given login method and, once the write completes, publishes it on
 * {@code methodRelay}.
 *
 * @param method login method to store; checked with {@code checkNotNull}
 * @return completable representing the write followed by the relay notification
 */
@Override
public @NonNull Completable updateLoginMethod(@NonNull final LoginMethod method) {
    checkNotNull(method);

    final Action publishMethod = new Action() {
        @Override
        public void run() throws Exception {
            methodRelay.accept(method);
        }
    };

    return persistence.write(LOGIN_METHOD, method).doOnComplete(publishMethod);
}
/**
 * Shows the reply text in the chat view and, for voice input, synthesizes it to speech.
 *
 * @param text       reply text
 * @param inputType  user input type; the reply is only synthesized for voice input
 * @param msgBuilder builder of the speech message to synthesize
 **/
private void speakAndShowResp(String text, int inputType, SpeechMsgBuilder msgBuilder) {
    /* Send the reply text to the chat view. */
    EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(text), null, null, null));

    if (inputType != AssistantService.INPUT_VOICE) {
        return;
    }

    msgBuilder.setText(text);

    /* Original note: synthesis starts at the Observable's subscribe(), so the animation has to
     * be notified before that; doOnSubscribe runs on the thread given by the nearest
     * subscribeOn(). NOTE(review): the code below reacts to the OnBegin message via doOnNext,
     * not doOnSubscribe - confirm the note is still accurate. */
    final Consumer<SpeechMsg> postStartOnBegin = new Consumer<SpeechMsg>() {
        @Override
        public void accept(SpeechMsg speechMsg) throws Exception {
            if (speechMsg.state() == SpeechMsg.State.OnBegin) {
                EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
            }
        }
    };
    final Action postEnd = new Action() {
        @Override
        public void run() throws Exception {
            EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
        }
    };

    SynthesizerBase.get().startSpeakAbsolute(msgBuilder.build())
            .doOnNext(postStartOnBegin)
            .doOnComplete(postEnd)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe();
}
/**
 * Speaks the given chat message and drives the speaker animation of its view.
 *
 * <p>Nothing happens when the synthesizer is not initialised. For sectioned responses the
 * forced local engine is switched off and the sections are passed to the builder. On the main
 * thread, the animation is started when the synthesizer reports the {@code Idle} state and
 * stopped when the stream completes.
 *
 * @param msg          chat message whose text is spoken
 * @param speakervView view whose speaker animation is started and stopped
 */
@Override
public void synthesize(SpeechMsg msg, final View speakervView) {
    if (!SynthesizerBase.isInited()) {
        return;
    }

    SpeechMsgBuilder builder = new SpeechMsgBuilder(msg.text)
            .setOrigin(com.lingju.audio.engine.base.SpeechMsg.ORIGIN_COMMON);
    if (msg instanceof ResponseSetionsMsg) {
        SynthesizerBase.get().setForceLocalEngine(false);
        builder.setSections(((ResponseSetionsMsg) msg).getSetions());
    }

    final Consumer<com.lingju.audio.engine.base.SpeechMsg> startAnimationOnIdle =
            new Consumer<com.lingju.audio.engine.base.SpeechMsg>() {
                @Override
                public void accept(com.lingju.audio.engine.base.SpeechMsg speechMsg) throws Exception {
                    if (speechMsg.state() == com.lingju.audio.engine.base.SpeechMsg.State.Idle) {
                        Log.i("LingJu", "doOnNext>>>startSpeakerAnimation>>>" + speakervView);
                        chatListView.startSpeakerAnimation(speakervView);
                    }
                }
            };
    final Action stopAnimation = new Action() {
        @Override
        public void run() throws Exception {
            Log.i("ChatListPresenter", "doOnComplete>>stopSpeakerAnimation");
            chatListView.stopSpeakerAnimation(speakervView);
        }
    };

    SynthesizerBase.get().startSpeakAbsolute(builder.build())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(startAnimationOnIdle)
            .doOnComplete(stopAnimation)
            .subscribe();
}
/**
 * Requests the given permissions and subscribes to one completable per request.
 *
 * <p>Each completable is subscribed on a new thread and observed on the main thread. On
 * completion the matching UI name is logged and the matching callback, if present, is run;
 * errors are only logged.
 *
 * @param permissions       permissions to request
 * @param permissionUINames display names, indexed like the returned completables
 * @param onCompletes       optional per-permission callbacks, indexed like the completables;
 *                          the array and its entries may be {@code null}
 */
private void permissionRequest(String permissions[], final String permissionUINames[], @Nullable final Runnable onCompletes[]){
    final List<Completable> completes = permissionRequest.requestPermission(this, permissions);

    final Consumer<Throwable> logFailure = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d("Permission", "Permission Error: " + throwable.getLocalizedMessage());
        }
    };

    for (int index = 0; index < completes.size(); index++) {
        final String permissionUIName = permissionUINames[index];
        // Stays null when no callback array was supplied at all.
        final Runnable onComplete = onCompletes == null ? null : onCompletes[index];

        final Action logAndNotify = new Action() {
            @Override
            public void run() throws Exception {
                Log.d("Permission", permissionUIName);
                if (onComplete != null) {
                    onComplete.run();
                }
            }
        };

        completes.get(index)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(logAndNotify, logFailure);
    }
}
/**
 * Removes the track through the personal repository on the IO scheduler and reports the outcome
 * on the UI scheduler.
 *
 * @param track     track to remove
 * @param onSuccess callback run when the removal completes
 * @param onError   callback receiving any failure
 */
@Override
public void remove(Track track, Action onSuccess, Consumer<? super Throwable> onError) {
    personalRepository
            .removeTrack(track)
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.ui())
            .subscribe(onSuccess, onError);
}
/**
 * Initialises the Mockito mocks and builds the {@code AssignErrorState} under test around a
 * view stub that hands back the {@code show} / {@code hide} fields.
 */
@Before
public void beforeEachTest() {
    MockitoAnnotations.initMocks(this);

    ErrorStateView stubView = new ErrorStateView() {
        @Override
        public Action hideErrorState() {
            return hide;
        }

        @Override
        public Action showErrorState() {
            return show;
        }
    };

    assignErrorState = new AssignErrorState<>(stubView, uiScheduler);
}
/**
 * Clears the favorite playlists through the personal repository on the IO scheduler and reports
 * the outcome on the UI scheduler.
 *
 * @param onSuccess callback run when clearing completes
 * @param onError   callback receiving any failure
 */
@Override
public void clear(Action onSuccess, Consumer<? super Throwable> onError) {
    personalRepository
            .clearFavoritePlaylists()
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.ui())
            .subscribe(onSuccess, onError);
}
/**
 * Likes the playlist through the personal repository on the IO scheduler and reports the
 * outcome on the UI scheduler.
 *
 * @param playlist   playlist to like
 * @param onComplete callback run when the operation completes
 * @param onError    callback receiving any failure
 */
@Override
public void add(Playlist playlist, Action onComplete, Consumer<? super Throwable> onError) {
    personalRepository
            .likePlaylist(playlist)
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.ui())
            .subscribe(onComplete, onError);
}
/**
 * Likes the track through the personal repository on the IO scheduler and reports the outcome
 * on the UI scheduler.
 *
 * @param track      track to like
 * @param onComplete callback run when the operation completes
 * @param onError    callback receiving any failure
 */
@Override
public void add(Track track, Action onComplete, Consumer<? super Throwable> onError) {
    personalRepository
            .likeTrack(track)
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.ui())
            .subscribe(onComplete, onError);
}
/**
 * Runs {@code action} when this optional holds no value.
 *
 * <p>The action is only null-checked and executed in the empty case; an exception thrown by it
 * is printed and otherwise ignored.
 *
 * @param action callback to run when no value is present
 * @return this instance, to allow chaining
 */
@Nonnull
public RxOptional<T> ifNotPresent(@Nonnull Action action) {
    if (value != null) {
        return this;
    }

    requireNonNull(action);
    try {
        action.run();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return this;
}