@Override protected void startRefresh(HandleBase<StickyItem> refreshData) { Flowable.just(refreshData) .onBackpressureDrop() .observeOn(Schedulers.computation()) .map(new Function<HandleBase<StickyItem>, DiffUtil.DiffResult>() { @Override public DiffUtil.DiffResult apply(@NonNull HandleBase<StickyItem> handleBase) throws Exception { return handleRefresh(handleBase.getNewData(), handleBase.getNewHeader(), handleBase.getNewFooter(), handleBase.getType(), handleBase.getRefreshType()); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<DiffUtil.DiffResult>() { @Override public void accept(@NonNull DiffUtil.DiffResult diffResult) throws Exception { handleResult(diffResult); } }); }
@Override public void test0() { Log.i(TAG, "test0() Map simple demo, integer 1,2,3 transform to string 2,4,6"); Observable.just(1, 2, 3).map(new Function<Integer, String>() { @Override public String apply(@NonNull Integer integer) throws Exception { return Integer.toString(integer * 2); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Consumer<String> accept() s: " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Action run() for onComplete()"); } }); }
private void updateEvenStatus(int status, BIncidentRemark note) { showProgress(); wrap(ApiHelper.getInstance().updateEventStatus(incidentDto.getId(), status, note)).subscribe( new BaseObserver<IncidentDto>(EditInfoActivity.this) { @Override public void onNext(@NonNull IncidentDto responseBody) { hideProgress(); super.onNext(responseBody); ToastUtils.showShortSafe("success"); RxBus.getInstance().send(new UpdateEvent(true)); finish(); } @Override public void onError(@NonNull Throwable e) { hideProgress(); super.onError(e); ToastUtils.showShortSafe("failure"); pbClaim.setVisibility(View.GONE); sl.setVisibility(View.VISIBLE); } }); }
private void getResultActivity() { final Intent intent = new Intent(getActivity(), Result.class); RxActivityResultCompact.startActivityForResult(this, intent, REQUEST_CODE) .subscribe(new Consumer<ActivityResult>() { @Override public void accept(@NonNull ActivityResult result) throws Exception { if (result.isOk()) { final String txt = result.getData().getStringExtra(Result.GET_TEXT); textresult.setText(txt); } } }); }
private Observable<VersionInfo> checkUpdateIfUsingWifi(Context context) { if (!NetworkUtils.isWifiAvailable(context)) { return Observable.empty(); } Observable<VersionInfo> observable = checkForUpdates(); observable.subscribe(new SimpleObserver<VersionInfo>() { @Override public void onNext(@NonNull VersionInfo versionInfo) { if (versionInfo.isValid()) { setVersionInfo(versionInfo); } } @Override public void onError(@NonNull Throwable e) { e.printStackTrace(); } }); return observable; }
private void testRx() { Observable.just("tony") .subscribe(new Consumer<String>() { @HookMethod(beforeMethod = "testRxBefore") @Override public void accept(@NonNull String s) throws Exception { System.out.println("s="+s); } private void testRxBefore() { L.i("testRxBefore() is called before accept()"); } }); }
public void getSearchUserResultFromServer(String value){ OvRetrofit.getInstance().async(OvRetrofit.getInstance().getService().getSearchUserList(value), new OvObserver<HttpResult<List<SearchUserBean>>>() { @Override protected void setNeedContext() { setContext(mContext); } @Override protected void setProgress(boolean isShow) { super.setProgress(false); } @Override protected void _onNext(@NonNull HttpResult<List<SearchUserBean>> listHttpResult) { mPresenter.showSearchUserList(listHttpResult.getData()); } @Override protected void _onError(@NonNull Throwable e) { mPresenter.showNoSearchResult("该用户不存在"); } }); }
public void takePartInPatyToServer(String partyId,String uid){ OvRetrofit.getInstance().async(OvRetrofit.getInstance().getService().takePartInTheParty(partyId, uid), new OvObserver<HttpResult<String>>() { @Override protected void setNeedContext() { setContext(mContext); } @Override protected void setProgress(boolean isShow) { super.setProgress(false); } @Override protected void _onNext(@NonNull HttpResult<String> stringHttpResult) { mPresenter.showTakePartPartyResult(stringHttpResult.getMsg()); } @Override protected void _onError(@NonNull Throwable e) { } }); }
@Override public void test0() { Log.i(TAG, "test0() FlatMap simple demo, integer 1,2,3 transform to string 2,3,4,6,6,9"); Observable.just(1, 2, 3).flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(@NonNull Integer integer) throws Exception { return Observable.just(integer * 2 + "", integer * 3 + ""); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Consumer<String> accept() s: " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Consumer<Throwable> accept() throwable: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Action run() for onComplete()"); } }); }
@Test public void debugWhileChainConfigIsNotDebuggingThenDoNotInvokeDebug() { InternalConfiguration config = InternalConfiguration .getInstance("debugWhileChainConfigIsNotDebuggingThenDoNotInvokeDebug"); config.setDebugging(false); final boolean[] result = {false}; new Chain<>(new TestClass(), config) .debug(new Consumer<TestClass>() { @Override public void accept(@NonNull TestClass testClass) throws Exception { result[0] = true; } }); assertFalse(result[0]); }
@Test public void reduceWithMultipleItemsThenReturnFunctionResult() { boolean result = new Collector<Boolean>(configuration) .and(true) .and(false) .and(true) .reduce(new BiFunction<Boolean, Boolean, Boolean>() { @Override public Boolean apply(@NonNull Boolean itemOne, @NonNull Boolean itemTwo) { return itemOne.equals(itemTwo); } }) .call(); assertFalse(result); }
public void addConcernToServer(String userId,String userById){ OvRetrofit.getInstance().async(OvRetrofit.getInstance().getService().addConcern(userId, userById), new OvObserver<HttpResult<String>>() { @Override protected void setNeedContext() { setContext(mContext); } @Override protected void setProgress(boolean isShow) { super.setProgress(false); } @Override protected void _onNext(@NonNull HttpResult<String> stringHttpResult) { if (stringHttpResult.getStatus()==1){ mPresenter.showAddConcern(stringHttpResult.getData()); } } @Override protected void _onError(@NonNull Throwable e) { } }); }
private <TAction extends Action> void executeMiddleware(@NonNull final TAction action) { if (mMiddlewareList.size() > 0) { synchronized (mMiddlewareSyncRoot) { Observable.fromIterable(mMiddlewareList) .flatMap(new Function<Middleware, ObservableSource<? extends Action>>() { @Override public ObservableSource<? extends Action> apply(@NonNull Middleware middleware) throws Exception { return middleware.process(action); } }) .subscribe(new Consumer<Action>() { @Override public void accept(@NonNull Action resultAction) throws Exception { dispatch(resultAction); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { onMiddlewareError(throwable); } }); } } }
private void loadData() { if (!NetworkUtils.avaliable()) { showNetworkError(); return; } Flowable.fromCallable(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(3000); // imitate expensive computation return "Done"; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { Log.i("LeftFragment", "data received"); showEmptyData(); } }); }
/** * 获取更多数据 */ public void loadMorePostLists(){ ++mMoreNum; OvRetrofit.getInstance().async(OvRetrofit.getInstance().getService().getNewPostList(uid,String.valueOf(mMoreNum), "10"), new OvObserver<HttpResult<List<PostBean>>>() { @Override protected void setNeedContext() { setContext(mPresenter.getContext()); } @Override protected void _onNext(@NonNull HttpResult<List<PostBean>> httpResult) { if (httpResult.getData()!=null && httpResult.getData().size()>0){ mPresenter.loadMorePostsSuccess(httpResult.getData()); }else { mPresenter.loadMorePostNothing(); } } @Override protected void _onError(@NonNull Throwable e) { mPresenter.loadMorePostFailue(); } }); }
public void loadData(int idx, boolean refresh, boolean clearCache) { mModel.getRecommendIndexData(idx, refresh, clearCache) .retryWhen(new RetryWithDelay(3, 2)) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnSubscribe(disposable -> { if (refresh || clearCache) mRootView.showLoading(); }) .doFinally(() -> mRootView.hideLoading()) .observeOn(Schedulers.io()) .map(indexData -> mModel.parseIndexData(indexData)) .observeOn(AndroidSchedulers.mainThread()) .compose(RxLifecycleUtils.bindToLifecycle(mRootView)) .subscribe(new ErrorHandleSubscriber<List<RecommendMultiItem>>(mErrorHandler) { @Override public void onNext(@NonNull List<RecommendMultiItem> recommendMultiItems) { if (recommendMultiItems != null) { setAdapter(recommendMultiItems, refresh); } } }); }
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) { return new ObservableTransformer<T, T>() { @Override public Observable<T> apply(Observable<T> observable) { return observable.subscribeOn(Schedulers.io()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { view.showLoading();//显示进度条 } }) .subscribeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread()) .doFinally(new Action() { @Override public void run() { view.hideLoading();//隐藏进度条 } }).compose(RxLifecycleUtils.bindToLifecycle(view)); } }; }
@Test public void onErrorAcceptForCrashingGuardThenInvokeTheFunction() { final Exception[] result = {null}; Guard.call(new Callable<TestClass>() { @Override public TestClass call() throws Exception { throw new UnsupportedOperationException(); } }).onError(new Consumer<Exception>() { @Override public void accept(@NonNull Exception e) throws Exception { result[0] = e; } }); assertEquals(UnsupportedOperationException.class, result[0].getClass()); }
/** * Filter, which returns true if at least one given state occurred * * @param states NetworkInfo.State, which can have one or more states * @return true if at least one given state occurred */ public static Predicate<Connectivity> hasState(final NetworkInfo.State... states) { return new Predicate<Connectivity>() { @Override public boolean test(@NonNull Connectivity connectivity) throws Exception { for (NetworkInfo.State state : states) { if (connectivity.getState() == state) { return true; } } return false; } }; }
@NonNull public static Observable<ArrayList<Song>> getPlaylistSongList(@NonNull Context context, final int playlistId) { return Observable.create(e -> { ArrayList<PlaylistSong> songs = new ArrayList<>(); Cursor cursor = makePlaylistSongCursor(context, playlistId); if (cursor != null && cursor.moveToFirst()) { do { songs.add(getPlaylistSongFromCursorImpl(cursor, playlistId)); } while (cursor.moveToNext()); } if (cursor != null) { cursor.close(); } e.onNext((ArrayList<Song>) (List) songs); e.onComplete(); }); }
public void show() { mProgress.show(); VersionService.getInstance() .checkForUpdates() .observeOn(AndroidSchedulers.mainThread()) .subscribe(new SimpleObserver<VersionInfo>() { @Override public void onNext(@NonNull VersionInfo versionInfo) { mProgress.dismiss(); if (versionInfo.isNewer()) { new UpdateInfoDialogBuilder(mContext, versionInfo) .show(); } else { Toast.makeText(App.getApp(), R.string.text_is_latest_version, Toast.LENGTH_SHORT).show(); } } @Override public void onError(@NonNull Throwable e) { e.printStackTrace(); mProgress.dismiss(); Toast.makeText(App.getApp(), R.string.text_check_update_error, Toast.LENGTH_SHORT).show(); } }); }
public void getGankioData(GankioType type, int count, int page, boolean isProgress) { if (isProgress) { view.showProgress(""); } wrap(gankioRepository.getAllGankioData(type, count, page)).flatMap( new Function<BaseResult<GankioData>, ObservableSource<BaseResult<GankioData>>>() { @Override public ObservableSource<BaseResult<GankioData>> apply( @NonNull BaseResult<GankioData> gankioDataBaseResult) throws Exception { //List<GankioData> results = ; for (GankioData gankioData : gankioDataBaseResult.getResults()) { boolean b = dbRepository.queryBrowseHistory(gankioData.get_id()); gankioData.setBrowseHistory(b); } return Observable.just(gankioDataBaseResult); } }).subscribe(new ViewObserver<GankioData>(view) { @Override protected void onSuccess(List<GankioData> t) { view.display(t); } @Override public void onError(@NonNull Throwable e) { super.onError(e); view.displayError(); } }); }
public void trainNewModel() { Register register = new Register(repository, threadExecutor.getOriginScheduler(), threadExecutor.getPostScheduler()); view.onRegisterStart(); register.execute(new DisposableObserver<Integer>() { @Override public void onNext(@NonNull Integer modelNumber) { view.onRegisterDone(); train(modelNumber); } @Override public void onError(@NonNull Throwable e) { view.onRegisterDone(); train(-1); } @Override public void onComplete() { } }); }
/** * Extract StackTrace and filter to show an app-specific entry at its top * * @param exception RxJavaAssemblyException to be parsed * @return StackTrace, filtered so a app-specific line is at the top of it */ @NonNull static StackTraceElement[] parseStackTrace(@NonNull RxJavaAssemblyException exception, @Nullable String[] basePackages) { String[] lines = exception.stacktrace() .split(NEW_LINE_REGEX); List<StackTraceElement> stackTrace = new ArrayList<StackTraceElement>(); boolean filterIn = false; for (String line : lines) { filterIn = filterIn || basePackages == null || basePackages.length == 0 || startsWithAny(line, basePackages); if (filterIn) { StackTraceElement element = parseStackTraceLine(line); if (element != null) { stackTrace.add(element); } } } return stackTrace.toArray(new StackTraceElement[0]); }
@Override public void onResume() { super.onResume(); disposable.add(adapter.onItemEvent() .map(new Function<ObservableAdapter.ViewEvent, Object>() { @Override public Object apply(@NonNull ObservableAdapter.ViewEvent viewEvent) throws Exception { return viewEvent.getData(); } }) .ofType(Admin.class) .subscribe(new Consumer<Admin>() { @Override public void accept(@NonNull Admin data) throws Exception { Toast.makeText(getActivity(), "Clicked " + data.getName(), Toast.LENGTH_SHORT).show(); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Timber.e(throwable, "Error watching adapter"); } })); }
public static Cursor makePlaylistSongCursor(@NonNull final Context context, final int playlistId) { try { return context.getContentResolver().query( MediaStore.Audio.Playlists.Members.getContentUri("external", playlistId), new String[]{ MediaStore.Audio.Playlists.Members.AUDIO_ID,// 0 AudioColumns.TITLE,// 1 AudioColumns.TRACK,// 2 AudioColumns.YEAR,// 3 AudioColumns.DURATION,// 4 AudioColumns.DATA,// 5 AudioColumns.DATE_MODIFIED,// 6 AudioColumns.ALBUM_ID,// 7 AudioColumns.ALBUM,// 8 AudioColumns.ARTIST_ID,// 9 AudioColumns.ARTIST,// 10 MediaStore.Audio.Playlists.Members._ID // 11 }, SongLoader.BASE_SELECTION, null, MediaStore.Audio.Playlists.Members.DEFAULT_SORT_ORDER); } catch (SecurityException e) { return null; } }
public <R> SingleTransformer<? super R, ? extends R> composeSingle() { return new SingleTransformer<R, R>() { @Override public SingleSource<R> apply(@NonNull Single<R> upstream) { return upstream .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .retryWhen(new RetryWithDelay(maxRetry, todoBeforeRetry).forSingle) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { AbstractPresenter.this.addDisposable(disposable); } }); } }; }
@Override protected void subscribeActual(@NonNull SingleObserver<? super T> observer) { boolean b; try { b = condition.getAsBoolean(); } catch (Throwable ex) { EmptyDisposable.error(ex, observer); return; } if (b) { then.subscribe(observer); } else { orElse.subscribe(observer); } }
private void initRxBus() { subscribe = RxBus.getInstance() .toObserverable(UpdateEvent.class) .subscribe(new Consumer<UpdateEvent>() { @Override public void accept(@NonNull UpdateEvent o) throws Exception { System.out.println(o); showFragment = true; if (o.isUpdate()) { getUserEvent(); } if (o.isLogin()) { String email = ApiHelper.getInstance().getEmail(); tvEmail.setText(email + "@" + getString(R.string.title_email_suffix)); //headerView.setOnClickListener(null); } } }); }
public <T> ObservableTransformer<T, CacheResult<T>> customizeTransformer(@NonNull final Object key, final CustomizeTransformerCall customizeTransformerCall) { return new ObservableTransformer<T, CacheResult<T>>() { @Override public ObservableSource<CacheResult<T>> apply(@NonNull Observable<T> upstream) { return apply.applyCustomize(key, upstream, customizeTransformerCall); } }; }
@OnClick(R.id.edit) void edit() { new ScriptOperations(this, mView) .importSample(mSample) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String path) throws Exception { EditActivity.editFile(ViewSampleActivity.this, path); finish(); } }); }
private void setCurrentDirectory(final ScriptFile directory, boolean canGoBack) { if (!directory.equals(mCurrentDirectory) && mOnCurrentDirectoryChangeListener != null) { mOnCurrentDirectoryChangeListener.onChange(mCurrentDirectory, directory); } mCurrentDirectory = directory; mCanGoBack = canGoBack; if (mFileProcessListener != null) { mFileProcessListener.onFilesListing(); } Observable.fromPublisher(new Publisher<ScriptFile[]>() { @Override public void subscribe(Subscriber<? super ScriptFile[]> s) { s.onNext(mStorageScriptProvider.getDirectoryScriptFiles(directory)); s.onComplete(); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<ScriptFile[]>() { @Override public void accept(@NonNull ScriptFile[] scriptFiles) throws Exception { mAdapter.setScripts(scriptFiles); if (mFileProcessListener != null) mFileProcessListener.onFileListed(); smoothScrollToPosition(0); } }); }
public Observable<Float> putFloat(final String key, final Float value) { return Observable.just(value) .doOnNext(new Consumer<Float>() { @Override public void accept(@NonNull Float v) throws Exception { sharedPreferences.edit().putFloat(key, v).apply(); } }); }
public void getPermissions(final String name, final String pass) { rxPermissions.request(Manifest.permission.READ_EXTERNAL_STORAGE,Manifest.permission.WRITE_EXTERNAL_STORAGE) .subscribe(new Consumer<Boolean>() { @Override public void accept(@NonNull Boolean aBoolean) throws Exception { if(aBoolean){ //Toast.makeText(LoginActivity.this, "权限获取成功", Toast.LENGTH_SHORT).show(); onLogin(name, pass); }else { showMissingPermissionDialog(); } } }); }
@Override protected void onStart() { super.onStart(); Observable.interval(1, TimeUnit.SECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .compose(this.<Long>bindToLifecycle()) .subscribe(new Observer<Long>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Long aLong) { Log.i("接收数据", String.valueOf(aLong)); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } }); }
/** * 绑定 Fragment 的指定生命周期 * * @param view * @param event * @param <T> * @return */ public static <T> LifecycleTransformer<T> bindUntilEvent(@NonNull final IView view, final FragmentEvent event) { Preconditions.checkNotNull(view, "view == null"); if (view instanceof FragmentLifecycleable) { return bindUntilEvent((FragmentLifecycleable) view, event); } else { throw new IllegalArgumentException("view isn't FragmentLifecycleable"); } }
@Override public void requestData() { mCursor = ((LocalMusicActivity) mView).getContentResolver().query(MediaStore.Audio.Media.EXTERNAL_CONTENT_URI, null, null, null, MediaStore.Audio.Media.DEFAULT_SORT_ORDER); if (null != mCursor) { list.clear(); Observable.just(mCursor).flatMap(new Function<Cursor, ObservableSource<List<MusicBean>>>() { @Override public ObservableSource<List<MusicBean>> apply(@NonNull Cursor cursor) throws Exception { for (int i = 0; i < mCursor.getCount(); i++) { MusicBean bean = new MusicBean(); mCursor.moveToNext(); bean.setSongid((int) mCursor.getLong(mCursor.getColumnIndex(MediaStore.Audio.Media._ID)));//音乐id bean.setSongname(mCursor.getString((mCursor.getColumnIndex(MediaStore.Audio.Media.TITLE))));//歌曲名称 bean.setSingername(mCursor.getString(mCursor.getColumnIndex(MediaStore.Audio.Media.ARTIST)));//歌手 bean.setUrl(mCursor.getString(mCursor.getColumnIndex(MediaStore.Audio.Media.DATA)));//歌曲路径 bean.setType(Integer.parseInt(Constant.MUSIC_LOCAL)); MyApplication.getDaoSession().getMusicBeanDao().insertOrReplace(bean); list.add(bean); } return Observable.just(list); } }).subscribeOn(io.reactivex.schedulers.Schedulers.io()) .observeOn(io.reactivex.android.schedulers.AndroidSchedulers.mainThread()) .subscribe(list -> parseData(list)); } }
public static <T> ObservableTransformer<T, T> applyCommonSchedulers() { return new ObservableTransformer<T, T>() { @Override public ObservableSource<T> apply(@NonNull Observable<T> upstream) { return upstream.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); } }; }
private static Predicate<Element> byClassElement() { return new Predicate<Element>() { @Override public boolean test(@NonNull Element element) throws Exception { return element.getKind().equals(ElementKind.CLASS); } }; }