Java 类io.reactivex.disposables.Disposable 实例源码
项目:RxLifecycle
文件:MainActivity.java
private void disposeOnResume() {
Disposable d = Observable
.interval(1, TimeUnit.SECONDS)
.subscribeWith(new DisposableObserver<Long>() {
@Override
public void onNext(Long value) {
Log.d(TAG, "Timer A:" + value.toString() + " Seconds");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError", e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
/*
* Dispose observer on ActivityEvent.RESUME
*/
dispose(d, ActivityEvent.RESUME);
}
项目:chaosflix
文件:ConferencesBrowseFragment.java
private Disposable updateWatchlist(List<WatchlistItem> watchlistItems) {
return ((LeanbackBaseActivity) getActivity()).getApiServiceObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(mediaApiService -> {
showWatchlist();
watchListAdapter.clear();
if(watchlistItems.size() > 0){
// int i = Math.max(0,mRowsAdapter.indexOf(mConferencesSection));
// mRowsAdapter.add(i,mRecomendationsSectionsRow);
// mRowsAdapter.add(i+1,watchListAdapter);
Observable.fromIterable(watchlistItems)
.flatMap(watchlistItem -> mediaApiService.getEvent(watchlistItem.getEventId()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(event -> watchListAdapter.add(event));
} else {
// watchListAdapter.add("Watchlist empty");
hideWatchlist();
}
});
}
项目:grpc-mate
文件:RxStreamObserverTest.java
@Test
public void clientStreaming_error_break_flow() throws Exception {
PublishSubject<Integer> publishSubject = PublishSubject.create();
Set<Integer> resultsHolder = Sets.newConcurrentHashSet();
Set<Throwable> exceptionsHolder = Sets.newConcurrentHashSet();
AtomicBoolean complete = new AtomicBoolean(false);
Disposable disposable = publishSubject
.doOnNext(num->resultsHolder.add(num))
.doOnError(t->exceptionsHolder.add(t))
.doOnComplete(()->complete.compareAndSet(false,true))
.subscribe();
assertThat(disposable.isDisposed()).isFalse();
RxStreamObserver<Integer> rxStreamObserver = new RxStreamObserver<>(publishSubject);
rxStreamObserver.onError(new IllegalStateException());
assertThat(disposable.isDisposed()).isTrue();
assertThat(resultsHolder).isEmpty();
assertThat(exceptionsHolder.size()).isEqualTo(1);
assertThat(exceptionsHolder.iterator().next()).isInstanceOf(IllegalStateException.class);
assertThat(complete).isFalse();
}
项目:android-contact-extractor
文件:ContactInfoFragment.java
private Disposable readAndFillContacts(final int filterType) {
CQuery cQuery = CQuery.getInstance(getActivity());
cQuery.filter(filterType);
return cQuery.build(new IContact() {
@Override
public void onContactSuccess(List<CList> mList) {
//Toast.makeText(getActivity(), " Contacts count " + mList.size(), Toast.LENGTH_SHORT).show();
if (mList != null && !mList.isEmpty()) {
for (CList cList : mList) {
setUpContactList(filterType, cList);
}
mList.clear();
}
updateList();
}
@Override
public void onContactError(Throwable throwable) {
Toast.makeText(getActivity(), "" + throwable.getLocalizedMessage(), Toast.LENGTH_SHORT).show();
}
});
}
项目:Reactive-Android-Programming
文件:MainActivity.java
private void demo1() {
final Disposable subscribe = Observable.create(emitter -> {
emitter.setCancellable(() -> {
log("setCancellable");
helloText.setOnClickListener(null);
});
helloText.setOnClickListener(v -> {
log("listener", "Click");
emitter.onNext(v);
});
})
.doOnDispose(() -> log("onDispose"))
.doOnComplete(() -> log("doOnComplete"))
.subscribe(e -> log("subscribe", "Click"));
subscribe.dispose();
}
项目:RxEasyHttp
文件:RxUtil.java
public static <T> ObservableTransformer<T, T> io_main() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed());
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
HttpLog.i("+++doFinally+++");
}
})
.observeOn(AndroidSchedulers.mainThread());
}
};
}
项目:ZhaZhaShop
文件:MovieMoreRankPresenter.java
@Override
public void getOverseaComingMovieList(String area, int limit, int offset) {
mManager.getOverseaComingMovie(area, limit, offset)
.subscribe(new Observer<OverseaComingMovieBean>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
mView.showLoading();
}
@Override
public void onNext(@NonNull OverseaComingMovieBean data) {
mView.addOverseaComingMovieList(data.getData().getComing());
}
@Override
public void onError(@NonNull Throwable e) {
mView.showError(ErrorHanding.handleError(e));
}
@Override
public void onComplete() {
mView.showContent();
}
});
}
项目:Rxjava2.0Demo
文件:DoActivity.java
private void processRepeat() {
Observable.just(1, 2, 3)
.repeat(3)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
}
项目:Aurora
文件:VideoDetailPresenter.java
public void getSecondRelaRelateVideoInfo(String path, int id, int startnum) {
mModel.getSecondRelateVideoInfo(path, id, startnum).compose(RxUtils.applySchedulersWithLifeCycle(mRootView))
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
if (startnum==0){
mRootView.showLoading();
}
}
})
.subscribe(new ErrorHandleSubscriber<VideoListInfo>(mErrorHandler) {
@Override
public void onNext(VideoListInfo info) {
mRootView.setData(info, true);
}
});
}
项目:RxLifecycle
文件:MainActivity.java
private void disposeOnDestroy() {
Disposable d = Observable
.interval(1, TimeUnit.SECONDS)
.subscribeWith(new DisposableObserver<Long>() {
@Override
public void onNext(Long value) {
Log.d(TAG, "Timer B:" + value.toString() + " Seconds");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError", e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
/*
* Dispose observer
*/
dispose(d);
}
项目:AmenEye
文件:AboutMePresenter.java
public void loadInfo(String user) {
ApiManager.getInstence().getGithubService()
.getMyInfo(user).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<UserInfo>() {
@Override
public void onSubscribe(Disposable d) {
addDisposable(d);
}
@Override
public void onNext(UserInfo value) {
mIAboutMeActivity.showMyInfo(value);
}
@Override
public void onError(Throwable e) {
mIAboutMeActivity.loadMyInfoFail();
}
@Override
public void onComplete() {
}
});
}
项目:Rxjava2.0Demo
文件:CreateActivity.java
private void from() {
String[] items = new String[]{"1", "2", "3"};
final Disposable[] mDisposable = new Disposable[1];
Observable.fromArray(items)
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
mDisposable[0] = disposable;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
if (s.equals("2")) {
mDisposable[0].dispose();
}
Log.e(TAG, "accept: " + s);
}
});
}
项目:NeiHanDuanZiTV
文件:RxUtils.java
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
return new ObservableTransformer<T, T>() {
@Override
public Observable<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
view.showLoading();//显示进度条
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doAfterTerminate(new Action() {
@Override
public void run() {
view.hideLoading();//隐藏进度条
}
}).compose(RxUtils.bindToLifecycle(view));
}
};
}
项目:GitHub
文件:HandlerSchedulerTest.java
@Test @Ignore("Implementation delegated to default RxJava implementation")
public void directSchedulePeriodicallyDisposedDoesNotRun() {
CountingRunnable counter = new CountingRunnable();
Disposable disposable = scheduler.schedulePeriodicallyDirect(counter, 1, 1, MINUTES);
runUiThreadTasks();
assertEquals(0, counter.get());
idleMainLooper(1, MINUTES);
runUiThreadTasks();
assertEquals(1, counter.get());
idleMainLooper(1, MINUTES);
runUiThreadTasks();
assertEquals(2, counter.get());
disposable.dispose();
idleMainLooper(1, MINUTES);
runUiThreadTasks();
assertEquals(2, counter.get());
}
项目:GitHub
文件:HandlerSchedulerTest.java
@Test @Ignore("Implementation delegated to default RxJava implementation")
public void workerSchedulePeriodicallyDisposedDoesNotRun() {
Worker worker = scheduler.createWorker();
CountingRunnable counter = new CountingRunnable();
Disposable disposable = worker.schedulePeriodically(counter, 1, 1, MINUTES);
runUiThreadTasks();
assertEquals(0, counter.get());
idleMainLooper(1, MINUTES);
runUiThreadTasks();
assertEquals(1, counter.get());
idleMainLooper(1, MINUTES);
runUiThreadTasks();
assertEquals(2, counter.get());
disposable.dispose();
idleMainLooper(1, MINUTES);
runUiThreadTasks();
assertEquals(2, counter.get());
}
项目:Rxjava2.0Demo
文件:SplashActivity.java
private void start() {
Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
.take(1)
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Long>>() {
@Override
public ObservableSource<? extends Long> apply(Throwable throwable) throws Exception {
return null;
}
})
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(MainActivity.TAG, "accept: " + aLong);
startActivity(new Intent(SplashActivity.this, MainActivity.class));
finish();
}
});
dLists.add(disposable);
}
项目:NovelReader
文件:CommentDetailPresenter.java
@Override
public void loadComment(String detailId, int start, int limit) {
Disposable loadDispo = RemoteRepository.getInstance()
.getDetailComments(detailId, start, limit)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
(bean) -> {
mView.finishLoad(bean);
},
(e) -> {
mView.showLoadError();
LogUtils.e(e);
}
);
addDisposable(loadDispo);
}
项目:vertx-kubernetes-workshop
文件:Helpers.java
/**
* Utility method to report the completion/failure from a Single to a Future.
*
* @param future the future
* @return the single observer to pass to {@link Single#subscribe()}
*/
public static SingleObserver<JsonObject> toObserver(Future<JsonObject> future) {
return new SingleObserver<JsonObject>() {
public void onSubscribe(@NonNull Disposable d) {
}
public void onSuccess(@NonNull JsonObject item) {
future.tryComplete(item);
}
public void onError(Throwable error) {
future.tryFail(error);
}
};
}
项目:Learning-RxJava
文件:Ch6_20.java
public static void main(String[] args) {
Disposable d = Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose(() -> System.out.println("Disposing on thread"
+ Thread.currentThread().getName()))
.unsubscribeOn(Schedulers.io())
.subscribe(i -> System.out.println("Received " +
i));
sleep(3000);
d.dispose();
sleep(3000);
}
项目:ConductorMVP
文件:TideDetailsController.java
@Override protected void attachView(final TideDetailsView view) {
super.attachView(view);
DemoApplication.app(getApplicationContext()).injector().inject(this);
view.setTitle(tideLocationName);
noaaApi.getTideInfo(noaaApiId)
.observeOn(AndroidSchedulers.mainThread())
.compose(this.<TideInfo>bindToLifecycle())
.subscribe(new Observer<TideInfo>() {
@Override public void onSubscribe(@NonNull Disposable d) {
}
@Override public void onNext(@NonNull TideInfo tideInfo) {
if (tideInfo.getData() != null && !tideInfo.getData().isEmpty()) {
List<Observation> observations = tideInfo.getData();
BigDecimal highestMeasuredTideHeight =
Collections.max(filterOutNullMeasurements(observations), OBSERVATION_COMPARATOR)
.getVerifiedWaterLevel();
BigDecimal lowestMeasuredTideHeight =
Collections.min(filterOutNullMeasurements(observations), OBSERVATION_COMPARATOR)
.getVerifiedWaterLevel();
BigDecimal latestMeasuredTideHeight =
observations.get(observations.size() - 1).getVerifiedWaterLevel();
view.setTideHeights(latestMeasuredTideHeight, lowestMeasuredTideHeight,
highestMeasuredTideHeight);
}
}
@Override public void onError(@NonNull Throwable e) {
view.showError();
}
@Override public void onComplete() {
}
});
}
项目:Learning-RxJava
文件:Ch2_33.java
public static void main(String[] args) {
Observable<Long> seconds =
Observable.interval(1, TimeUnit.SECONDS);
Disposable disposable =
seconds.subscribe(l -> System.out.println("Received: " + l));
//sleep 5 seconds
sleep(5000);
//dispose and stop emissions
disposable.dispose();
//sleep 5 seconds to prove
//there are no more emissions
sleep(5000);
}
项目:RxJava2-Android-Sample
文件:SkipExampleActivity.java
private Observer<Integer> getObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Integer value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
项目:NovelReader
文件:BookDetailPresenter.java
@Override
public void addToBookShelf(CollBookBean collBookBean) {
Disposable disposable = RemoteRepository.getInstance()
.getBookChapters(collBookBean.get_id())
.subscribeOn(Schedulers.io())
.doOnSubscribe(
(d) -> mView.waitToBookShelf() //等待加载
)
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
beans -> {
//设置 id
for(BookChapterBean bean :beans){
bean.setId(MD5Utils.strToMd5By16(bean.getLink()));
}
//设置目录
collBookBean.setBookChapters(beans);
//存储收藏
BookRepository.getInstance()
.saveCollBookWithAsync(collBookBean);
mView.succeedToBookShelf();
}
,
e -> {
mView.errorToBookShelf();
LogUtils.e(e);
}
);
addDisposable(disposable);
}
项目:GifEmoji
文件:EmoticonDrawable.java
void animation() {
if (delay > 0 && frameNum > 0)
Observable
.interval(delay, TimeUnit.MILLISECONDS)
.flatMap(new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long aLong) throws Exception {
return Observable.just(aLong);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long drawable) {
position++;
if (position >= frameNum) position = 0;
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
项目:RxJava2-Android-Sample
文件:TimestampExampleActivity.java
private Observer<Timed<String>> getObserver() {
return new Observer<Timed<String>>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Timed<String> value) {
textView.append(" onNext : value : " + value.value() + ", time:" + value.time());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
项目:Demos
文件:MainActivity.java
/**
* Rx方式使用
*/
private void rxRequest() {
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(Constant.SERVER_URL)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
.client(RetrofitUtils.getOkHttpClient()) // 打印请求参数
.build();
RetrofitService service = retrofit.create(RetrofitService.class);
Observable<PostInfo> observable = service.getPostInfoRx("yuantong", "11111111111");
observable.subscribeOn(Schedulers.io()) // 在子线程中进行Http访问
.observeOn(AndroidSchedulers.mainThread()) // UI线程处理返回接口
.subscribe(new Observer<PostInfo>() { // 订阅
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull PostInfo postInfo) {
Log.i("http返回:", postInfo.toString());
Toast.makeText(MainActivity.this, postInfo.toString(), Toast.LENGTH_SHORT).show();
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
项目:MovingGdufe-Android
文件:MeFragment.java
private void queryCurrentCash(){
cardFactory.getCurrentCash(new Observer<CardBasic>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(CardBasic value) {
if(null != value && !TextUtils.isEmpty(value.getCash())) {
tvMeCash.setText("¥" + value.getCash());
mCardNum = value.getCardNum();
}else{
tvMeCash.setText("获取失败");
}
}
@Override
public void onError(Throwable e) {
if(e != null && !TextUtils.isEmpty(e.getMessage())) {
LogUtils.e(e.toString());
Toast.makeText(getActivity(), e.getMessage(), Toast.LENGTH_SHORT).show();
}
tvMeCash.setText("获取失败");
}
@Override
public void onComplete() {
}
});
}
项目:DeepImagePreview-Project
文件:TabActivity.java
private void registerWithEventBusOnMainThread() {
Disposable disposable = RxBus.getInstance().toObservable()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
}
});
mCompositeDisposable.add(disposable);
}
项目:GmArchMvvm
文件:RtoViewModel.java
LiveData<List<TextContent>> getSearchDataBySort(int start, int end, int search, int sort) {
if (mContents == null)
mContents = new MutableLiveData<>();
if (mData == null)
mData = new ArrayList<>();
mModel.getSortedData(start, end, search, sort)
.subscribeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new ErrorHandleSubscriber<List<TextContent>>(
RepositoryUtils.INSTANCE.obtainRepositoryComponent(getApplication()).rxErrorHandler()) {
@Override
public void onSubscribe(Disposable d) {
super.onSubscribe(d);
addDispose(d);
}
@Override
public void onNext(List<TextContent> textContents) {
// mContents.setValue(textContents);
mContents.postValue(textContents);
}
@Override
public void onError(Throwable e) {
super.onError(e);
}
}
);
return mContents;
}
项目:RxDisposal
文件:SubscriptionDecoratorTest.java
private void validateOnSubscribe(Observer<? super String> testObserver) throws Exception {
final Disposable testDisposable = mock(Disposable.class);
testObserver.onSubscribe(testDisposable);
verify(mockOnSusbcribe, times(1))
.accept(argThat(validateDisposeIsDelegated(testDisposable)));
}
项目:RxSWT
文件:EclipseScheduler.java
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null)
throw new NullPointerException("run == null");
if (unit == null)
throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(run);
executeRunnable(jobName, delay, unit, scheduled);
return scheduled;
}
项目:KomaMusic
文件:RecentlyPlayPresenter.java
@Override
public void loadRecentlyPlayedSongs() {
if (mDisposables != null) {
mDisposables.clear();
}
Disposable disposable = mRepository.getRecentlyPlayedSongs().subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableSubscriber<List<Song>>() {
@Override
public void onError(Throwable e) {
LogUtils.e(TAG, "loadRecentlyPlayedSongs error : " + e.toString());
}
@Override
public void onComplete() {
}
@Override
public void onNext(List<Song> songs) {
onLoadPlayedSongsFinished(songs);
}
});
mDisposables.add(disposable);
}
项目:SlotNSlot_Android
文件:RxSlotRoom.java
private void setBankerSeedInitializedEvent() {
Disposable disposable = machine
.bankerSeedInitializedEventObservable()
.subscribe(response -> {
Log.i(TAG, "banker seed initialized.");
Log.i(TAG, "banker seed1 : " + Utils.byteToHex(response._bankerSeed.getValue().get(0).getValue()));
Log.i(TAG, "banker seed2 : " + Utils.byteToHex(response._bankerSeed.getValue().get(1).getValue()));
Log.i(TAG, "banker seed3 : " + Utils.byteToHex(response._bankerSeed.getValue().get(2).getValue()));
}, Throwable::printStackTrace);
compositeDisposable.add(disposable);
}
项目:DeepImagePreview-Project
文件:ScanPresenter.java
private void registerEvents() {
Disposable disposable = RxBus.getInstance()
.toObservable()
.subscribeOn(Schedulers.newThread())
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
if (o instanceof PageChangedEvent) {
handlePageChangedEvent((PageChangedEvent) o);
}
}
});
mCompositeDisposable.add(disposable);
}
项目:AssistantBySDK
文件:MemoActivity.java
private void loadData() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
List<Memo> memos = mAssistDao.findAllMemoDesc(false);
for (Memo memo : memos) {
mDatas.add(new TaskCard<>(memo, TaskCard.TaskState.ACTIVE));
}
memoCount = memos.size();
e.onNext(0);
}
})
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
mCpbLoad.setVisibility(View.VISIBLE);
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
mCpbLoad.setVisibility(View.GONE);
mMemoAdapter.notifyDataSetChanged();
}
});
}
项目:RxJava2-Android-Sample
文件:NetworkingActivity.java
private void findUsersWhoLovesBoth() {
// here we are using zip operator to combine both request
Observable.zip(getCricketFansObservable(), getFootballFansObservable(),
new BiFunction<List<User>, List<User>, List<User>>() {
@Override
public List<User> apply(List<User> cricketFans, List<User> footballFans) throws Exception {
List<User> userWhoLovesBoth =
filterUserWhoLovesBoth(cricketFans, footballFans);
return userWhoLovesBoth;
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<User>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<User> users) {
// do anything with user who loves both
Log.d(TAG, "userList size : " + users.size());
for (User user : users) {
Log.d(TAG, "user : " + user.toString());
}
}
@Override
public void onError(Throwable e) {
Utils.logError(TAG, e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
项目:GitHub
文件:RxTiPresenterDisposableHandlerTest.java
@Test
public void testManageViewDisposable_ShouldReturnSameDisposable() throws Exception {
mPresenter.create();
mPresenter.attachView(mView);
final TestObserver<Integer> testObserver = new TestObserver<>();
final Disposable disposable = mDisposableHandler.manageViewDisposable(testObserver);
assertThat(testObserver, is(equalTo(disposable)));
}
项目:KomaMusic
文件:AlbumsPresenter.java
@Override
public void loadAlbums() {
LogUtils.i(TAG, "loadAlbums");
mDisposables.clear();
Disposable disposable = mRepository.getAllAlbums().subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableSubscriber<List<Album>>() {
@Override
public void onError(Throwable throwable) {
LogUtils.e(TAG, "loadAlbums onError : " + throwable.toString());
}
@Override
public void onComplete() {
}
@Override
public void onNext(List<Album> albumList) {
onLoadSongsFinished(albumList);
}
});
mDisposables.add(disposable);
}
项目:GitHub
文件:PaginationActivity.java
/**
* subscribing for data
*/
private void subscribeForData() {
Disposable disposable = paginator
.onBackpressureDrop()
.concatMap(new Function<Integer, Publisher<List<String>>>() {
@Override
public Publisher<List<String>> apply(@NonNull Integer page) throws Exception {
loading = true;
progressBar.setVisibility(View.VISIBLE);
return dataFromNetwork(page);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(@NonNull List<String> items) throws Exception {
paginationAdapter.addItems(items);
paginationAdapter.notifyDataSetChanged();
loading = false;
progressBar.setVisibility(View.INVISIBLE);
}
});
compositeDisposable.add(disposable);
paginator.onNext(pageNumber);
}
项目:rxgwt-tips
文件:Pokeapi.java
static Disposable showPokemons() {
return fromPromise(Notification.requestPermission()).filter("granted"::equals).toFlowable()
.concatMap(n -> pokePaging("https://pokeapi.co/api/v2/pokemon/?limit=5"), 1)
.zipWith(interval(5, 30, TimeUnit.SECONDS), (url, tick) -> url, false, 1)
.flatMapSingle(n -> fetchJson(n.url, Pokemon.class))
.subscribe(n -> {
NotificationOptions options = Js.uncheckedCast(JsPropertyMap.of());
options.icon = n.sprites.front_default;
options.body = "Do you know that " + n.name + " weight is " + n.weight + ".";
new Notification(n.name, options);
});
}