public static <T> FlowableTransformer<T, T> applySchedules(final IView view) { return new FlowableTransformer<T, T>() { @Override public Publisher<T> apply(Flowable<T> upstream) { return upstream.subscribeOn(Schedulers.io()) .doOnSubscribe(new Consumer<Subscription>() { @Override public void accept(Subscription subscription) throws Exception { view.showLoading(); } }) .subscribeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread()) .doOnTerminate(new Action() { @Override public void run() throws Exception { view.hideLoading(); } }); } }; }
public void loadRequest(int id) { checkViewAttached(); getMvpView().showLoadingProgress(true); mSubscription.add(mDataManager.getRequest(id) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()) .subscribeWith(new DisposableSingleObserver<Request>() { @Override public void onSuccess(Request request) { getMvpView().showRequest(request); getMvpView().showLoadingProgress(false); } @Override public void onError(Throwable error) { getMvpView().showError(error.getMessage()); getMvpView().showLoadingProgress(false); } })); }
@MainThread @NonNull LiveData<Response<List<Country>>> getMoviesList() { if (countriesLiveData == null) { countriesLiveData = new MutableLiveData<>(); countriesRepository.getCountries() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnSubscribe(disposable -> loadingLiveData.setValue(true)) .doAfterTerminate(() -> loadingLiveData.setValue(false)) .subscribe( countries1 -> countriesLiveData.setValue(Response.success(countries1)), throwable -> countriesLiveData.setValue(Response.error(throwable)) ); } return countriesLiveData; }
private void restart() { mCompositeDisposable.add(FFMService.restart() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doFinally(new Action() { @Override public void run() throws Exception { findPreference("restart_webqq").setEnabled(true); } }) .subscribe(new Consumer<FFMResult>() { @Override public void accept(FFMResult ffmResult) throws Exception { Toast.makeText(getContext(), "Succeed.", Toast.LENGTH_SHORT).show(); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Toast.makeText(getContext(), "Network error:\n" + throwable.getMessage(), Toast.LENGTH_SHORT).show(); } })); }
private void choose(TrackPath source, SingleTrackAlbumBuilder result) { final Single<Boolean> dbOp = Single.fromCallable(() -> { new TrackPathNormalizer(source).removeTags(); db.addAlbum(result.build()); updateHits(result); return true; }).observeOn(Schedulers.io()); Single.fromCallable(() -> { return askForRemoval(source); }).zipWith(dbOp, (one, another) -> one && another) .subscribe(toDelete -> { if(toDelete) { Files.delete(source.getPath()); } }); }
public static void load() { RxHelper.safeObservable(Observable.fromCallable(() -> { try { InputStream stream = App.getInstance().getAssets().open(PATH); List<Emoji> emojis = EmojiLoader.loadEmojis(stream); ALL_EMOJIS = emojis; for (Emoji emoji : emojis) { for (String tag : emoji.getTags()) { if (EMOJIS_BY_TAG.get(tag) == null) { EMOJIS_BY_TAG.put(tag, new HashSet<>()); } EMOJIS_BY_TAG.get(tag).add(emoji); } for (String alias : emoji.getAliases()) { EMOJIS_BY_ALIAS.put(alias, emoji); } } EMOJI_TRIE = new EmojiTrie(emojis); stream.close(); } catch (IOException e) { e.printStackTrace(); } return ""; })).subscribeOn(Schedulers.io()).subscribe(); }
private void searchData(String tag) { mHintText.setText(""); mTagLayout.setVisibility(View.GONE); mProgressBar.setVisibility(View.VISIBLE); mSearchTag = tag; Observable<GetDataBean> observable = mSearchApi.searchTagData(mSearchTag); observable .filter(new Predicate<GetDataBean>() { @Override public boolean test(@NonNull GetDataBean getDataBean) throws Exception { return getDataBean != null; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(mObserver); }
private void fetchData() { getView().showProgressBar(); getView().hideListView(); getCompositeDisposable().add(getDataProvider().getPermissionGroups(false) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableSingleObserver<ArrayList<PermissionGroupDetails>>() { @Override public void onSuccess(ArrayList<PermissionGroupDetails> permissionGroupDetails) { getView().hideProgressBar(); getView().showListView(); permissionList = permissionGroupDetails; getView().notifyListAdapter(); } @Override public void onError(Throwable e) { } })); }
private void getUserProfile() { mUserService.me() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(u -> { mUserName.setText(u.getUsername()); mAvatarView.setAvatarOfUser(u); mReputation.setText(u.getReputation()); mPost.setText(u.getPostcount()); mFollowing.setText(String.valueOf(u.getFollowingCount())); mFollower.setText(String.valueOf(u.getFollowerCount())); mLoginTime.setText(DateTimeFormatter.format(Long.parseLong(u.getLastonline()))); mRegTime.setText(DateTimeFormatter.format(Long.parseLong(u.getJoindate()))); mProfileViews.setText(u.getProfileviews()); mEmail.setText(u.getEmail()); GlideApp.with(UserProfileActivity.this) .load(NodeBBService.url(u.getCoverUrl())) .into(new SimpleTarget<Drawable>() { @Override public void onResourceReady(Drawable resource, Transition<? super Drawable> transition) { mHeaderView.setBackground(resource); } }); }); }
public void login(String username, String password) { checkViewAttached(); getMvpView().showLoadingProgress(true); mSubscription.add(mDataManager.login(username, password, 1) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()) .subscribeWith(new DisposableSingleObserver<Boolean>() { @Override public void onSuccess(Boolean success) { if (success) { loginWithCookie(); } else { getMvpView().showError("Login Failed"); } } @Override public void onError(Throwable error) { getMvpView().showError(error.getMessage()); getMvpView().showLoadingProgress(false); } })); }
public Single<Boolean> login(String username, String password, int stayLoggedIn) { return mApiService.login(username, password, stayLoggedIn) .subscribeOn(Schedulers.io()) .flatMap(new Function<Response<ResponseBody>, Single<? extends Boolean>>() { @Override public Single<? extends Boolean> apply( Response<ResponseBody> responseBodyResponse) { String header = responseBodyResponse.raw().header("Set-Cookie", ""); if (header.contains("session") && !header.contains("deleted") && !header.contains("redirect")) { return Single.just(true); } else { getPreferencesHelper().clearCookies(); return Single.just(false); } } }); }
@Inject TranscodingServiceImpl(ExternalProcessService externalProcessService, TranscoderSettings transcoderSettings, MediaScanSettings mediaScanSettings, OutputParser parser) { this.externalProcessService = externalProcessService; this.transcoderSettings = transcoderSettings; this.mediaScanSettings = mediaScanSettings; this.parser = parser; this.publisher = PublishSubject.create().toSerialized(); publisher.ofType(TranscodeTask.class) .skipWhile(o -> isActive()) .observeOn(Schedulers.computation()) .subscribeOn(Schedulers.io()) .subscribe(this::prepareTranscode); }
@Override public void loadBookList(BookListType type, String tag, int start, int limited) { Disposable refreshDispo = getBookListSingle(type, tag, start, limited) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( (beans)-> { mView.finishLoading(beans); } , (e) ->{ mView.showLoadError(); LogUtils.e(e); } ); addDisposable(refreshDispo); }
public static Single<WriteFileEvent> writeFile(final String content, final Path path, final boolean overwrite) { Preconditions.checkNotNull(content); Preconditions.checkNotNull(path); return Single.fromCallable(() -> { if (path.getParent() != null && !Files.exists(path.getParent())) { Files.createDirectories(path.getParent()); } if (overwrite) { Files.deleteIfExists(path); } else if (Files.isDirectory(path)) { throw new IOException("There is already a directory at " + path); } else if (Files.exists(path)) { throw new IOException("There is already a file at " + path); } final ByteSink sink = MoreFiles.asByteSink(path); sink.write(content.getBytes()); return WriteFileEvent.of(path); }).subscribeOn(Schedulers.io()); }
@Override public void fetchMoreData() { if (TextUtils.isEmpty(getNextPageUrl())) return; view.showLoadingMore(true); repository.listShotLikesForUserOfNextPage(getNextPageUrl()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .compose(((LifecycleProvider<FragmentEvent>) view).bindUntilEvent(FragmentEvent.DESTROY_VIEW)) .subscribe(listResponse -> { view.showLoadingMore(false); view.showMoreData(generateEpoxyModels(listResponse.body())); setNextPageUrl(new PageLinks(listResponse).getNext()); }, throwable -> { view.showLoadingMore(false); view.showSnackbar(throwable.getMessage()); throwable.printStackTrace(); }); }
private void registerMainThreadEvents() { Disposable disposable = RxBus.getInstance() .toObservable() .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { if (o instanceof OcrStatusChangedEvent) { mScanView.handleOrcStatusChangedEvent(((OcrStatusChangedEvent) o)); } else if (o instanceof NewDetectionFoundEvent) { mScanView.handleNewDetectionFound(((NewDetectionFoundEvent) o).getDetection()); } else if (o instanceof SearchResultReadyEvent) { SearchResultReadyEvent event = (SearchResultReadyEvent) o; mScanView.showPreviewResults(event.getSearchResultContainer(), event.getQuery()); } } }); mCompositeDisposable.add(disposable); }
private void addCheckPointMarkers() { getCheckPointDataSource().getAllCheckPoints() .toObservable() .doOnNext(checkPoints -> allCheckPoints = (ArrayList<CheckPoint>) checkPoints) .flatMap(Observable::fromIterable) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableObserver<CheckPoint>() { @Override public void onNext(CheckPoint checkPoint) { getView().addMarkerOnMap(checkPoint); } @Override public void onError(Throwable e) { getView().showError(e.getLocalizedMessage()); } @Override public void onComplete() { getView().notifyListAdapter(); } }); }
public void requestLoadingList() { Observable.create(new ObservableOnSubscribe<List<ConfigBean>>() { @Override public void subscribe(ObservableEmitter<List<ConfigBean>> e) throws Exception { mModel = ConfigModel.getInstance(configView.getContext()); e.onNext(mModel.getConfigList()); mModel.setConfigCallback(ConfigPresenter.this); } }) .observeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<ConfigBean>>() { @Override public void accept(List<ConfigBean> list) throws Exception { configView.displayConfigList(list); } }); }
public void refreshToken(AuthToken token) { if (mbRequestOngoing) { return; } mbRequestOngoing = true; mApi.getConfiguration() .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .map(ConfigurationList::getClientSecret) .map(clientSecret -> new RefreshReqBody(token.getRefreshToken(), clientSecret)) .flatMap(mApi::refreshAuthToken) // since this token was just refreshed, it doesn't have a refresh token, so add that .doOnNext(authToken -> authToken.setRefreshToken(token.getRefreshToken())) .observeOn(AndroidSchedulers.mainThread()) .subscribe(this::handleAuthToken, this::handleRefreshError); }
@Override public void refreshBookReview(BookSort sort, BookType bookType, int start, int limited, BookDistillate distillate) { Disposable refreshDispo = RemoteRepository.getInstance() .getBookReviews(sort.getNetName(), bookType.getNetName(), start, limited, distillate.getNetName()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( (beans)-> { isLocalLoad = false; mView.finishRefresh(beans); mView.complete(); } , (e) ->{ mView.complete(); mView.showErrorTip(); e(e); } ); addDisposable(refreshDispo); }
public WorkWebsocket(Backend backend) { backend.getWorkObservable(workResponses).subscribe((req) -> { String msg = GSON.toJson(req); int offset = 0; for (Session session : sessions) { final int finalOffset = ++offset; Schedulers.io().scheduleDirect(()-> { try { JsonObject finalMsg = GSON.fromJson(msg, JsonObject.class); finalMsg.addProperty("offset", finalOffset); session.getRemote().sendString(GSON.toJson(finalMsg)); System.out.println("Send message to a session"); } catch (IOException e) { e.printStackTrace(); } }); } }); }
public void handle(RoutingContext event) { LOG.debug("Received request on URI [{}]", event.request().absoluteURI()); RequestContextAccessor.set(requestContextFactory.fromRoutingContext(event)); authenticate(); Single.just(event) .observeOn(Schedulers.io()) .flatMap((s_event) -> singleDelegate.apply(s_event)) .subscribeOn(Schedulers.io()) // Write HTTP response on IO thread .subscribe( (result) -> responseWriter.buildSuccess(event, result), (e) -> event.fail(e) ); RequestContextAccessor.remove(); }
public static Flowable<BaseUploadBean> generateFlowable(@NonNull RequestBodyWrapper uploadBeanEmitter, final String filePath){ Flowable<BaseUploadBean> flowable = uploadBeanEmitter.getUploadProcessor() .publish() .autoConnect(); return flowable .filter(new Predicate<BaseUploadBean>() { long oldProgress = 0; @Override public boolean test(BaseUploadBean baseUploadBean) throws Exception { if(baseUploadBean instanceof UploadInfoBean){ long progress = ((UploadInfoBean) baseUploadBean).getData().getWrittenBytes(); if(progress - oldProgress > MIN_GRAD) { oldProgress = progress; return true; } return false; } oldProgress = 0; return true; } }) //.throttleLast(100, TimeUnit.MILLISECONDS).mergeWith(flowable.takeLast(1)) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); }
@Override public void onCreate() { super.onCreate(); Disposable disposable = RxBus.getDefault() .toObservable(String.class) .subscribeOn(Schedulers.io()) .doOnNext(new Consumer<String>() { @Override public void accept(String s) throws Exception { wsServer.broadcast(s); } }) .subscribe(); compositeDisposable.add(disposable); mAssetManager = getAssets(); }
@Override public void onCreate() { super.onCreate(); Picasso picasso = new Picasso.Builder(this) .loggingEnabled(BuildConfig.DEBUG) .indicatorsEnabled(false) .build(); Picasso.setSingletonInstance(picasso); RxBus.getInstance() .toObservable() .subscribeOn(Schedulers.newThread()) .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { Log.d("EVENT", "onNewEvent: " + o.getClass().getSimpleName() + " [" + o.toString() + "]"); } }); }
/** * 初始化区号 **/ private void initZipCode() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { Set<Map.Entry<String, String>> entries = ZipCodeMap.MAP.entrySet(); List<Zipcode> zipCodeList = new ArrayList<>(); for (Map.Entry<String, String> entry : entries) { Zipcode zipCode = new Zipcode(); zipCode.setCity(entry.getKey()); zipCode.setCode(entry.getValue()); zipCodeList.add(zipCode); } CallAndSmsDao.getInstance(StartUpActivity.this).insertZipCodes(zipCodeList); } }) .delay(1500, TimeUnit.MILLISECONDS) .observeOn(Schedulers.io()) .subscribeOn(Schedulers.io()) .subscribe(); }
private void callAgain(final String title, final String artist) { mDisposable.clear(); mDisposable.add(loadLyrics.downloadLrcFile(title, artist, MusicPlayerRemote.getSongDurationMillis()) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()) .doOnSubscribe(disposable -> { mRefresh.startAnimation(rotateAnimation); }) .subscribe(this::showLyricsLocal, throwable -> { mRefresh.clearAnimation(); showLyricsLocal(null); loadLyricsWIki(title, artist); hideLyrics(View.GONE); }, () -> { mRefresh.clearAnimation(); Toast.makeText(this, "Lyrics downloaded", Toast.LENGTH_SHORT).show(); })); }
@Test public void shouldReturnErrorWhenSubscribeOnNonLooperThread() { //GIVEN final Observable<Intent> observable = RxBroadcastReceivers.fromIntentFilter(application, testIntentFilter) .subscribeOn(Schedulers.newThread()); //WHEN final TestObserver<Intent> observer = observable.test(); //THEN observer.awaitTerminalEvent(); observer.assertTerminated(); }
@Override protected void onActivityResult(int requestCode, int resultCode, Intent data) { if (resultCode != RESULT_OK) { return; } String packageName = data.getStringExtra(ShortcutIconSelectActivity.EXTRA_PACKAGE_NAME); if (packageName != null) { try { mIcon.setImageDrawable(getPackageManager().getApplicationIcon(packageName)); mIsDefaultIcon = false; } catch (PackageManager.NameNotFoundException e) { e.printStackTrace(); } return; } if (data.getData() == null) return; Observable.fromCallable(() -> BitmapFactory.decodeStream(getContentResolver().openInputStream(data.getData()))) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe((bitmap -> { mIcon.setImageBitmap(bitmap); mIsDefaultIcon = false; }), error -> { Log.e(LOG_TAG, "decode stream", error); }); }
private void showImage() { Observable.just(inputUrl) .map(new Function<String, Bitmap>() { @Override public Bitmap apply(String url) throws Exception { return getBitmap(url); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Bitmap>() { @Override public void onSubscribe(Disposable d) { showLoading(); } @Override public void onNext(Bitmap bitmap) { saturationView.setImageBitmap(bitmap); } @Override public void onError(Throwable e) { } @Override public void onComplete() { hideLoading(); } }); }
/** * 获取相关资讯 * * @param movieId * @return */ Observable<MovieRelatedInformationBean> getMovieRelatedInformation(int movieId) { return RetrofitClient.getInstance() .apiMovieDetailService() .getMovieRelatedInformation(movieId) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); }
/** * Start this upload asynchronously. Returns progress updates. * * @return {@link Flowable} that emits {@link Progress} events */ public Flowable<Progress<FileLink>> run() { Flowable<Prog<FileLink>> startFlow = Flowable .fromCallable(new UploadStartFunc(this)) .subscribeOn(Schedulers.io()); // Create multiple func instances to each upload a subrange of parts from the file // Merge each of these together into one so they're executed concurrently Flowable<Prog<FileLink>> transferFlow = Flowable.empty(); for (int i = 0; i < CONCURRENCY; i++) { UploadTransferFunc func = new UploadTransferFunc(this); Flowable<Prog<FileLink>> temp = Flowable .create(func, BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()); transferFlow = transferFlow.mergeWith(temp); } Flowable<Prog<FileLink>> completeFlow = Flowable .fromCallable(new UploadCompleteFunc(this)) .subscribeOn(Schedulers.io()); return startFlow .concatWith(transferFlow) .concatWith(completeFlow) .buffer(PROG_INTERVAL_SEC, TimeUnit.SECONDS) .flatMap(new ProgMapFunc(this)); }
public static void deleteFileOnDisk(Iterable<MusicBean> list) { Flowable.fromIterable(list).flatMap(new Function<MusicBean, Publisher<File>>() { @Override public Publisher<File> apply(MusicBean bean) throws Exception { return Flowable.just(new File(bean.getPath())); } }).observeOn(Schedulers.io()) .subscribeOn(Schedulers.io()) .subscribe(new Subscriber<File>() { @Override public void onSubscribe(Subscription s) { s.request(Integer.MAX_VALUE); } @Override public void onNext(File file) { file.delete(); } @Override public void onError(Throwable t) { } @Override public void onComplete() { } }); }
@Override public void putDataToCache(final Image image) { Observable.create(new ObservableOnSubscribe<Image>() { @Override public void subscribe(ObservableEmitter<Image> e) throws Exception { putDataToDiskLruCache(image); } }).subscribeOn(Schedulers.io()).subscribe(); }
private void initDemoRX02() { Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName()); Log.e(TAG, "emit 1"); emitter.onNext(1); } }); Consumer<Integer> consumer = new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "Observer thread is :" + Thread.currentThread().getName()); Log.e(TAG, "onNext: " + integer); } }; // observable.subscribe(consumer); /* * 多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效. 多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次. * */ observable.subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(consumer); }
@Override public final <T> void addSubscribe(io.reactivex.Observable<T> observable, Observer<T> observer) { if (null == observable || null == observer) { return; } observable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer); }
/** * 获取长评数据 * * @param movieId * @return */ Observable<MovieLongCommentBean> getMovieLongComment(int movieId) { return RetrofitClient.getInstance() .apiMovieDetailService() .getMovieLongComment(movieId) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); }
public static void main(String[] args) { Flowable.interval(1, TimeUnit.MILLISECONDS) .onBackpressureBuffer(10, () -> System.out.println("overflow!"), BackpressureOverflowStrategy.DROP_LATEST) .observeOn(Schedulers.io()) .subscribe(i -> { sleep(5); System.out.println(i); }); sleep(5000); }
@Override public void stopRing() { if (isDelay) { /*mAlarm.setRtime(new SimpleDate().toValue() + 5); mAssistDao.updateAlarm(mAlarm); //开启闹钟服务 Intent rIntent = new Intent(mContext, RemindService.class); rIntent.putExtra(RemindService.CMD, (RemindService.ALARM << 4) + RemindService.ADD); rIntent.putExtra(RemindService.ID, mAlarm.getId()); mContext.startService(rIntent);*/ Intent delayIntent = new Intent(mContext, AssistantService.class); delayIntent.putExtra(AssistantService.CMD, AssistantService.ServiceCmd.DELAY_ALARM); delayIntent.putExtra(RemindService.ID, mAlarm.getId()); mContext.startService(delayIntent); //合成提示语音 StringBuilder builder = new StringBuilder(); SimpleDate sd = new SimpleDate(); sd.setValue(sd.toValue() + 5); builder.append(sd.toString()) .append("将再次响铃"); SynthesizerBase.get().startSpeakAbsolute(builder.toString()) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe(); } else if (mAlarm.getFrequency() == 0) { mAlarm.setValid(0); mAssistDao.updateAlarm(mAlarm); } player.stop(); player.release(); if (timer != null) { timer.cancel(); timer = null; } }
public static void main(String[] args) { Observable<Integer> source = Observable.create(emitter -> { for (int i = 0; i <= 1000; i++) { if (emitter.isDisposed()) return; emitter.onNext(i); } emitter.onComplete(); }); source.observeOn(Schedulers.io()) .subscribe(System.out::println); sleep(1000); }