/**
 * Starts a search for the given tag.
 *
 * <p>Clears the hint text, hides the tag layout, shows the progress bar, stores the tag in
 * {@code mSearchTag} and subscribes {@code mObserver} to the search request. The request is
 * subscribed on the IO scheduler and observed on the Android main thread.
 *
 * @param tag the tag to search for
 */
private void searchData(String tag) {
    mHintText.setText("");
    mTagLayout.setVisibility(View.GONE);
    mProgressBar.setVisibility(View.VISIBLE);
    mSearchTag = tag;

    mSearchApi.searchTagData(mSearchTag)
            // Only non-null results are passed on to the observer.
            .filter(bean -> bean != null)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(mObserver);
}
private Observable<JsonElement> revokeToken(AuthToken token, String clientSecret) { // this complexity exists because the access token must be revoked AFTER the refresh token // why? because the access token is needed for both revocations! Subject<JsonElement> responses = PublishSubject.create(); RevokeReqBody refreshReqBody = RevokeReqBody.fromRefreshToken( token.getRefreshToken(), clientSecret); revokeSingleToken(token.getAuthHeader(), refreshReqBody, responses) .doOnComplete(() -> { RevokeReqBody accessReqBody = RevokeReqBody.fromAccessToken( token.getAccessToken(), clientSecret); revokeSingleToken(token.getAuthHeader(), accessReqBody, responses) .subscribe(); }) .subscribe(); return responses; }
/**
 * Merges five two-element string Observables supplied as a list and prints every
 * received item prefixed with {@code "RECEIVED: "}.
 */
public static void main(String[] args) {
    List<Observable<String>> sources = Arrays.asList(
            Observable.just("Alpha", "Beta"),
            Observable.just("Gamma", "Delta"),
            Observable.just("Epsilon", "Zeta"),
            Observable.just("Eta", "Theta"),
            Observable.just("Iota", "Kappa"));

    Observable<String> merged = Observable.merge(sources);
    merged.subscribe(item -> System.out.println("RECEIVED: " + item));
}
/**
 * Wires the three view intents of {@link ProductDetailsView}.
 *
 * <p>Add-to-cart and remove-from-cart are logged and forwarded to the interactor. Load-details
 * is logged, mapped through {@code interactor::getDetails}, observed on the main thread and
 * rendered via {@code ProductDetailsView::render}.
 */
@Override
protected void bindIntents() {
    // Add to shopping cart: fire-and-forget.
    intent(ProductDetailsView::addToShoppingCartIntent)
            .doOnNext(item -> Timber.d("intent: add to shopping cart %s", item))
            .flatMap(item -> interactor.addToShoppingCart(item).toObservable())
            .subscribe();

    // Remove from shopping cart: fire-and-forget.
    intent(ProductDetailsView::removeFromShoppingCartIntent)
            .doOnNext(item -> Timber.d("intent: remove from shopping cart %s", item))
            .flatMap(item -> interactor.removeFromShoppingCart(item).toObservable())
            .subscribe();

    // Load details: this stream produces the view state.
    Observable<ProductDetailsViewState> detailsState =
            intent(ProductDetailsView::loadDetailsIntent)
                    .doOnNext(id -> Timber.d("intent: load details for product id = %s", id))
                    .flatMap(interactor::getDetails)
                    .observeOn(AndroidSchedulers.mainThread());

    subscribeViewState(detailsState, ProductDetailsView::render);
}
@Test public void test_just_new() { Observable<Integer> observable = Observable.just(12, 34, 6); TestObserver<Integer> testObserver = new TestObserver<>(); observable.subscribe(testObserver); List<Integer> list = new ArrayList(); testObserver.assertComplete(); testObserver.assertResult(12, 34, 6); testObserver.assertValueCount(3); testObserver.assertNoErrors(); testObserver.assertValueAt(2, (value) -> { // TODO Auto-generated method stub return value == 34; }); }
/**
 * Drives a one-second {@code interval(...).take(5)} with a {@link TestScheduler} and checks
 * the emissions before any time has passed, after one second, and at the six-second mark.
 */
@Test
public void test_interval() {
    TestScheduler scheduler = new TestScheduler();
    Observable<Long> ticks = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(5);
    TestObserver<Long> observer = new TestObserver<>();
    ticks.subscribeOn(scheduler).subscribe(observer);

    // Virtual time has not moved yet: nothing emitted, nothing terminated.
    observer.assertNoValues();
    observer.assertNotComplete();
    observer.assertNoErrors();

    // After one virtual second the first tick (0) has arrived.
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    observer.assertValueCount(1);
    observer.assertValues(0L);

    // At the six-second mark all five ticks allowed by take(5) have arrived.
    scheduler.advanceTimeTo(6, TimeUnit.SECONDS);
    observer.assertValueCount(5);
    observer.assertValues(0L, 1L, 2L, 3L, 4L);
}
/**
 * Wraps a {@link Query} in an Observable that emits each result set as a List.
 *
 * <p>This method never calls {@code onComplete}; every list delivered to the data observer
 * is forwarded for as long as the emitter is not disposed. Disposing cancels the underlying
 * {@link DataSubscription}.
 *
 * @param query the query to subscribe to
 * @param <T>   element type of the query results
 * @return an Observable of query result lists
 */
public static <T> Observable<List<T>> observable(final Query<T> query) {
    return Observable.create(emitter -> {
        final DataSubscription subscription = query.subscribe().observer(results -> {
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onNext(results);
        });
        // Tie the data subscription's lifetime to the downstream subscription.
        emitter.setCancellable(() -> subscription.cancel());
    });
}
public static Observable<Event> downloadRemoteArchive(final FileSystem fs, final RemoteArchive remoteArchive, final Path targetDirectory) { Preconditions.checkNotNull(fs); Preconditions.checkNotNull(remoteArchive); Preconditions.checkNotNull(targetDirectory); final Path zipFilePath = targetDirectory.getParent().resolve(targetDirectory.getFileName() + ".zip"); return Observable.concat( // Download the file CommonTasks.downloadRemoteFile(fs, remoteArchive.asRemoteFile(), zipFilePath), // Unpack the zip MoreCompletables.fromRunnable(() -> { EvenMoreFiles.unzip( zipFilePath, targetDirectory, remoteArchive.subPath, StandardCopyOption.REPLACE_EXISTING); }).toObservable()).subscribeOn(Schedulers.io()); }
/**
 * Subscribes to the wrapped reactive source and reports to {@code callback}.
 *
 * <p>A call may be enqueued only once. If the call is already canceled or done, nothing is
 * subscribed and the callback is not stored.
 *
 * @param callback receiver stored in {@code mModuleCallback} before subscribing
 * @throws IllegalStateException if this call has already been enqueued
 */
public void enqueue(final ModuleCallback<T> callback) {
    synchronized (this) {
        if (mExecuted) {
            throw new IllegalStateException("每个ModuleCall只能enqueue一次");
        }
        mExecuted = true;
    }

    if (!mCanceled && !mDone) {
        mModuleCallback = callback;

        // Dispatch on the concrete reactive type held in mObservable.
        if (mObservable instanceof Observable) {
            subscribeObservable((Observable<T>) mObservable);
            return;
        }
        if (mObservable instanceof Single) {
            subscribeSingle((Single<T>) mObservable);
            return;
        }
        if (mObservable instanceof Flowable) {
            subscribeFlowable((Flowable<T>) mObservable);
            return;
        }
        // Anything else is treated as a Maybe.
        subscribeMaybe((Maybe<T>) mObservable);
    }
}
/**
 * Sets up the activity: inflates the layout, creates the composite disposable, prepares
 * the delayed "login" Observable and Flowable, and registers this activity as the click
 * listener of both buttons.
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    mCompositeDisposable = new CompositeDisposable();

    // Both sources fire once after TIME_DELAY ms on the main thread and map the tick
    // to a message containing a freshly generated UUID.
    mLoginObservable = Observable
            .timer(TIME_DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
            .map(tick -> "User id is " + UUID.randomUUID());
    mLoginFlowable = Flowable
            .timer(TIME_DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
            .map(tick -> "User id is " + UUID.randomUUID());

    findViewById(R.id.button_observable).setOnClickListener(this);
    findViewById(R.id.button_flowable).setOnClickListener(this);
}
/** * 使用EasyHttp调用自定义api 注意:如果有签名的注意路径有"/"的情况如下 * https://www.xxx.com/v1/account/login (正确) * https://www.xxx.com//v1/account/login (错误 可能会导致签名失败) */ public void onCustomCall(View view) { final String name = "18688994275"; final String pass = "123456"; final CustomRequest request = EasyHttp.custom().addConverterFactory(GsonConverterFactory.create(new Gson())) .sign(true) .timeStamp(true) .params(ComParamContact.Login.ACCOUNT, name) .params(ComParamContact.Login.PASSWORD, MD5.encrypt4login(pass, AppConstant.APP_SECRET)) .build(); LoginService mLoginService = request.create(LoginService.class); Observable<ApiResult<AuthModel>> observable = request.call(mLoginService.login("v1/account/login", request.getParams().urlParamsMap)); Disposable disposable = observable.subscribe(new Consumer<ApiResult<AuthModel>>() { @Override public void accept(@NonNull ApiResult<AuthModel> result) throws Exception { showToast(result.toString()); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { showToast(throwable.getMessage()); } }); //EasyHttp.cancelSubscription(disposable);//取消订阅 }
/**
 * Requests all clients and parses the JSON response into a list.
 *
 * <p>A null element yields an empty list, a JSON array is parsed with
 * {@code Client.ClientsListParser}, and any other element produces an error. Results are
 * observed on the Android main thread.
 *
 * @return an Observable emitting the parsed client list
 */
public static Observable<List<Client>> getAllClients() {
    ClientsService clientsService = ServiceGenerator.createService(ClientsService.class);
    return clientsService.getAllClients(UrlManager.getAllClientsURL())
            .flatMap(new Function<JsonElement, Observable<List<Client>>>() {
                @Override
                public Observable<List<Client>> apply(JsonElement json) throws Exception {
                    if (json == null) {
                        return Observable.just((List<Client>) new ArrayList<Client>());
                    }

                    Log.i("Get All Clients", "JSON: " + json);

                    if (!json.isJsonArray()) {
                        return Observable.error(new Exception("Expected a JSON Array"));
                    }

                    List<Client> parsed =
                            Client.ClientsListParser.fromJsonArray(json.getAsJsonArray());
                    return Observable.just(parsed);
                }
            })
            .observeOn(AndroidSchedulers.mainThread());
}
@Test public void viewsAreCalledBeforeLifecycleIsReadyWithoutLifecycleAwareRx() throws Exception { // Lifecycle is "active" once it is STARTED, it's not ready yet at INITIALIZED or CREATED. lifecycleOwner.handleLifecycleEvent(Lifecycle.Event.ON_CREATE); Observable.interval(1, TimeUnit.MILLISECONDS) .subscribeWith(new DisposableObserver<Long>() { @Override public void onNext(final Long value) { LifecycleTest.this.methodOnViewCalled = true; } @Override public void onError(final Throwable e) { } @Override public void onComplete() { } }); // Need to wait to give it time to potentially fail TimeUnit.MILLISECONDS.sleep(100); assertEquals(true, methodOnViewCalled); }
/**
 * Loads the "what's new" icon list and hands it to the view.
 *
 * <p>Each name in {@code R.array.whatsnew} becomes an {@link IconBean} whose id is the
 * drawable identifier looked up under {@code BuildConfig.APPLICATION_ID}. The list is built
 * on the IO scheduler and delivered to {@code mView.onLoadData} on the main thread.
 *
 * @return the Disposable of the subscription
 */
public Disposable getWhatsNewIcons() {
    return Observable.fromArray(mView.getResources().getStringArray(R.array.whatsnew))
            .map(iconName -> {
                IconBean icon = new IconBean();
                icon.id = mView.getResources()
                        .getIdentifier(iconName, "drawable", BuildConfig.APPLICATION_ID);
                icon.name = iconName;
                return icon;
            })
            .toList()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(icons -> mView.onLoadData(icons));
}
/**
 * Checks {@code startWith} in two forms, with a plain value and with an Observable: in both
 * cases seven values are expected in total and the first one must be {@code -1}.
 */
@Test
public void startWithTest() throws Exception {
    // Prepend a single value.
    Observable.fromArray(nums)
            .startWith(-1)
            .test()
            .assertValueCount(7)
            .assertValueAt(0, -1);
    // This also works: prepend another Observable.
    Observable.fromArray(nums)
            .startWith(Observable.just(-1))
            .test()
            .assertValueCount(7)
            .assertValueAt(0, -1);
}
@Override public void onViewCreated(View view, Bundle savedInstanceState) { super.onViewCreated(view, savedInstanceState); mRepoListView = view.findViewById(R.id.repo_list_view); mObserverLog = view.findViewById(R.id.observer_log); mObserverLog.setMovementMethod(new ScrollingMovementMethod()); mApi.getObservableRepositories(Utils.USER) .flatMap(new Function<List<RepositoryResponse>, ObservableSource<RepositoryResponse>>() { @Override public ObservableSource<RepositoryResponse> apply(List<RepositoryResponse> repositoryResponses) throws Exception { return Observable.fromIterable(repositoryResponses); } }) .filter(new Predicate<RepositoryResponse>() { @Override public boolean test(RepositoryResponse repositoryResponse) throws Exception { //Filter the stream so only Java repositories are emitted. return repositoryResponse.language.equals("Java"); } }) //Subscribe the Network call in io Thread. .subscribeOn(Schedulers.io()) //Subscribe the Observer in MainThread so it can updates the UI with the result. .observeOn(AndroidSchedulers.mainThread()) //Choose the subscribed Observer for items emitted by this observable. .subscribe(mBaseObserver); }
/**
 * Combines the latest values of two string sources ("A1".."A4" and "B1".."B3") into
 * {@code "<a>-<b>"} strings and feeds them to the observer from {@code getObserver()}.
 */
private void doSomeWork() {
    final Observable<String> left = Observable.fromArray(new String[] {"A1", "A2", "A3", "A4"});
    final Observable<String> right = Observable.fromArray(new String[] {"B1", "B2", "B3"});

    Observable.combineLatest(left, right, (a, b) -> a + "-" + b)
            .subscribe(getObserver());
}
/**
 * Called when yaw (off-route) re-planning has finished.
 *
 * <p>Queues a spoken notification if the synthesizer is initialised, then clears the cached
 * route plans and route models and rebuilds both from the routes currently reported by
 * {@code BNRoutePlanerProxy}, storing them under {@code calculatePreference}.
 */
@Override
public void onYawingRequestSuccess() {
    Log.i(TAG, "onYawingRequestSuccess");

    if (SynthesizerBase.isInited()) {
        SpeechMsgBuilder speechBuilder = new SpeechMsgBuilder("路线规划完毕")
                .setForceLocalEngine(true)
                .setOrigin(SpeechMsg.ORIGIN_COMMON);
        Observable<SpeechMsg> speech =
                SynthesizerBase.get().addMessageWaitSpeak(speechBuilder.build());
        if (speech != null) {
            speech.subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.computation())
                    .subscribe();
        }
    }

    // Drop everything cached from the previous plan.
    BNRoutePlanerProxy.getInstance().routePlans.clear();
    RouteModel.getLastRouteModels().clear();

    Vector<RoutePlanModelProxy> planProxies = new Vector<>();
    ArrayList<RouteModel> models = new ArrayList<>();

    int routeCount = BNRoutePlanerProxy.getInstance().getRouteCnt();
    if (routeCount <= 0) {
        // Nothing to cache when no route was produced.
        return;
    }

    for (int index = 0; index < routeCount; index++) {
        Bundle routeInfo = new Bundle();
        RoutePlanModelProxy planProxy = new RoutePlanModelProxy();
        BNRoutePlanerProxy.getInstance().getRouteInfo(index, routeInfo);
        planProxy.parseRouteResult(mContext, routeInfo);
        models.add(new RouteModel(routeInfo));
        planProxies.add(planProxy);
    }

    RouteModel.put(calculatePreference, models);
    BNRoutePlanerProxy.getInstance().routePlans.put(calculatePreference, planProxies);
}
/**
 * Starts a periodic ticker that advances {@code position} by one on every tick and wraps
 * it back to 0 once it reaches {@code frameNum}.
 *
 * <p>Does nothing unless both {@code delay} and {@code frameNum} are positive. Ticks arrive
 * every {@code delay} milliseconds and are observed on the main thread.
 */
void animation() {
    if (delay <= 0 || frameNum <= 0) {
        return;
    }

    Observable.interval(delay, TimeUnit.MILLISECONDS)
            .flatMap(tick -> Observable.just(tick))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(Long tick) {
                    // Move to the next frame, wrapping around at the end.
                    position++;
                    if (position >= frameNum) {
                        position = 0;
                    }
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
}
/**
 * Emits 1..1000 as strings, logs each one in {@code doOnNext}, switches to the computation
 * scheduler and logs each one again in the subscriber; then blocks on {@code WAIT_LATCH}.
 *
 * @throws InterruptedException if interrupted while waiting on the latch
 */
private static void demo8() throws InterruptedException {
    Observable<String> numbers = Observable.range(1, 1000).map(Objects::toString);

    numbers.doOnNext(value -> log("doOnNext", value))
            .observeOn(Schedulers.computation())
            .subscribe(value -> log("subscribe", value));

    WAIT_LATCH.await();
}
/**
 * Uses {@code flatMap} to route even inputs (plus 1) to the IO scheduler and odd inputs
 * (plus 3) to the computation scheduler, printing the thread name next to every result.
 */
@Test
public void usingFlatMapToJumpConditionallyOntoAnotherThread() throws Exception {
    Observable.just(1, 2, 3, 4)
            .flatMap(n -> (n % 2 == 0)
                    ? Observable.just(n + 1).observeOn(Schedulers.io())
                    : Observable.just(n + 3).observeOn(Schedulers.computation()))
            .subscribe(result -> {
                System.out.print(Thread.currentThread().getName());
                System.out.println(": " + result);
            });
}
/**
 * Requests the public (federated) timeline.
 *
 * <p>The call is made with the bearer value from {@code Toot.buildBearer()} and
 * {@code null} as the second argument.
 *
 * @return an Observable of the response wrapping an array of {@link Status}
 */
public Observable<Response<Status[]>> getPublicTimeline() {
    API api = buildRxRetrofit().create(API.class);
    return api.getPublicTimeline(Toot.buildBearer(), null);
}
/**
 * Fades in the given views one after another, staggering their start times.
 *
 * <p>The view at index {@code i} uses a linear interpolator, the given duration and a start
 * delay of {@code i * delay}.
 *
 * @param delay    per-index delay step
 * @param duration duration passed to every animation
 * @param views    views to fade in, in order
 * @return a Completable combining all scheduled fade-in animations
 */
public static Completable fadeInWithDelay(final int delay, final int duration, final View... views) {
    return Observable.range(0, views.length)
            .flatMapCompletable(index -> {
                final View target = views[index];
                return animate(target, new LinearInterpolator())
                        .duration(duration)
                        .delay(index * delay)
                        .fadeIn()
                        .schedule();
            });
}
/**
 * Drops containers whose value is null and maps the remaining ones to the length of their
 * string.
 *
 * @param observable source of nullable string containers
 * @return an Observable of string lengths
 */
private Observable<Integer> filterThenMapNullableContainerLambdas2(
        Observable<NullableContainer<String>> observable) {
    return observable
            .filter(container -> container.get() != null)
            .map(container -> container.get().length());
}
/**
 * Plays the entry at {@code position} of the playlist identified by {@code playlistType}.
 *
 * <p>If the stored playlist's play mode differs from this object's {@code playMode}, the
 * playlist is updated first. The entry is then fetched with {@code getAndMark(position)}
 * and passed to the other {@code play} overload.
 *
 * @param position index of the entry to play
 * @param preLoad  forwarded unchanged to the delegate {@code play} overload
 * @return the Observable returned by the delegate {@code play} overload
 */
@Override
public Observable<PlayMusic> play(int position, boolean preLoad) {
    final boolean modeOutOfSync =
            repository.findByListType(playlistType).getPlayMode() != this.playMode;
    if (modeOutOfSync) {
        repository.findByListType(playlistType).setPlayMode(this.playMode);
    }

    return play(repository.findByListType(playlistType).getAndMark(position), preLoad);
}
/**
 * Builds a transformer that subscribes and unsubscribes on the IO scheduler, observes on
 * the Android main thread and applies {@code retryWhen} with an {@link ApiRetryFunc}.
 *
 * @param retryCount       passed to {@link ApiRetryFunc}
 * @param retryDelayMillis passed to {@link ApiRetryFunc}
 * @param <T>              element type, unchanged by the transformer
 * @return the composed transformer
 */
public static <T> ObservableTransformer<T, T> norTransformer(final int retryCount, final int retryDelayMillis) {
    return upstream -> upstream
            .subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retryWhen(new ApiRetryFunc(retryCount, retryDelayMillis));
}
/**
 * Fetches the list of hot movies.
 *
 * <p>The API is always called with {@code 20} as its first argument. The request is
 * subscribed on the IO scheduler and observed on the Android main thread.
 *
 * @param limit forwarded as the second argument of the API call
 * @return an Observable emitting the hot-movie response
 */
public Observable<HotMovieBean> getHotMovieList(int limit) {
    Observable<HotMovieBean> request = RetrofitClient.getInstance()
            .apiServer()
            .getHotMovieList(20, limit);

    return request
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Funnels two interval sources (every second and every 300 ms) through one
 * {@code PublishSubject} that prints whatever it receives, then sleeps for three seconds.
 */
public static void main(String[] args) {
    Observable<String> everySecond = Observable.interval(1, TimeUnit.SECONDS)
            .map(tick -> (tick + 1) + " seconds");
    Observable<String> every300Millis = Observable.interval(300, TimeUnit.MILLISECONDS)
            .map(tick -> ((tick + 1) * 300) + " milliseconds");

    Subject<String> relay = PublishSubject.create();
    // Attach the printer before any source is connected to the subject.
    relay.subscribe(System.out::println);

    everySecond.subscribe(relay);
    every300Millis.subscribe(relay);

    sleep(3000);
}
/**
 * Inserts or updates every item and emits the resulting models as one list.
 *
 * <p>Each element is processed by {@code insertObjectOrUpdate}; the results are kept in
 * the same order as the input.
 *
 * @param items the models to insert or update
 * @return an Observable emitting a single list with the processed models
 */
@Override
public Observable<List<TestModel>> insertOrUpdate(List<TestModel> items) {
    // The constructor argument only reserves capacity; the list starts empty, so results
    // must be appended with add(). set(i, ...) on an empty list throws
    // IndexOutOfBoundsException.
    List<TestModel> output = new ArrayList<>(items.size());
    for (TestModel item : items) {
        output.add(insertObjectOrUpdate(item));
    }
    return Observable.just(output);
}
/**
 * Declares a form-URL-encoded POST request to {@code api/add2gank}.
 *
 * @param url   value sent as the {@code url} form field
 * @param desc  value sent as the {@code desc} form field
 * @param who   value sent as the {@code who} form field
 * @param type  value sent as the {@code type} form field
 * @param debug value sent as the {@code debug} form field
 * @return an {@code Observable<Object>} for the call's result
 */
@FormUrlEncoded
@POST("api/add2gank")
Observable<Object> postDataByRx(@Field("url") String url,
                                @Field("desc") String desc,
                                @Field("who") String who,
                                @Field("type") String type,
                                @Field("debug") String debug);
/**
 * Checks the output of {@code GenericUtil.getGenericTypes} for {@code C1} against the
 * interfaces {@code I1} and {@code I2}, after passing it through {@code handleNull}.
 */
@Test
public void test1() throws Exception {
    // C1 / I1: exactly one value is expected, equal to NULL.
    Observable.fromArray(handleNull(GenericUtil.getGenericTypes(C1.class, I1.class)))
            .test()
            .assertValueCount(1)
            .assertValues(NULL);
    // C1 / I2: two values are expected, getTV(C1.class, 0) followed by O1.class.
    Observable.fromArray(handleNull(GenericUtil.getGenericTypes(C1.class, I2.class)))
            .test()
            .assertValueCount(2)
            .assertValues(getTV(C1.class, 0), O1.class);
}
/**
 * Subscribes a {@code TracingConsumer} to the sequential Observable and expects five
 * received items, exactly one finished span, and no active scope afterwards.
 */
@Test
public void sequential() {
    Observable<Integer> source = createSequentialObservable(mockTracer);

    List<Integer> received = new ArrayList<>();
    Consumer<Integer> collector = consumer(received);

    source.subscribe(new TracingConsumer<>(collector, "sequential", mockTracer));

    assertEquals(5, received.size());

    List<MockSpan> finished = mockTracer.finishedSpans();
    assertEquals(1, finished.size());
    checkSpans(finished);

    assertNull(mockTracer.scopeManager().active());
}
public static void main(String[] args) { Observable<Integer> threeRandoms = Observable.range(1, 3) .map(i -> randomInt()) .publish() .autoConnect(2); //Observer 1 - print each random integer threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i)); //Observer 2 - sum the random integers, then print threeRandoms.reduce(0, (total, next) -> total + next) .subscribe(i -> System.out.println("Observer 2: " + i)); }
/**
 * Returns a transformer that makes its upstream observe on the Android main thread.
 *
 * @param <T> element type, unchanged by the transformer
 * @return the transformer
 */
public <T> ObservableTransformer<T, T> applyObservableMainThread() {
    return upstream -> upstream.observeOn(AndroidSchedulers.mainThread());
}
/**
 * Checks that the first four-letter word among "Alpha", "Beta", "Gamma", "Delta", "Zeta"
 * is "Beta".
 */
@Test
public void testFirst() {
    String firstWithLengthFour = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta")
            .filter(word -> word.length() == 4)
            .blockingFirst();

    assertTrue("Beta".equals(firstWithLengthFour));
}
/**
 * Deleting a selected city: with the data manager stubbed to succeed for both the delete
 * and the select-first-city call, the view must be told that a city was deleted.
 */
@Test
public void deleteSelectedCity() {
    CityDetailsModel cityDetailsModel = TestModels.newCityModel();
    cityDetailsModel.setIsSelected(true);

    when(dataManager.deleteCity(cityDetailsModel)).thenReturn(Observable.just(true));
    when(dataManager.selectFirstCity()).thenReturn(Observable.just(true));

    // Argument matchers such as anyInt() are only valid inside when()/verify(). Passed to
    // a real call they register a stray matcher that breaks later mock interactions, so a
    // concrete position is used here.
    final int position = 0;
    presenter.deleteLocation(position, cityDetailsModel);
    testScheduler.triggerActions();

    verify(view).onCityDelete(anyInt());
}
@Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); ButterKnife.bind(this); // demo(); // demo1(); // demo2(); // demo3(); // demo4(); demo5(); recyclerView.setHasFixedSize(true); layoutManager = new LinearLayoutManager(this); recyclerView.setLayoutManager(layoutManager); stockDataAdapter = new StockDataAdapter(); recyclerView.setAdapter(stockDataAdapter); Observable.just("Please use this app responsibly!") .subscribe(s -> helloText.setText(s)); Observable.just( new StockUpdate("GOOGLE", 12.43, new Date()), new StockUpdate("APPL", 645.1, new Date()), new StockUpdate("TWTR", 1.43, new Date()) ) .subscribe(stockUpdate -> { Log.d("APP", "New update " + stockUpdate.getStockSymbol()); stockDataAdapter.add(stockUpdate); }); }
public static void main(String[] args) { // TODO Auto-generated method stub Observable<Integer> observable = Observable.range(1, 50); observable.take(5).subscribe(new Observer<Integer>() { @Override public void onComplete() { // TODO Auto-generated method stub System.out.println(Thread.currentThread().getName() + " finished reading of items"); } @Override public void onError(Throwable throwable) { // TODO Auto-generated method stub System.out.println(Thread.currentThread().getName() + " finished with exception"); } @Override public void onNext(Integer value) { // TODO Auto-generated method stub System.out.println(Thread.currentThread().getName() + " read item:-"+value); } @Override public void onSubscribe(Disposable arg0) { // TODO Auto-generated method stub } }); }
/**
 * Binds a user to this item: puts the login into {@code mName} and loads the avatar into
 * {@code mAvater}.
 *
 * <p>The image is loaded with the current activity when there is one, otherwise with the
 * application object.
 *
 * @param data     the user to display
 * @param position not used by this method
 */
@Override
public void setData(User data, int position) {
    Observable.just(data.getLogin())
            .subscribe(login -> mName.setText(login));

    mImageLoader.loadImage(
            mAppComponent.appManager().getCurrentActivity() != null
                    ? mAppComponent.appManager().getCurrentActivity()
                    : mAppComponent.Application(),
            GlideImageConfig
                    .builder()
                    .url(data.getAvatarUrl())
                    .imageView(mAvater)
                    .build());
}