/** * Adapt a {@link Sender} to rx-java's {@link Consumer} */ @SuppressWarnings("unchecked") static <M extends Message, R> Consumer<List<SendingTask<M>>> toConsumer(final Sender<M, R> sender) { return new Consumer<List<SendingTask<M>>>() { @Override public void accept(List<SendingTask<M>> tasks) throws Exception { if (tasks.isEmpty()) return; logger.debug("Sending {} messages.", tasks.size()); Object[] messageAndDeferred = SendingTask.unzipGeneric(tasks); final List<M> messages = (List<M>) messageAndDeferred[0]; final List<Deferred> deferreds = (List<Deferred>) messageAndDeferred[1]; try { List<R> result = sender.send(messages); Deferreds.resolveAll(result, deferreds); } catch (Throwable t) { Deferreds.rejectAll(MessageDroppedException.dropped(t, messages), deferreds, messages); } } }; }
private void doDeletingScriptFile() { mScriptListWithProgressBarView.showProgressBar(); Observable.fromPublisher(new Publisher<Boolean>() { @Override public void subscribe(Subscriber<? super Boolean> s) { s.onNext(PFile.deleteRecursively(mSelectedScriptFile)); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Boolean>() { @Override public void accept(@io.reactivex.annotations.NonNull Boolean deleted) throws Exception { showMessage(deleted ? R.string.text_already_delete : R.string.text_delete_failed); notifyScriptFileChanged(); onScriptFileOperated(); } }); }
@Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); RxBeacon.with(this) //.addBeaconParser(RxBeaconParser.ESTIMOTE) .beaconsInRegion() .subscribe(new Consumer<RxBeaconRange>() { @Override public void accept(@NonNull RxBeaconRange rxBeaconRange) throws Exception { Log.d("beaconsInRegion", rxBeaconRange.toString()); } }); //RxBeacon.with(this) // .addBeaconParser("m:2-3=0215,i:4-19,i:20-21,i:22-23,p:24-24") // .monitor() // .subscribe(new Consumer<RxBeaconMonitor>() { // @Override // public void accept(@NonNull RxBeaconMonitor rxBeaconMonitor) throws Exception { // Log.d("monitor", rxBeaconMonitor.toString()); // } // }); }
@Override public void onCardExhausted() { getCompositeDisposable().add(getDataManager() .getAllQuestions() .subscribeOn(getSchedulerProvider().io()) .observeOn(getSchedulerProvider().ui()) .subscribe(new Consumer<List<Question>>() { @Override public void accept(List<Question> questionList) throws Exception { if (!isViewAttached()) { return; } if (questionList != null) { getMvpView().reloadQuestionnaire(questionList); } } })); }
private void map() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); } }).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return "This is result " + integer; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e(MainActivity.TAG, "accept: " + Thread.currentThread().getName()); info += s + "\n"; tv.setText(info); } }); }
@Override public void loadSearchHistory() { getCompositeDisposableHelper().addDisposable(getDataManager() .getSearchHistory() .compose(getCompositeDisposableHelper().<List<OrmHistory>>applySchedulers()) .subscribe(new Consumer<List<OrmHistory>>() { @Override public void accept(List<OrmHistory> histories) throws Exception { if (!histories.isEmpty()) { getMvpView().showSearchHistory(histories); } else { getMvpView().showEmptySearchHistory(); } } })); }
public void requestLoadingList() { Observable.create(new ObservableOnSubscribe<List<ConfigBean>>() { @Override public void subscribe(ObservableEmitter<List<ConfigBean>> e) throws Exception { mModel = ConfigModel.getInstance(configView.getContext()); e.onNext(mModel.getConfigList()); mModel.setConfigCallback(ConfigPresenter.this); } }) .observeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<ConfigBean>>() { @Override public void accept(List<ConfigBean> list) throws Exception { configView.displayConfigList(list); } }); }
public void checkVersion(){ ZBHClient.getInstance() .create(ZBHServices.class, Constants.BASE_UPGRADE_URL) .getVersionInfo() .subscribeOn(Schedulers.io())//在子线程做请求处理 .observeOn(AndroidSchedulers.mainThread())//回到主线程做数据处理 .subscribe(new Consumer<VersionBean>() { @Override public void accept(VersionBean versionBean) throws Exception { //弹出提示并更新 showUpgradeDialog(versionBean); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { } }); }
@Override public void loadMore(int start) { application.getRepertories() .getList(start, COUNT, null, RemoteApi.Status.RECRUITING, keywords, false) .subscribe(new Consumer<List<ProjectBean>>() { @Override public void accept(List<ProjectBean> projectBeen) throws Exception { getView().addData(projectBeen); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { getView().loadMoreError(throwable); } }); }
/** * 弹出键盘(可能存在视图尚未加载完成,需要延迟一段时间弹出) **/ public void showKeyboard() { Single.just(0) .delay(300, TimeUnit.MILLISECONDS) .observeOn(AndroidSchedulers.mainThread()) .doOnSuccess(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { InputMethodManager imm = (InputMethodManager) getContext().getSystemService(Context.INPUT_METHOD_SERVICE); edit.requestFocus(); imm.showSoftInput(edit, 0); } }) .subscribeOn(Schedulers.io()) .subscribe(); }
private void put() { AvenueNetClient.builder("put") .param("name", "putname") .build() .put() .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Toast.makeText(MainActivity.this, s, Toast.LENGTH_LONG).show(); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Toast.makeText(MainActivity.this, throwable.getMessage(), Toast.LENGTH_SHORT).show(); } }); }
@OnClick(R.id.btn_take_picture) public void onBtnTakePictureClicked() { rxPermissions.request(Manifest.permission.CAMERA) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { if (aBoolean) { openCamera(); } else { ToastUtils.showShortToast("权限被拒绝"); } } }); }
public static void externalStorage(final RequestPermission requestPermission, final PickerContract.IPickerView view) { RxPermissions rxPermissions = new RxPermissions((Activity) view); boolean isPermissionsGranted = rxPermissions .isGranted(Manifest.permission.WRITE_EXTERNAL_STORAGE); if (isPermissionsGranted) { requestPermission.onRequestPermissionSuccess(); } else { rxPermissions .request(Manifest.permission.WRITE_EXTERNAL_STORAGE) .subscribe(new Consumer<Boolean>() { @Override public void accept(@NonNull Boolean aBoolean) throws Exception { if (aBoolean) { requestPermission.onRequestPermissionSuccess(); } else { view.showMessage(((Activity) view).getResources().getString(R.string.request_permission_fail)); } } }); } }
private void delayAndSwitchToState(@ViewState int state) { disposable = Flowable.just(state) .delay(3000, TimeUnit.MILLISECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer currentState) throws Exception { stateView.setState(currentState); if (currentState == ViewState.CONTENT) { displayContents(); delayAndSwitchToState(ViewState.LOADING); } else { delayAndSwitchToState(ViewState.CONTENT); } } }); }
@Test(expected = UnsupportedOperationException.class) public void onErrorAcceptWithCrashingFunctionThenThrowException() { Guard.call(new Callable<TestClass>() { @Override public TestClass call() throws Exception { throw new NullPointerException(); } }).onError(new Consumer<Exception>() { @Override public void accept(@NonNull Exception e) throws Exception { throw new UnsupportedOperationException(); } }); }
@Test public void resultThrowingInOnSuccessDeliveredToPlugin() { server.enqueue(new MockResponse()); final AtomicReference<Throwable> throwableRef = new AtomicReference<>(); RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { if (!throwableRef.compareAndSet(null, throwable)) { throw Exceptions.propagate(throwable); } } }); RecordingMaybeObserver<Result<String>> observer = subscriberRule.create(); final RuntimeException e = new RuntimeException(); service.result().subscribe(new ForwardingObserver<Result<String>>(observer) { @Override public void onSuccess(Result<String> value) { throw e; } }); assertThat(throwableRef.get()).isSameAs(e); }
public static void launchRecorder(final RequestPermission requestPermission, final PickerContract.IPickerView view) { RxPermissions rxPermissions = new RxPermissions((Activity) view); boolean isPermissionsGranted = rxPermissions .isGranted(Manifest.permission.WRITE_EXTERNAL_STORAGE) && rxPermissions .isGranted(Manifest.permission.RECORD_AUDIO); if (isPermissionsGranted) { requestPermission.onRequestPermissionSuccess(); } else { rxPermissions .request(Manifest.permission.WRITE_EXTERNAL_STORAGE , Manifest.permission.RECORD_AUDIO) .subscribe(new Consumer<Boolean>() { @Override public void accept(@NonNull Boolean aBoolean) throws Exception { if (aBoolean) { requestPermission.onRequestPermissionSuccess(); } else { view.showMessage(((Activity) view).getResources().getString(R.string.request_permission_fail)); } } }); } }
/** * 合成并显示回复文本 **/ private void synthesizeAndShowResp(final List<Track> tracks, String content, final int finalPlayIndex) { EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(content), null, null, null)); SynthesizerBase.get().startSpeakAbsolute(content) .doOnNext(new Consumer<SpeechMsg>() { @Override public void accept(SpeechMsg speechMsg) throws Exception { if (speechMsg.state() == SpeechMsg.State.OnBegin) EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START)); } }) .doOnComplete(new Action() { @Override public void run() throws Exception { EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END)); if (tracks != null) XmlyManager.get().getPlayer().playList(tracks, finalPlayIndex); } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe(); }
public void loadRecommendedArticles(long id) { mComposite.add(mService.getRecommendedArticle(id) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Data>() { @Override public void accept(Data data) throws Exception { mBaseView.onRecommendationLoaded(filterData(data.getData())); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { mBaseView.onRecommendationError(throwable); } })); }
@Override protected void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.LOLLIPOP) { getWindow().getDecorView().setSystemUiVisibility(View.SYSTEM_UI_FLAG_LAYOUT_FULLSCREEN); getWindow().setStatusBarColor(Color.TRANSPARENT); } else if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.KITKAT) { getWindow().addFlags(WindowManager.LayoutParams.FLAG_TRANSLUCENT_STATUS); } // checkPermission(Manifest.permission.WRITE_EXTERNAL_STORAGE); // getWindow().setBackgroundDrawableResource(R.drawable.splash_bg); RxPermissions rxPermissions = new RxPermissions(this); rxPermissions.request(permissions) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "permission is all granted: " + aBoolean); doNext(); } }); }
private void window() { Log.e(TAG, "window: window"); Observable.interval(1, TimeUnit.SECONDS) .take(15) .window(3, TimeUnit.SECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Observable<Long>>() { @Override public void accept(Observable<Long> longObservable) throws Exception { Log.e(TAG, "accept: begin..."); longObservable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.e(TAG, "accept: " + aLong); } }); } }); }
@Test public void bodyThrowingInOnCompleteDeliveredToPlugin() { server.enqueue(new MockResponse()); final AtomicReference<Throwable> throwableRef = new AtomicReference<>(); RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { if (!throwableRef.compareAndSet(null, throwable)) { throw Exceptions.propagate(throwable); } } }); RecordingObserver<String> observer = subscriberRule.create(); final RuntimeException e = new RuntimeException(); service.body().subscribe(new ForwardingObserver<String>(observer) { @Override public void onComplete() { throw e; } }); observer.assertAnyValue(); assertThat(throwableRef.get()).isSameAs(e); }
@Test public void throwingInOnCompleteDeliveredToPlugin() throws InterruptedException { server.enqueue(new MockResponse()); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference<Throwable> errorRef = new AtomicReference<>(); RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { if (!errorRef.compareAndSet(null, throwable)) { throw Exceptions.propagate(throwable); // Don't swallow secondary errors! } latch.countDown(); } }); TestObserver<Void> observer = new TestObserver<>(); final RuntimeException e = new RuntimeException(); service.completable().subscribe(new ForwardingCompletableObserver(observer) { @Override public void onComplete() { throw e; } }); latch.await(1, SECONDS); assertThat(errorRef.get()).isSameAs(e); }
@Test public void responseThrowingInOnSuccessDeliveredToPlugin() { server.enqueue(new MockResponse()); final AtomicReference<Throwable> throwableRef = new AtomicReference<>(); RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { if (!throwableRef.compareAndSet(null, throwable)) { throw Exceptions.propagate(throwable); } } }); RecordingSingleObserver<Response<String>> observer = subscriberRule.create(); final RuntimeException e = new RuntimeException(); service.response().subscribe(new ForwardingObserver<Response<String>>(observer) { @Override public void onSuccess(Response<String> value) { throw e; } }); assertThat(throwableRef.get()).isSameAs(e); }
public void onJokeClick(View view) { Random random = new Random(); int id = random.nextInt(11); HttpHelper.request(mSubscriptions, HttpBuilder.getAPIService().getJokeById1(id)) .subscribe(new Consumer<JokeResult>() { @Override public void accept(JokeResult jokeResult) { // 处理返回数据 mTvContent.setText(jokeResult.joke); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) { // 处理错误数据 Log.e(TAG, "accept() called with: throwable = [" + throwable + "]"); } }); }
@SuppressWarnings("unchecked") @Test public void testGetStates() throws Exception { Consumer<Integer> callback = Mockito.mock(Consumer.class); Consumer<Integer> callback2 = Mockito.mock(Consumer.class); store = new RxStore<>(initialState, this::reducer); store.getStates().subscribe(callback); verify(callback, never()).accept(any()); store.dispatch(INC); verify(callback).accept(initialState + 1); store.getStates().map(i -> i * (-1)).subscribe(callback2); // we can map without changing the other subscriptions verify(callback2, never()).accept(any()); store.dispatch(INC); verify(callback).accept(initialState + 2); verify(callback2).accept((initialState + 2) * (-1)); assertEquals("subscribing through states should not change the outcome", initialState + 2, store.getState().intValue()); }
@OnClick({R.id.tv_cancel, R.id.tv_ok}) public void onClick(View view) { switch (view.getId()) { case R.id.tv_cancel: onBackPressedSupport(); break; case R.id.tv_ok: Observable.create(new ObservableOnSubscribe<Uri>() { @Override public void subscribe(ObservableEmitter<Uri> e) throws Exception { e.onNext(generateUri()); e.onComplete(); } }).compose(RxHelper.<Uri>rxSchedulerHelper()) .subscribe(new Consumer<Uri>() { @Override public void accept(Uri uri) throws Exception { RxEventHeadBean rxEventHeadBean = new RxEventHeadBean(uri); RxBus.get().send(RX_BUS_CODE_HEAD_IMAGE_URI, rxEventHeadBean); onBackPressedSupport(); } }); break; } }
/** * takeUntil与skipUntil操作符作用相反,订阅并开始发射原始Observable,它还监视你提供的第二个Observable。 * 如果第二个Observable发射了一项数据或者发射了一个终止通知( onError通知或一个onComplete通知), * TakeUntil返回的Observable会停止发射原始Observable并终止。 */ private void doSomeWork() { if (!isRunning) { Observable .interval(1, TimeUnit.SECONDS) .take(6) .takeUntil(Observable.just(10).delay(3, TimeUnit.SECONDS)) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { isRunning = true; } }) .doOnTerminate(new Action() { @Override public void run() throws Exception { isRunning = false; } }) // Run on a background thread .subscribeOn(Schedulers.io()) // Be notified on the main thread .observeOn(AndroidSchedulers.mainThread()) .subscribe(getObserver()); } }
@Override public void test0() { Log.i(TAG, "test0() Map simple demo, integer 1,2,3 transform to string 2,4,6"); Observable.just(1, 2, 3).map(new Function<Integer, String>() { @Override public String apply(@NonNull Integer integer) throws Exception { return Integer.toString(integer * 2); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Consumer<String> accept() s: " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Action run() for onComplete()"); } }); }
@Override public void getWelcomeData() { addSubscribe(mDataManager.fetchWelcomeInfo(RES) .compose(RxUtil.<WelcomeBean>rxSchedulerHelper()) .subscribe(new Consumer<WelcomeBean>() { @Override public void accept(WelcomeBean welcomeBean) { mView.showContent(welcomeBean); startCountDown(); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) { mView.jumpToMain(); } }) ); }
private void fetchWhitelistState() { mCompositeDisposable.add(startFetchWhitelistState() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<WhitelistState>() { @Override public void accept(WhitelistState state) throws Exception { mServerWhitelistState = state; mToggleContainer.setEnabled(true); mToggle.setEnabled(true); mToggle.setChecked(state.isEnabled()); mAdapter.updateData(state); mRefreshed = true; invalidateOptionsMenu(); onFetchSucceed(state); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Toast.makeText(getApplicationContext(), getString(R.string.toast_something_wroing, throwable.getMessage()), Toast.LENGTH_SHORT).show(); } })); }
/** * SkipUntil订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable。 */ private void doSomeWork() { if (!isRunning) { Observable .interval(1, TimeUnit.SECONDS) .take(6) .skipUntil(Observable.just(10).delay(3, TimeUnit.SECONDS)) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { isRunning = true; } }) .doOnTerminate(new Action() { @Override public void run() throws Exception { isRunning = false; } }) // Run on a background thread .subscribeOn(Schedulers.io()) // Be notified on the main thread .observeOn(AndroidSchedulers.mainThread()) .subscribe(getObserver()); } }
@Test @RunTestInLooperThread public void realmResults_emittedOnUpdate() { final AtomicInteger subscriberCalled = new AtomicInteger(0); Realm realm = looperThread.getRealm(); realm.beginTransaction(); RealmResults<AllTypes> results = realm.where(AllTypes.class).findAll(); realm.commitTransaction(); subscription = results.asFlowable().subscribe(new Consumer<RealmResults<AllTypes>>() { @Override public void accept(RealmResults<AllTypes> allTypes) throws Exception { if (subscriberCalled.incrementAndGet() == 2) { looperThread.testComplete(); } } }); realm.beginTransaction(); realm.createObject(AllTypes.class); realm.commitTransaction(); }
@Test @UiThreadTest public void dynamicRealmObject_emitChangesetOnSubscribe() { DynamicRealm dynamicRealm = DynamicRealm.getInstance(realm.getConfiguration()); dynamicRealm.beginTransaction(); final DynamicRealmObject obj = dynamicRealm.createObject(AllTypes.CLASS_NAME); dynamicRealm.commitTransaction(); final AtomicBoolean subscribedNotified = new AtomicBoolean(false); subscription = obj.asChangesetObservable().subscribe(new Consumer<ObjectChange<RealmObject>>() { @Override public void accept(ObjectChange<RealmObject> change) throws Exception { assertTrue(change.getObject() == obj); assertNull(change.getChangeset()); subscribedNotified.set(true); } }); assertTrue(subscribedNotified.get()); subscription.dispose(); dynamicRealm.close(); }
@Override public void loadLatestList() { if (mIModel == null) return; mRxManager.register(mIModel.getDailyList().subscribe(new Consumer<ZhihuDailyListBean>() { @Override public void accept(ZhihuDailyListBean zhihuDailyListBean) throws Exception { mDate = zhihuDailyListBean.getDate(); //Logger.e("mDate = " + mDate); if (mIView != null) mIView.updateContentList(zhihuDailyListBean.getStories()); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { if (mIView != null) { if (mIView.isVisiable()) mIView.showToast("Network error."); mIView.showNetworkError(); } } })); }
/** * 延时2s添加数据 */ private void addData(final boolean refresh) { Observable .timer(2, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { if (refresh) { mDatas.add(0, "Added after refresh..."); dataAdapter.setDatasAndNotify(mDatas); contentView.setLastUpdatedLabel(formatDateTime(System.currentTimeMillis())); contentView.onPullDownRefreshComplete(); } } }) .subscribe(); }
@Test @UiThreadTest public void realmList_emittedOnSubscribe() { final AtomicBoolean subscribedNotified = new AtomicBoolean(false); realm.beginTransaction(); final RealmList<Dog> list = realm.createObject(AllTypes.class).getColumnRealmList(); realm.commitTransaction(); subscription = list.asFlowable().subscribe(new Consumer<RealmList<Dog>>() { @Override @SuppressWarnings("ReferenceEquality") public void accept(RealmList<Dog> rxList) throws Exception { assertTrue(rxList == list); subscribedNotified.set(true); } }); assertTrue(subscribedNotified.get()); subscription.dispose(); }
@OnClick(R.id.edit) void edit() { new ScriptOperations(this, mView) .importSample(mSample) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String path) throws Exception { EditActivity.editFile(ViewSampleActivity.this, path); finish(); } }); }
/** * 由于服务端2分钟无交互则清空上下文对象,每2分钟上传一次导航引擎,保证导航状态正常 **/ private void keepNavigation() { cancelDisposable(); mDisposable = Observable.interval(2, 2, TimeUnit.MINUTES) .observeOn(Schedulers.io()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { uploadNavigation(); if (mNavigation == null) { throw new RuntimeException("导航结束了。。。"); } } }); }