/**
 * Builds the row of talks related to the currently selected event.
 *
 * The row is returned immediately. Its adapter is filled asynchronously: once
 * the API service is available, one event lookup is started per related id and
 * each result is appended to the adapter on the main thread. Neither
 * subscription supplies an error consumer.
 *
 * @return a {@link ListRow} backed by the adapter that receives the related events
 */
private Row getRelatedItems() {
    ArrayObjectAdapter relatedAdapter = new ArrayObjectAdapter(new CardPresenter());
    final Set<Long> relatedIds = mSelectedEvent.getMetadata().getRelated().keySet();

    mDisposables.add(
            ((LeanbackBaseActivity) getActivity())
                    .getApiServiceObservable()
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(apiService -> {
                        // Start one lookup per related id; every inner subscription is tracked too.
                        for (long relatedId : relatedIds) {
                            mDisposables.add(
                                    apiService.getEvent(relatedId)
                                            .observeOn(AndroidSchedulers.mainThread())
                                            .subscribe(relatedEvent -> relatedAdapter.add(relatedEvent)));
                        }
                        relatedAdapter.notifyArrayItemRangeChanged(0, relatedAdapter.size());
                    }));

    HeaderItem header = new HeaderItem(0, getString(R.string.related_talks));
    return new ListRow(header, relatedAdapter);
}
@Override protected void onResume() { super.onResume(); // Load all persons and start inserting them with 1 sec. intervals. // All RealmObject access has to be done on the same thread `findAllAsync` was called on. // Warning: This example doesn't handle back pressure well. disposable = realm.where(Person.class).findAllAsync().asFlowable() .flatMap(persons -> Flowable.fromIterable(persons)) .zipWith(Flowable.interval(1, TimeUnit.SECONDS), (person, tick) -> person) .observeOn(AndroidSchedulers.mainThread()) .subscribe(person -> { TextView personView = new TextView(AnimationActivity.this); personView.setText(person.getName()); container.addView(personView); }); }
/**
 * Stops any running location updates and starts a fresh subscription.
 *
 * After the location-settings check, the stream starts with the last known
 * location followed by live updates. Each location is transformed, converted
 * to a Flowable that keeps only the latest value under backpressure, and
 * delivered to {@code setLocation} on the main thread.
 *
 * @param checkLocationSettings forwarded to {@code locationSettingsCheck}
 */
@SuppressWarnings("MissingPermission")
@RequiresPermission(anyOf = {
        Manifest.permission.ACCESS_COARSE_LOCATION,
        Manifest.permission.ACCESS_FINE_LOCATION
})
public void startLocationUpdates(boolean checkLocationSettings) {
    stopLocationUpdates();
    locationUpdatesDisposable = locationSettingsCheck(checkLocationSettings)
            .flatMapObservable(ignore -> locationUpdates()
                    .startWith(lastLocation()))
            .map(this::transformLocation)
            .toFlowable(BackpressureStrategy.LATEST)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    this::setLocation,
                    // The Throwable must be the first argument; passed after the message it is
                    // treated as a format argument and the exception is not logged.
                    error -> Timber.e(error, "Failed to get location updates"));
}
/**
 * Reloads the book-help list from the remote repository.
 *
 * The request runs on the io scheduler and reports on the main thread. On
 * success the view receives the refreshed beans; on failure it shows the error
 * tip and the throwable is passed to {@code e(...)}.
 *
 * @param sort       sort order; its net name is sent to the repository
 * @param start      passed through to the repository call
 * @param limited    passed through to the repository call
 * @param distillate distillate filter; its net name is sent to the repository
 */
@Override
public void refreshBookHelps(BookSort sort, int start, int limited, BookDistillate distillate) {
    addDisposable(
            RemoteRepository.getInstance()
                    .getBookHelps(sort.getNetName(), start, limited, distillate.getNetName())
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(
                            helpBeans -> {
                                isLocalLoad = false;
                                mView.finishRefresh(helpBeans);
                                mView.complete();
                            },
                            throwable -> {
                                mView.complete();
                                mView.showErrorTip();
                                e(throwable);
                            }));
}
/**
 * Attempts to log in with the given credentials.
 *
 * Shows the loading progress, performs the login on the io scheduler and
 * reports on the main thread. A successful login continues with
 * {@code loginWithCookie()}. A rejected login or an error shows a message and
 * hides the loading progress again.
 *
 * @param username account name passed to the data manager
 * @param password account password passed to the data manager
 */
