Java 类io.reactivex.functions.Action 实例源码
项目:Rx_java2_soussidev
文件:LollipopNetworkObservingStrategy.java
@Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
final String service = Context.CONNECTIVITY_SERVICE;
final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service);
return Observable.create(new ObservableOnSubscribe<Connectivity>() {
@Override public void subscribe(ObservableEmitter<Connectivity> subscriber) throws Exception {
networkCallback = createNetworkCallback(subscriber, context);
final NetworkRequest networkRequest = new NetworkRequest.Builder().build();
manager.registerNetworkCallback(networkRequest, networkCallback);
}
}).doOnDispose(new Action() {
@Override public void run() {
tryToUnregisterCallback(manager);
}
}).startWith(Connectivity.create(context)).distinctUntilChanged();
}
项目:Rx_java2_soussidev
文件:MarshmallowNetworkObservingStrategy.java
@Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
final String service = Context.CONNECTIVITY_SERVICE;
final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service);
networkCallback = createNetworkCallback(context);
registerIdleReceiver(context);
final NetworkRequest request =
new NetworkRequest.Builder().addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
.addCapability(NetworkCapabilities.NET_CAPABILITY_NOT_RESTRICTED)
.build();
manager.registerNetworkCallback(request, networkCallback);
return connectivitySubject.toFlowable(BackpressureStrategy.LATEST).doOnCancel(new Action() {
@Override public void run() {
tryToUnregisterCallback(manager);
tryToUnregisterReceiver(context);
}
}).startWith(Connectivity.create(context)).distinctUntilChanged().toObservable();
}
项目:GitHub
文件:RxJavaTests.java
@Test
@UiThreadTest
public void realmList_closeInDoOnUnsubscribe() {
realm.beginTransaction();
RealmList<Dog> list = realm.createObject(AllTypes.class).getColumnRealmList();
realm.commitTransaction();
Flowable<RealmList<Dog>> observable = list.asFlowable().doOnCancel(new Action() {
@Override
public void run() throws Exception {
realm.close();
}
});
subscription = observable.subscribe(new Consumer<RealmList<Dog>>() {
@Override
public void accept(RealmList<Dog> ignored) throws Exception {
}
});
subscription.dispose();
assertTrue(realm.isClosed());
}
项目:GitHub
文件:RxJavaTests.java
@Test
@UiThreadTest
public void dynamicRealmResults_closeInDoOnUnsubscribe() {
final DynamicRealm dynamicRealm = DynamicRealm.getInstance(realm.getConfiguration());
Flowable<RealmResults<DynamicRealmObject>> flowable = dynamicRealm.where(AllTypes.CLASS_NAME).findAll().asFlowable()
.doOnCancel(new Action() {
@Override
public void run() throws Exception {
dynamicRealm.close();
}
});
subscription = flowable.subscribe(new Consumer<RealmResults<DynamicRealmObject>>() {
@Override
public void accept(RealmResults<DynamicRealmObject> ignored) throws Exception {
}
});
subscription.dispose();
assertTrue(dynamicRealm.isClosed());
}
项目:GitHub
文件:RxJavaTests.java
@Test
@UiThreadTest
public void realmObject_closeInDoOnUnsubscribe() {
realm.beginTransaction();
realm.createObject(AllTypes.class);
realm.commitTransaction();
Flowable<AllTypes> flowable = realm.where(AllTypes.class).findFirst().<AllTypes>asFlowable()
.doOnCancel(new Action() {
@Override
public void run() throws Exception {
realm.close();
}
});
subscription = flowable.subscribe(new Consumer<AllTypes>() {
@Override
public void accept(AllTypes ignored) throws Exception {
}
});
subscription.dispose();
assertTrue(realm.isClosed());
}
项目:FCM-for-Mojo
文件:ServerSettingsFragment.java
private void restart() {
mCompositeDisposable.add(FFMService.restart()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doFinally(new Action() {
@Override
public void run() throws Exception {
findPreference("restart_webqq").setEnabled(true);
}
})
.subscribe(new Consumer<FFMResult>() {
@Override
public void accept(FFMResult ffmResult) throws Exception {
Toast.makeText(getContext(), "Succeed.", Toast.LENGTH_SHORT).show();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(getContext(), "Network error:\n" + throwable.getMessage(), Toast.LENGTH_SHORT).show();
}
}));
}
项目:FCM-for-Mojo
文件:ServerSettingsFragment.java
private void stop() {
mCompositeDisposable.add(FFMService.stop()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doFinally(new Action() {
@Override
public void run() throws Exception {
findPreference("stop_webqq").setEnabled(true);
}
})
.subscribe(new Consumer<FFMResult>() {
@Override
public void accept(FFMResult ffmResult) throws Exception {
Toast.makeText(getContext(), "Succeed.", Toast.LENGTH_SHORT).show();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(getContext(), "Network error:\n" + throwable.getMessage(), Toast.LENGTH_SHORT).show();
}
}));
}
项目:reactive-architectures-playground
文件:ToogleRefreshTests.java
@Before public void beforeEachTest() {
MockitoAnnotations.initMocks(this);
ToogleRefreshView view = new ToogleRefreshView() {
@Override public Action disableRefresh() {
return disableRefresh;
}
@Override public Action enableRefresh() {
return enableRefresh;
}
};
toogler = new RefreshToogle<>(view, uiScheduler);
}
项目:AssistantBySDK
文件:TingPlayProcessor.java
/**
* 合成并显示回复文本
**/
private void synthesizeAndShowResp(final List<Track> tracks, String content, final int finalPlayIndex) {
EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(content), null, null, null));
SynthesizerBase.get().startSpeakAbsolute(content)
.doOnNext(new Consumer<SpeechMsg>() {
@Override
public void accept(SpeechMsg speechMsg) throws Exception {
if (speechMsg.state() == SpeechMsg.State.OnBegin)
EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
if (tracks != null)
XmlyManager.get().getPlayer().playList(tracks, finalPlayIndex);
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe();
}
项目:AssistantBySDK
文件:NaviProcessor.java
/**
* 合成文本并提前返回
**/
private void speakAndAheadReturn(String text, SpeechMsgBuilder msgBuilder) {
/* 将回复文本发送到聊天列表 */
EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(text), null, null, null));
/* 合成回复文本 */
msgBuilder.setText(text).setForceLocalEngine(true);
SynthesizerBase.get().startSpeakAbsolute(msgBuilder.build())
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
EventBus.getDefault().post(new NavigateEvent(NavigateEvent.START_NAVI));
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe();
}
项目:RxJava2-Android-Sample
文件:TakeUntilExampleActivity.java
/**
* takeUntil与skipUntil操作符作用相反,订阅并开始发射原始Observable,它还监视你提供的第二个Observable。
* 如果第二个Observable发射了一项数据或者发射了一个终止通知( onError通知或一个onComplete通知),
* TakeUntil返回的Observable会停止发射原始Observable并终止。
*/
private void doSomeWork() {
if (!isRunning) {
Observable
.interval(1, TimeUnit.SECONDS)
.take(6)
.takeUntil(Observable.just(10).delay(3, TimeUnit.SECONDS))
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
isRunning = true;
}
})
.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
isRunning = false;
}
})
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}
}
项目:RxLifeCycle
文件:MainActivity.java
@Override
protected void onStart() {
super.onStart();
Log.d(TAG, "onStart()");
// Using automatic unsubscription, this should determine that the correct time to
// unsubscribe is onStop (the opposite of onStart).
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.i(TAG, "Unsubscribing subscription from onStart()");
}
})
.compose(this.<Long>bindToLifecycle())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Log.i(TAG, "Started in onStart(), running until in onStop(): " + num);
}
});
}
项目:AssistantBySDK
文件:VoiceMediator.java
private void speak(SpeechMsgBuilder msgBuilder, final boolean isAnim) {
SynthesizerBase.get().startSpeakAbsolute(msgBuilder.build())
.doOnNext(new Consumer<SpeechMsg>() {
@Override
public void accept(SpeechMsg speechMsg) throws Exception {
if (speechMsg.state() == SpeechMsg.State.OnBegin && isAnim)
EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe();
}
项目:AssistantBySDK
文件:ChatAlbumListAdapter.java
/**
* 合成并显示回复文本
**/
private void synthesizeAndShowResp(final List<Track> tracks, String content, final int finalPlayIndex) {
EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(content), null, null, null));
IflySynthesizer.get().startSpeakAbsolute(content)
.doOnNext(new Consumer<SpeechMsg>() {
@Override
public void accept(SpeechMsg speechMsg) throws Exception {
if (speechMsg.state() == SpeechMsg.State.OnBegin)
EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
if (tracks != null)
XmPlayerManager.getInstance(mContext).playList(tracks, finalPlayIndex);
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe();
}
项目:AssistantBySDK
文件:AssistPresenter.java
/**
* 发送回复文本文本视图并合成声音
**/
private void showAndSpeak(SpeechMsgBuilder builder) {
EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(builder.getText()), null, null, null));
IflySynthesizer.getInstance().startSpeakAbsolute(builder.build())
.doOnNext(new Consumer<SpeechMsg>() {
@Override
public void accept(SpeechMsg speechMsg) throws Exception {
if (speechMsg.state() == SpeechMsg.State.OnBegin)
EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe();
}
项目:MVPArmsTest1
文件:RxUtils.java
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
return new ObservableTransformer<T, T>() {
@Override
public Observable<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
view.showLoading();//显示进度条
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doAfterTerminate(new Action() {
@Override
public void run() {
view.hideLoading();//隐藏进度条
}
}).compose(RxUtils.bindToLifecycle(view));
}
};
}
项目:RxJava4AndroidDemos
文件:FlatMap.java
@Override
public void test0() {
Log.i(TAG, "test0() FlatMap simple demo, integer 1,2,3 transform to string 2,3,4,6,6,9");
Observable.just(1, 2, 3).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
return Observable.just(integer * 2 + "", integer * 3 + "");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "Consumer<String> accept() s: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "Action run() for onComplete()");
}
});
}
项目:RxJava4AndroidDemos
文件:Repeat.java
@Override
public void test0() {
Log.i(TAG, "test0() Range simple demo, repeat is 2");
Observable.just("1", "2", "3").repeat(2).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "Consumer<String> accept() s: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "Action run() for onComplete()");
}
});
}
项目:RxPaper2
文件:RxPaperBook.java
/**
* Saves most types of POJOs or collections in {@link Book} storage.
* <p/>
* To deserialize correctly it is recommended to have an all-args constructor, but other types
* may be available.
*
* @param key object key is used as part of object's file name
* @param value object to save, must have no-arg constructor, can't be null.
* @return this Book instance
*/
public <T> Completable write(final String key, final T value) {
return Completable.fromAction(new Action() {
@Override
public void run() {
book.write(key, value);
}
})
// FIXME in RxJava1 the error would be propagated to updates.
// In RxJava2 the error happens on the Completable this method returns.
// This andThen block reproduces the behavior in RxJava1.
.andThen(Completable.fromAction(new Action() {
@Override
public void run() throws Exception {
try {
updates.onNext(Pair.create(key, value));
} catch (Throwable t) {
updates.onError(t);
}
}
})).subscribeOn(scheduler);
}
项目:RxEasyHttp
文件:RxUtil.java
public static <T> ObservableTransformer<ApiResult<T>, T> _io_main() {
return new ObservableTransformer<ApiResult<T>, T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<ApiResult<T>> upstream) {
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new HandleFuc<T>())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed());
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
HttpLog.i("+++doFinally+++");
}
})
.onErrorResumeNext(new HttpResponseFunc<T>());
}
};
}
项目:RxEasyHttp
文件:RxUtil.java
public static <T> ObservableTransformer<ApiResult<T>, T> _main() {
return new ObservableTransformer<ApiResult<T>, T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<ApiResult<T>> upstream) {
return upstream
//.observeOn(AndroidSchedulers.mainThread())
.map(new HandleFuc<T>())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed());
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
HttpLog.i("+++doFinally+++");
}
})
.onErrorResumeNext(new HttpResponseFunc<T>());
}
};
}
项目:ZhaZhaShop
文件:HotMovieListPresenter.java
@Override
public void getHotMovieList(int limit) {
mView.showLoading();
mHotMovieListManager.getHotMovieList(limit)
.subscribe(new Consumer<HotMovieBean>() {
@Override
public void accept(@NonNull HotMovieBean hotMovieBean) throws Exception {
mView.addHotMovieList(hotMovieBean.getData().getHot());
mView.addMovieIds(hotMovieBean.getData().getMovieIds());
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: " + throwable.getLocalizedMessage().toString());
mView.showError(throwable.getMessage().toString());
}
}, new Action() {
@Override
public void run() throws Exception {
mView.showContent();
}
});
}
项目:ReactiveAirplaneMode
文件:ReactiveAirplaneMode.java
/**
* Disposes an action in UI Thread
*
* @param dispose action to be executed
* @return Disposable object
*/
private Disposable disposeInUiThread(final Action dispose) {
return Disposables.fromAction(new Action() {
@Override public void run() throws Exception {
if (Looper.getMainLooper() == Looper.myLooper()) {
dispose.run();
} else {
final Scheduler.Worker inner = AndroidSchedulers.mainThread().createWorker();
inner.schedule(new Runnable() {
@Override public void run() {
try {
dispose.run();
} catch (Exception exception) {
onError("Could not unregister receiver in UI Thread", exception);
}
inner.dispose();
}
});
}
}
});
}
项目:rxtools
文件:SubjectMapTest.java
@Test
public void testErrorHandlingInValueProvider()
{
// setup
final AtomicBoolean missHandlerCalled = new AtomicBoolean(false);
TestSubscriber<Integer> testSubscriber1 = new TestSubscriber<>();
subscribe(source.get("hello"), testSubscriber1);
source.onNext("hello", new Callable<Integer>() {
@Override
public Integer call() throws Exception
{
throw new RuntimeException("Boom");
}
}, new Action() {
@Override
public void run()
{
missHandlerCalled.set(true);
}
});
testSubscriber1.assertError(RuntimeException.class);
}
项目:RxJava4AndroidDemos
文件:Map.java
@Override
public void test0() {
Log.i(TAG, "test0() Map simple demo, integer 1,2,3 transform to string 2,4,6");
Observable.just(1, 2, 3).map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return Integer.toString(integer * 2);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "Consumer<String> accept() s: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "Action run() for onComplete()");
}
});
}
项目:EazyBaseMVP
文件:RxUtils.java
public static <T> FlowableTransformer<T, T> applySchedules(final IView view) {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription subscription) throws Exception {
view.showLoading();
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
view.hideLoading();
}
});
}
};
}
项目:RxJava4AndroidDemos
文件:Buffer.java
@Override
public void test0() {
Log.i(TAG, "test0() Buffer simple demo, 1,2,3 buffer(2)");
Observable.just("1", "2", "3").buffer(2).subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
Log.d(TAG, "Consumer<String> accept() strings: " + strings);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable);
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "Action run() for onComplete()");
}
});
}
项目:Learning-RxJava
文件:Ch9_7.java
public static <T> ObservableOperator<T, T> doOnEmpty(Action
action) {
return new ObservableOperator<T, T>() {
@Override
public Observer<? super T> apply(Observer<? super T>
observer) throws Exception {
return new DisposableObserver<T>() {
boolean isEmpty = true;
@Override
public void onNext(T value) {
isEmpty = false;
observer.onNext(value);
}
@Override
public void onError(Throwable t) {
observer.onError(t);
}
@Override
public void onComplete() {
if (isEmpty) {
try {
action.run();
} catch (Exception e) {
onError(e);
return;
}
}
observer.onComplete();
}
};
}
};
}
项目:GitHub
文件:DownloadViewHolder.java
private void delete() {
dispose(data.disposable);
mRxDownload.deleteServiceDownload(data.record.getUrl(), true)
.doFinally(new Action() {
@Override
public void run() throws Exception {
mAdapter.remove(getAdapterPosition());
}
})
.subscribe();
}
项目:FCM-for-Mojo
文件:NotificationSettingsFragment.java
private void uploadNotificationsToggle(final Preference preference) {
final NotificationToggle newNotificationToggle = NotificationToggle.create(mFriendToggle.isChecked(), mGroupToggle.isChecked());
if (newNotificationToggle.equals(mServerNotificationToggle)) {
return;
}
preference.setEnabled(false);
mCompositeDisposable.add(FFMService
.updateNotificationsToggle(NotificationToggle.create(mFriendToggle.isChecked(), mGroupToggle.isChecked()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doFinally(new Action() {
@Override
public void run() throws Exception {
preference.setEnabled(true);
}
})
.subscribe(new Consumer<FFMResult>() {
@Override
public void accept(FFMResult result) throws Exception {
mServerNotificationToggle = newNotificationToggle;
//Toast.makeText(getContext(), "Succeed.", Toast.LENGTH_SHORT).show();
Log.d("Sync", "updateNotificationsToggle success, new state: " + newNotificationToggle);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(getContext(), "Network error:\n" + throwable.getMessage(), Toast.LENGTH_SHORT).show();
Log.w("Sync", "updateNotificationsToggle failed", throwable);
}
}));
}
项目:KotlinKomparisons
文件:PersistedLoginDetailsRepository.java
@Override
public @NonNull Completable updateLoginMethod(@NonNull final LoginMethod method) {
checkNotNull(method);
return persistence.write(LOGIN_METHOD, method)
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
methodRelay.accept(method);
}
});
}
项目:AssistantBySDK
文件:MobileCommProcessor.java
/**
* 合成并在聊天视图中显示回复文本
*
* @param text 回复文本
* @param inputType 用户输入类型。只有语音录入才会合成回复文本
* @param msgBuilder 合成信息对象
**/
private void speakAndShowResp(String text, int inputType, SpeechMsgBuilder msgBuilder) {
/* 发送回复文本到聊天视图 */
EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(text), null, null, null));
if (inputType == AssistantService.INPUT_VOICE) {
msgBuilder.setText(text);
SynthesizerBase.get().startSpeakAbsolute(msgBuilder.build())
/* 合成是在Observable的subscribe()开始的,所以要在这之前通知动画播放。
* doOnSubscribe 执行在离它最近的 subscribeOn() 所指定的线程。*/
.doOnNext(new Consumer<SpeechMsg>() {
@Override
public void accept(SpeechMsg speechMsg) throws Exception {
if (speechMsg.state() == SpeechMsg.State.OnBegin)
EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe();
}
}
项目:AssistantBySDK
文件:ChatListPresenter.java
@Override
public void synthesize(SpeechMsg msg, final View speakervView) {
if (SynthesizerBase.isInited()) {
SpeechMsgBuilder builder = new SpeechMsgBuilder(msg.text)
.setOrigin(com.lingju.audio.engine.base.SpeechMsg.ORIGIN_COMMON);
if (msg instanceof ResponseSetionsMsg) {
SynthesizerBase.get().setForceLocalEngine(false);
builder.setSections(((ResponseSetionsMsg) msg).getSetions());
}
SynthesizerBase.get().startSpeakAbsolute(builder.build())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<com.lingju.audio.engine.base.SpeechMsg>() {
@Override
public void accept(com.lingju.audio.engine.base.SpeechMsg speechMsg) throws Exception {
if (speechMsg.state() == com.lingju.audio.engine.base.SpeechMsg.State.Idle) {
Log.i("LingJu", "doOnNext>>>startSpeakerAnimation>>>" + speakervView);
chatListView.startSpeakerAnimation(speakervView);
}
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.i("ChatListPresenter", "doOnComplete>>stopSpeakerAnimation");
chatListView.stopSpeakerAnimation(speakervView);
}
})
.subscribe();
}
}
项目:webtrekk-android-sdk
文件:MainActivity.java
private void permissionRequest(String permissions[], final String permissionUINames[], @Nullable final Runnable onCompletes[]){
final List<Completable> completes = permissionRequest.
requestPermission(this, permissions);
for (int i = 0; i < completes.size(); i++) {
final Completable complete = completes.get(i);
final String permissionUIName = permissionUINames[i];
final Runnable onComplete = onCompletes == null ? null : onCompletes[i];
complete.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action() {
@Override
public void run() throws Exception {
Log.d("Permission", permissionUIName);
if (onCompletes != null && onComplete != null) {
onComplete.run();
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d("Permission", "Permission Error: " + throwable.getLocalizedMessage());
}
});
}
}
项目:Melophile
文件:HistoryTrackInteractor.java
@Override
public void remove(Track track, Action onSuccess, Consumer<? super Throwable> onError) {
personalRepository.removeTrack(track)
.subscribeOn(schedulerProvider.io())
.observeOn(schedulerProvider.ui())
.subscribe(onSuccess,onError);
}
项目:reactive-architectures-playground
文件:AssignErrorStateTests.java
@Before public void beforeEachTest() {
MockitoAnnotations.initMocks(this);
ErrorStateView view = new ErrorStateView() {
@Override public Action showErrorState() {
return show;
}
@Override public Action hideErrorState() {
return hide;
}
};
assignErrorState = new AssignErrorState<>(view, uiScheduler);
}
项目:Melophile
文件:FavoritePlaylistInteractor.java
@Override
public void clear(Action onSuccess, Consumer<? super Throwable> onError) {
personalRepository.clearFavoritePlaylists()
.subscribeOn(schedulerProvider.io())
.observeOn(schedulerProvider.ui())
.subscribe(onSuccess,onError);
}
项目:Melophile
文件:FavoritePlaylistInteractor.java
@Override
public void add(Playlist playlist, Action onComplete, Consumer<? super Throwable> onError) {
personalRepository.likePlaylist(playlist)
.subscribeOn(schedulerProvider.io())
.observeOn(schedulerProvider.ui())
.subscribe(onComplete,onError);
}
项目:Melophile
文件:FavoriteTrackInteractor.java
@Override
public void add(Track track, Action onComplete, Consumer<? super Throwable> onError) {
personalRepository.likeTrack(track)
.subscribeOn(schedulerProvider.io())
.observeOn(schedulerProvider.ui())
.subscribe(onComplete,onError);
}
项目:RxOptional
文件:RxOptional.java
@Nonnull
public RxOptional<T> ifNotPresent(@Nonnull Action action) {
if (value == null) {
requireNonNull(action);
try {
action.run();
} catch (Exception e) {
e.printStackTrace();
}
}
return this;
}