public void login(String username, String password) {
    checkViewAttached();
    getMvpView().showLoadingProgress(true);
    mSubscription.add(mDataManager.login(username, password, 1)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribeWith(new DisposableSingleObserver<Boolean>() {
                @Override
                public void onSuccess(Boolean success) {
                    if (success) {
                        loginWithCookie();
                    } else {
                        getMvpView().showError("Login Failed");
                        // Hide the progress here as onError does; otherwise the indicator
                        // shown before the request stays visible after a rejected login.
                        getMvpView().showLoadingProgress(false);
                    }
                }

                @Override
                public void onError(Throwable error) {
                    getMvpView().showError(error.getMessage());
                    getMvpView().showLoadingProgress(false);
                }
            }));
}
private Disposable updateWatchlist(List<WatchlistItem> watchlistItems) { return ((LeanbackBaseActivity) getActivity()).getApiServiceObservable() .observeOn(AndroidSchedulers.mainThread()) .subscribe(mediaApiService -> { showWatchlist(); watchListAdapter.clear(); if(watchlistItems.size() > 0){ // int i = Math.max(0,mRowsAdapter.indexOf(mConferencesSection)); // mRowsAdapter.add(i,mRecomendationsSectionsRow); // mRowsAdapter.add(i+1,watchListAdapter); Observable.fromIterable(watchlistItems) .flatMap(watchlistItem -> mediaApiService.getEvent(watchlistItem.getEventId())) .observeOn(AndroidSchedulers.mainThread()) .subscribe(event -> watchListAdapter.add(event)); } else { // watchListAdapter.add("Watchlist empty"); hideWatchlist(); } }); }
/**
 * Loads every check point and adds a map marker for each.
 *
 * The full list is stored in {@code allCheckPoints} as it passes through, then
 * flattened so the view gets one marker call per check point on the main
 * thread. The list adapter is notified on completion. The returned observer
 * is not retained.
 */
private void addCheckPointMarkers() {
    DisposableObserver<CheckPoint> markerObserver = new DisposableObserver<CheckPoint>() {
        @Override
        public void onNext(CheckPoint checkPoint) {
            getView().addMarkerOnMap(checkPoint);
        }

        @Override
        public void onError(Throwable e) {
            getView().showError(e.getLocalizedMessage());
        }

        @Override
        public void onComplete() {
            getView().notifyListAdapter();
        }
    };

    getCheckPointDataSource().getAllCheckPoints()
            .toObservable()
            // Keep the whole list before it is split into single items.
            .doOnNext(loaded -> allCheckPoints = (ArrayList<CheckPoint>) loaded)
            .flatMap(Observable::fromIterable)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(markerObserver);
}
public static Flowable<BaseUploadBean> generateFlowable(@NonNull RequestBodyWrapper uploadBeanEmitter, final String filePath){ Flowable<BaseUploadBean> flowable = uploadBeanEmitter.getUploadProcessor() .publish() .autoConnect(); return flowable .filter(new Predicate<BaseUploadBean>() { long oldProgress = 0; @Override public boolean test(BaseUploadBean baseUploadBean) throws Exception { if(baseUploadBean instanceof UploadInfoBean){ long progress = ((UploadInfoBean) baseUploadBean).getData().getWrittenBytes(); if(progress - oldProgress > MIN_GRAD) { oldProgress = progress; return true; } return false; } oldProgress = 0; return true; } }) //.throttleLast(100, TimeUnit.MILLISECONDS).mergeWith(flowable.takeLast(1)) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); }
/**
 * Shares the package data.
 *
 * Loads the package identified by {@code packageId} on the io scheduler and
 * hands it to the view on the main thread. Errors and completion are ignored.
 */
@Override
public void shareTo() {
    DisposableObserver<Package> shareObserver = new DisposableObserver<Package>() {
        @Override
        public void onNext(Package value) {
            view.shareTo(value);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    };

    compositeDisposable.add(
            packagesRepository
                    .getPackage(packageId)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeWith(shareObserver));
}
/**
 * Loads the articles through a RetroJsoup-generated client.
 *
 * The articles are collected into a list on a new thread and added to the
 * adapter on the main thread. Failures print their stack trace.
 */
public void loadWithRetroJsoup() {
    final OkHttpClient httpClient = new OkHttpClient();

    final TutosAndroidFrance site = new RetroJsoup.Builder()
            .url("http://tutos-android-france.com/")
            .client(httpClient)
            .build()
            .create(TutosAndroidFrance.class);

    site.articles()
            .toList()
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(adapter::addItems, Throwable::printStackTrace);
}
/**
 * Reloads the book tags from the remote repository.
 *
 * On success both the horizontal and the grouped tag views are refreshed on
 * the main thread. Failures are logged through {@code LogUtils}.
 */
private void refreshTag() {
    mDisposable.add(
            RemoteRepository.getInstance()
                    .getBookTags()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(
                            tags -> {
                                refreshHorizonTag(tags);
                                refreshGroupTag(tags);
                            },
                            error -> {
                                LogUtils.e(error);
                            }));
}
/**
 * Starts a refresh of the daily data.
 *
 * Turns the swipe-refresh indicator on, builds a Retrofit client for the gank
 * API, and requests the data for {@code mDate}. The response is mapped through
 * {@code BaseResFunc} and delivered to {@code mObserver} on the main thread.
 */
@Override
public void onRefresh() {
    getBinding().swipeRefreshDaily.setRefreshing(true);

    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl("http://gank.io/api/")
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .build();

    retrofit.create(ApiService.class)
            .getDailyData(mDate)
            .map(new BaseResFunc<GankDaily.Results>())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(mObserver);
}
/**
 * Loads all songs from the repository.
 *
 * Pending work in {@code mDisposables} is cleared first. The query runs on
 * the io scheduler and every emitted list is forwarded to
 * {@code onLoadSongsFinished} on the main thread. Errors and completion are
 * only logged.
 */
@Override
public void loadSongs() {
    mDisposables.clear();

    DisposableSubscriber<List<Song>> songsSubscriber = new DisposableSubscriber<List<Song>>() {
        @Override
        public void onNext(List<Song> songs) {
            onLoadSongsFinished(songs);
        }

        @Override
        public void onError(Throwable throwable) {
            LogUtils.e(TAG, "onError :" + throwable.toString());
        }

        @Override
        public void onComplete() {
            LogUtils.i(TAG, "onCompleted");
        }
    };

    mDisposables.add(
            mRepository.getAllSongs()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeWith(songsSubscriber));
}
/**
 * Loads page {@code mPage} of the home data from the network.
 *
 * Error responses are filtered out. The remaining results are converted into
 * {@code Home} items and delivered in buffers of up to 60 on the main thread.
 * For page 1 the list is replaced and the data saved; for later pages the
 * items are appended and the view is updated. No error consumer is supplied.
 */
private void fromNetWorkLoad() {
    mNetWorkDisposable = ApiFactory.getGankApi()
            .getHomeData(mPage)
            .filter(response -> !response.isError())
            .map(GankData::getResults)
            .flatMap(Flowable::fromIterable)
            .map(entity -> new Home(
                    entity.get_id(),
                    entity.getDesc(),
                    entity.getPublishedAt(),
                    entity.getType(),
                    entity.getUrl(),
                    entity.getWho()))
            .buffer(60)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(batch -> {
                boolean firstPage = mPage == 1;
                if (firstPage) {
                    mList.clear();
                }
                mList.addAll(batch);
                if (firstPage) {
                    saveData(batch);
                } else {
                    mView.showData(mList);
                }
            });
}
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) { return new ObservableTransformer<T, T>() { @Override public Observable<T> apply(Observable<T> observable) { return observable.subscribeOn(Schedulers.io()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { view.showLoading();//显示进度条 } }) .subscribeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread()) .doFinally(new Action() { @Override public void run() { view.hideLoading();//隐藏进度条 } }).compose(RxLifecycleUtils.bindToLifecycle(view)); } }; }
private void fetchRepositoryDetails() { ApolloCall<EntryDetailQuery.Data> entryDetailQuery = application.apolloClient() .query(new EntryDetailQuery(repoFullName)) .cacheControl(CacheControl.CACHE_FIRST); //Example call using Rx2Support disposables.add(Rx2Apollo.from(entryDetailQuery) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableSingleObserver<Response<EntryDetailQuery.Data>>() { @Override public void onSuccess(Response<EntryDetailQuery.Data> dataResponse) { setEntryData(dataResponse.data()); } @Override public void onError(Throwable e) { Log.e(TAG, e.getMessage(), e); } })); }
private void loginAgain() { if (mCredentialSource != mCredentialSink) { throw new UnsupportedOperationException("This method can only handle the case where " + "the credential source is the same as the sink, because it does not attempt " + "to save the credentials in case of a successful login."); } if (mbRequestOngoing) { return; } mbRequestOngoing = true; mApi.getConfiguration() .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .flatMap(this::getAuthReqBody) // no need to call mCredentialSink::saveCredentials here since the credentials came // from the same object anyway (source == sink as per check above) .flatMap(mApi::getAuthToken) .observeOn(AndroidSchedulers.mainThread()) .subscribe(this::handleAuthToken, this::handleLoginError); }
/**
 * Loads the details of a movie and updates the loading and error flags.
 *
 * The loading flag is set and the error flag cleared before the request. Each
 * emitted movie is applied with {@code setMovie}. An error shows the error
 * view; both terminal events clear the loading flag. The returned observer is
 * not retained.
 *
 * @param movieId id of the movie to load
 */
public void getMovieDetails(long movieId) {
    isMovieLoading.set(true);
    errorViewShowing.set(false);

    DisposableObserver<Movie> movieObserver = new DisposableObserver<Movie>() {
        @Override
        public void onNext(Movie value) {
            setMovie(value);
        }

        @Override
        public void onError(Throwable e) {
            errorViewShowing.set(true);
            isMovieLoading.set(false);
        }

        @Override
        public void onComplete() {
            isMovieLoading.set(false);
            errorViewShowing.set(false);
        }
    };

    mMoviesRepository.getMovieDetails(movieId)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribeWith(movieObserver);
}
/**
 * Click handler: sends a {@code fibonacciNotify} call with the value 11 to
 * the Fibonacci contract.
 *
 * The notify events are extracted from the result on the io scheduler. On the
 * main thread the first event, if any, is logged and its result appended to
 * {@code contractTxt}. Failures print their stack trace.
 */
@OnClick(R.id.fib_tx)
void sendFibonacci() {
    Fibonacci contract = Fibonacci.load(FIB_CONTRACT_ADDR);

    contract.fibonacciNotify(new Uint256(11))
            .compose(bindToLifecycle())
            .observeOn(Schedulers.io())
            .map(contract::getNotifyEvents)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    events -> {
                        if (!events.isEmpty()) {
                            Log.i(TAG, "fib input : " + events.get(0).input.getValue());
                            Log.i(TAG, "fib result : " + events.get(0).result.getValue());
                            contractTxt.append(events.get(0).result.getValue() + "\n");
                        }
                    },
                    Throwable::printStackTrace);
}
private void initView() { subject = PublishSubject.create(); subject.debounce(0, TimeUnit.MILLISECONDS) // .filter(new Predicate<Float>() { // @Override // public boolean test(Float contrast) throws Exception { // return true; // } // }) .distinctUntilChanged() .switchMap(new Function<Float, ObservableSource<ColorMatrixColorFilter>>() { @Override public ObservableSource<ColorMatrixColorFilter> apply(Float value) throws Exception { return postContrast(value); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<ColorMatrixColorFilter>() { @Override public void accept(ColorMatrixColorFilter colorMatrixColorFilter) throws Exception { setColorFilter(colorMatrixColorFilter); } }); }
/**
 * Verifies that the presenter passes fetched reviews on to the view.
 *
 * The interactor is stubbed with an observable driven by a test scheduler;
 * after triggering the scheduler the stream must complete without errors and
 * the view must have been asked to show the reviews.
 */
@Test
public void shouldBeAbleToShowReviews() {
    TestScheduler scheduler = new TestScheduler();
    TestObserver<ReviewWrapper> observer = new TestObserver<>();

    Observable<ReviewWrapper> reviews = Observable.just(reviewWrapper)
            .subscribeOn(scheduler)
            .observeOn(AndroidSchedulers.mainThread());
    reviews.subscribe(observer);

    when(movieDetailsInteractor.fetchReviews(anyString())).thenReturn(reviews);

    movieDetailsPresenter.displayReviews(movie.getId());
    scheduler.triggerActions();

    observer.assertNoErrors();
    observer.assertComplete();
    verify(view).showReviews(reviewWrapper.getReviews());
}
/**
 * Loads an artist and shows it in the view.
 *
 * Shows the loading progress, runs the search on the io scheduler and reports
 * on the main thread. An empty response body is replaced with
 * "No description" before the artist is shown. The loading progress is hidden
 * on success and on error.
 *
 * @param id artist id passed to the data manager
 */
public void loadArtist(int id) {
    checkViewAttached();
    getMvpView().showLoadingProgress(true);

    DisposableSingleObserver<Artist> artistObserver = new DisposableSingleObserver<Artist>() {
        @Override
        public void onSuccess(Artist artist) {
            boolean missingDescription = artist.response.body.equals("");
            if (missingDescription) {
                artist.response.body = "No description";
            }
            getMvpView().showArtist(artist);
            getMvpView().showLoadingProgress(false);
        }

        @Override
        public void onError(Throwable error) {
            getMvpView().showError(error.getMessage());
            getMvpView().showLoadingProgress(false);
        }
    };

    mSubscription.add(
            mDataManager.artistsSearch(id)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeOn(Schedulers.io())
                    .subscribeWith(artistObserver));
}
/**
 * Computes the diff for a refresh request off the main thread and applies it
 * on the main thread.
 *
 * The request is handed to {@code handleRefresh} on the computation scheduler.
 * The resulting {@link DiffUtil.DiffResult} is passed to {@code handleResult}
 * on the main thread. No error consumer is supplied.
 *
 * @param refreshData new data, header, footer and type information
 */
@Override
protected void startRefresh(HandleBase<StickyItem> refreshData) {
    Function<HandleBase<StickyItem>, DiffUtil.DiffResult> computeDiff =
            new Function<HandleBase<StickyItem>, DiffUtil.DiffResult>() {
                @Override
                public DiffUtil.DiffResult apply(@NonNull HandleBase<StickyItem> handleBase) throws Exception {
                    return handleRefresh(
                            handleBase.getNewData(),
                            handleBase.getNewHeader(),
                            handleBase.getNewFooter(),
                            handleBase.getType(),
                            handleBase.getRefreshType());
                }
            };

    Consumer<DiffUtil.DiffResult> applyDiff = new Consumer<DiffUtil.DiffResult>() {
        @Override
        public void accept(@NonNull DiffUtil.DiffResult diffResult) throws Exception {
            handleResult(diffResult);
        }
    };

    Flowable.just(refreshData)
            .onBackpressureDrop()
            .observeOn(Schedulers.computation())
            .map(computeDiff)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(applyDiff);
}
/**
 * Loads the companies and shows them in the view.
 *
 * The repository call runs on the io scheduler and every emitted list is
 * handed to the view on the main thread. Errors and completion are ignored.
 */
private void getCompanies() {
    DisposableObserver<List<Company>> companiesObserver = new DisposableObserver<List<Company>>() {
        @Override
        public void onNext(List<Company> value) {
            view.showCompanies(value);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    };

    compositeDisposable.add(
            companiesRepository
                    .getCompanies()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeWith(companiesObserver));
}
/**
 * Looks up the bookmark state of the given pages and caches it.
 *
 * Every (page, bookmarked) pair is stored in {@code bookmarksCache} on the
 * main thread. When the lookup completes the options menu is invalidated.
 * Errors are ignored.
 *
 * @param pages pages to check
 */
private void checkIfPageIsBookmarked(Integer... pages) {
    DisposableObserver<Pair<Integer, Boolean>> bookmarkObserver =
            new DisposableObserver<Pair<Integer, Boolean>>() {
                @Override
                public void onNext(Pair<Integer, Boolean> result) {
                    bookmarksCache.put(result.first, result.second);
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                    supportInvalidateOptionsMenu();
                }
            };

    compositeDisposable.add(
            bookmarkModel.getIsBookmarkedObservable(pages)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeWith(bookmarkObserver));
}
/**
 * Runs the intercepted join point on the io scheduler.
 *
 * The subscribing thread prepares a {@link Looper}, proceeds with the join
 * point (printing the stack trace of anything it throws), and then enters
 * {@code Looper.loop()}.
 *
 * NOTE(review): the emitter is never signalled and {@code Looper.loop()}
 * normally does not return until the looper quits — confirm that holding the
 * io thread this way is intended.
 *
 * @param joinPoint the intercepted invocation to proceed with
 */
private void asyncMethod(final ProceedingJoinPoint joinPoint) throws Throwable {
    FlowableOnSubscribe<Object> proceedOnLooperThread = new FlowableOnSubscribe<Object>() {
        @Override
        public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
            Looper.prepare();
            try {
                joinPoint.proceed();
            } catch (Throwable failure) {
                failure.printStackTrace();
            }
            Looper.loop();
        }
    };

    Flowable.create(proceedOnLooperThread, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}
/**
 * Wraps an action in a {@link Disposable} that always runs it on the main
 * (UI) thread.
 *
 * When disposed on the main looper the action runs directly. Otherwise it is
 * scheduled on a main-thread worker, exceptions are reported through
 * {@code onError}, and the worker is disposed afterwards.
 *
 * @param action the work to perform on disposal
 * @return a disposable that triggers {@code action} on the UI thread
 */
private Disposable disposeInUiThread(final Action action) {
    return Disposables.fromAction(new Action() {
        @Override
        public void run() throws Exception {
            if (Looper.getMainLooper() != Looper.myLooper()) {
                // Not on the UI thread: hop over with a dedicated worker.
                final Scheduler.Worker uiWorker = AndroidSchedulers.mainThread().createWorker();
                uiWorker.schedule(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            action.run();
                        } catch (Exception e) {
                            onError("Could not unregister receiver in UI Thread", e);
                        }
                        uiWorker.dispose();
                    }
                });
                return;
            }
            action.run();
        }
    });
}
/**
 * Downloads an article and caches it.
 *
 * The request runs on a new thread. On the main thread the article is cached
 * and passed to {@code onArticleLoaded}; failures go to
 * {@code onArticleError}. After the request has been started, a refresh also
 * removes the existing cache entry for the id.
 *
 * @param id        id of the article to load
 * @param isRefresh whether this load is a refresh of an already shown article
 */
public void loadArticleFromInternet(final long id, final boolean isRefresh) {
    Consumer<DataWrapper> onLoaded = new Consumer<DataWrapper>() {
        @Override
        public void accept(DataWrapper article) throws Exception {
            cacheArticle(id, article);
            onArticleLoaded(article, isRefresh);
        }
    };
    Consumer<Throwable> onFailed = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            onArticleError(throwable);
        }
    };

    mComposite.add(
            mService.getArticleDetailInfo(id)
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(onLoaded, onFailed));

    if (isRefresh) {
        ArticleCache.getCache().remove(Long.toString(id));
    }
}
/**
 * Starts observing network and internet connectivity.
 *
 * Network connectivity changes are logged and shown in
 * {@code tvConnectivityStatus}; internet connectivity is shown in
 * {@code tvInternetStatus}. Both streams are subscribed on the io scheduler
 * and observed on the main thread.
 */
@Override
public void onResume() {
    super.onResume();

    networkDisposable = ReactiveNetwork.observeNetworkConnectivity(getActivity())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(network -> {
                Log.d(TAG, network.toString());
                final NetworkInfo.State networkState = network.getState();
                final String typeName = network.getTypeName();
                String status = String.format("state: %s, typeName: %s", networkState, typeName);
                tvConnectivityStatus.setText(status);
            });

    internetDisposable = ReactiveNetwork.observeInternetConnectivity()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(online -> tvInternetStatus.setText(online.toString()));
}
private void flowable() { Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception { Log.e(TAG, "start send data "); for (int i = 0; i < 100; i++) { e.onNext(i); } e.onComplete(); } }, BackpressureStrategy.DROP)//指定背压策略 .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new FlowableSubscriber<Integer>() { @Override public void onSubscribe(@NonNull Subscription s) { //1, onSubscribe 是2.x新添加的方法,在发射数据前被调用,相当于1.x的onStart方法 //2, 参数为 Subscription ,Subscription 可用于向上游请求发射多少个元素,也可用于取笑请求 //3, 必须要调用Subscription 的request来请求发射数据,不然上游是不会发射数据的。 Log.e(TAG, "onSubscribe..."); s.request(10); } @Override public void onNext(Integer integer) { Log.e(TAG, "onNext:" + integer); } @Override public void onError(Throwable t) { Log.e(TAG, "onError..." + t); } @Override public void onComplete() { Log.e(TAG, "onComplete..."); } }); }
private Disposable kickOffLoginFlow(String blogUrl) { // READ THIS: https://upday.github.io/blog/subscribe_on/ return mBlogUrlValidator .validate(blogUrl) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .doOnNext(this::setState) .flatMap(url -> mApiProvider.getGhostApi().getConfiguration()) .flatMap(config -> this.getAuthToken(mApiProvider.getGhostApi(), config)) .observeOn(AndroidSchedulers.mainThread()) .subscribe(this::handleAuthToken, this::handleError); }
/**
 * Click handler: imports the current sample and opens the imported file in
 * the editor, then finishes this activity.
 *
 * No error consumer is supplied and the subscription is not retained.
 */
@OnClick(R.id.edit)
void edit() {
    Consumer<String> openInEditor = new Consumer<String>() {
        @Override
        public void accept(@NonNull String path) throws Exception {
            EditActivity.editFile(ViewSampleActivity.this, path);
            finish();
        }
    };

    new ScriptOperations(this, mView)
            .importSample(mSample)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(openInEditor);
}
/**
 * Switches the browser to another directory and reloads its file list.
 *
 * The change listener is notified when the directory actually differs. The
 * files are listed on the io scheduler and, on the main thread, handed to the
 * adapter; then the process listener is informed and the list scrolls to the
 * top. No error consumer is supplied.
 *
 * NOTE(review): the hand-written Publisher emits without calling
 * {@code onSubscribe} first — confirm this is acceptable for
 * {@code Observable.fromPublisher}.
 *
 * @param directory directory to display
 * @param canGoBack stored in {@code mCanGoBack}
 */
private void setCurrentDirectory(final ScriptFile directory, boolean canGoBack) {
    boolean directoryChanged = !directory.equals(mCurrentDirectory);
    if (directoryChanged && mOnCurrentDirectoryChangeListener != null) {
        mOnCurrentDirectoryChangeListener.onChange(mCurrentDirectory, directory);
    }
    mCurrentDirectory = directory;
    mCanGoBack = canGoBack;
    if (mFileProcessListener != null) {
        mFileProcessListener.onFilesListing();
    }

    Publisher<ScriptFile[]> directoryListing = new Publisher<ScriptFile[]>() {
        @Override
        public void subscribe(Subscriber<? super ScriptFile[]> subscriber) {
            subscriber.onNext(mStorageScriptProvider.getDirectoryScriptFiles(directory));
            subscriber.onComplete();
        }
    };

    Consumer<ScriptFile[]> showListing = new Consumer<ScriptFile[]>() {
        @Override
        public void accept(@NonNull ScriptFile[] scriptFiles) throws Exception {
            mAdapter.setScripts(scriptFiles);
            if (mFileProcessListener != null) {
                mFileProcessListener.onFileListed();
            }
            smoothScrollToPosition(0);
        }
    };

    Observable.fromPublisher(directoryListing)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(showListing);
}
/**
 * Calls the sign-in endpoint and shows a short toast when it succeeds.
 *
 * The request runs on the io scheduler and the toast is shown on the main
 * thread. No error consumer is supplied and the subscription is not retained.
 */
public void Signin() {
    Consumer<ResponseBody> showSuccessToast = new Consumer<ResponseBody>() {
        @Override
        public void accept(ResponseBody responseBody) throws Exception {
            ToastUtils.showShort("签到成功");
        }
    };

    apiService.Signin()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(showSuccessToast);
}
/**
 * Registers a consumer for a named bus event.
 *
 * The flowable obtained from the bus is remembered in {@code mProcessorMap}
 * under the event name. The consumer is invoked on the main thread; errors
 * print their stack trace.
 *
 * @param eventName name of the event to listen for
 * @param consumer  callback receiving the event payloads
 * @param <T>       payload type
 */
public <T> void on(String eventName, Consumer<T> consumer) {
    Flowable<T> events = mRxBus.register(eventName);
    mProcessorMap.put(eventName, events);
    mDisposable.add(
            events.observeOn(AndroidSchedulers.mainThread())
                    .subscribe(consumer, Throwable::printStackTrace));
}
public void getAvatarIcon(String nickname,Observer<ResponseBody> sub ) { int size = 42; //图片大小,不过在xml里写死了 int cacheTime = 3600; //他服务器缓存的,不影响客户端 Retrofit retrofit = new Retrofit.Builder() .baseUrl(AppConfig.Avator_URL_BASE) .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) .build(); retrofit.create(WorkApi.class).getAvatarIcon(nickname,size,cacheTime) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()) .subscribe(sub); }
/**
 * Binds this vertical layout to the given EPUB view.
 *
 * Inflates the content binding into {@code parent}, sets up the chapter
 * RecyclerView, and reports scroll positions. Every scroll publishes the
 * first visible item and its top offset to {@code scrollPosition} and updates
 * the current chapter. Sampled every 200 ms, the latest position is forwarded
 * to the chapter's web view as a density-independent offset.
 *
 * @param epubView the EPUB view providing context and chapters
 * @param parent   container the content layout is inflated into
 */
@Override
public void bind(EpubView epubView, ViewGroup parent) {
    this.epubView = epubView;
    LayoutInflater inflater = LayoutInflater.from(parent.getContext());
    binding = EpubVerticalVerticalContentBinding.inflate(inflater, parent, true);

    LinearLayoutManager layoutManager = new LinearLayoutManager(epubView.getContext());
    layoutManager.setInitialPrefetchItemCount(2);
    binding.recyclerview.setLayoutManager(layoutManager);

    chapterAdapter = new ChapterAdapter(this, epubView);
    binding.recyclerview.setAdapter(chapterAdapter);

    binding.recyclerview.addOnScrollListener(new RecyclerView.OnScrollListener() {
        @Override
        public void onScrolled(RecyclerView recyclerView, int dx, int dy) {
            super.onScrolled(recyclerView, dx, dy);
            int firstVisibleItemPosition = layoutManager.findFirstVisibleItemPosition();
            View firstVisibleView = layoutManager.findViewByPosition(firstVisibleItemPosition);
            if (firstVisibleView == null) {
                // No child is laid out for that position (for example the list is empty),
                // so there is no offset to report and no chapter to select.
                return;
            }
            scrollPosition.onNext(new Pair<>(firstVisibleItemPosition, firstVisibleView.getTop()));
            setCurrentChapter(firstVisibleItemPosition);
        }
    });

    scrollPosition
            .sample(200, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(positionTopPair -> {
                ChapterAdapter.ChapterViewHolder holder = (ChapterAdapter.ChapterViewHolder)
                        binding.recyclerview.findViewHolderForAdapterPosition(positionTopPair.first);
                if (holder != null) {
                    // Convert the pixel offset into density-independent units for the page script.
                    float density = epubView.getContext().getResources().getDisplayMetrics().density;
                    holder.binding.webview.callJavascriptMethod(
                            "updateFirstVisibleElementByTopPosition",
                            -positionTopPair.second / density);
                }
            })
            .subscribe(new BaseDisposableObserver<>());
}
/**
 * Fetches the information related to a movie.
 *
 * The request is subscribed on the io scheduler and observed on the main
 * thread.
 *
 * @param movieId id of the movie
 * @return an observable emitting the related-information bean
 */
Observable<MovieRelatedInformationBean> getMovieRelatedInformation(int movieId) {
    Observable<MovieRelatedInformationBean> request = RetrofitClient.getInstance()
            .apiMovieDetailService()
            .getMovieRelatedInformation(movieId);
    return request
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Downloads a page and extracts its article list.
 *
 * Shows the loading indicator, fetches and parses the page with Jsoup on the
 * io scheduler, and collects one {@code ArticleItem} per {@code li} inside
 * the elements of class {@code list_line}. On the main thread the list is
 * shown and the loading indicator hidden. If fetching or parsing fails the
 * loading indicator is hidden as well and the stack trace is printed.
 *
 * @param url address of the page to parse
 */
@Override
public void onHandleParseHTML(final String url) {
    mView.showLoading(true);
    Observable.create(new ObservableOnSubscribe<ArrayList<ArticleItem>>() {
        @Override
        public void subscribe(ObservableEmitter<ArrayList<ArticleItem>> e) throws Exception {
            ArrayList<ArticleItem> list = new ArrayList<>();
            Document doc = Jsoup.connect(url).get();
            Elements ul = doc.getElementsByClass("list_line");
            for (Element u : ul) {
                Elements li = u.getElementsByTag("li");
                for (Element l : li) {
                    String text = l.getElementsByTag("a").text();
                    String href = l.getElementsByTag("a").attr("href");
                    String time = l.getElementsByTag("span").text();
                    list.add(new ArticleItem(text, href, time));
                }
            }
            e.onNext(list);
            // Terminate the stream so the subscription is released once the list is delivered.
            e.onComplete();
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<ArrayList<ArticleItem>>() {
                @Override
                public void accept(@NonNull ArrayList<ArticleItem> articleItems) throws Exception {
                    mView.showList(articleItems);
                    mView.showLoading(false);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    // Without an error consumer a failed download goes to RxJava's
                    // unhandled-error path and the loading indicator stays visible.
                    throwable.printStackTrace();
                    mView.showLoading(false);
                }
            });
}