Java 类io.reactivex.Observable 实例源码

项目:MyEyepetizer    文件:SearchActivity.java   
/**
 * Starts a search for the given tag.
 *
 * Clears the hint text, hides the tag layout, shows the progress bar, remembers the tag in
 * {@code mSearchTag} and subscribes {@code mObserver} to the search request. The request is
 * subscribed on the IO scheduler and observed on the Android main thread.
 *
 * @param tag the tag to search for
 */
private void searchData(String tag) {
    // Put the UI into its "loading" state before the request is issued.
    mHintText.setText("");
    mTagLayout.setVisibility(View.GONE);
    mProgressBar.setVisibility(View.VISIBLE);
    mSearchTag = tag;

    // Only results that are not null are passed on to the observer.
    final Predicate<GetDataBean> hasResult = new Predicate<GetDataBean>() {
        @Override
        public boolean test(@NonNull GetDataBean result) throws Exception {
            return result != null;
        }
    };

    mSearchApi.searchTagData(mSearchTag)
            .filter(hasResult)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(mObserver);
}
项目:Ghost-Android    文件:AuthService.java   
/**
 * Revokes the refresh token and then, once that request completes, the access token.
 *
 * Ordering matters: both revocation requests authenticate with the access token, so the
 * access token must be the last one to be revoked.
 *
 * @param token        the token pair to revoke
 * @param clientSecret the client secret sent with both revocation requests
 * @return the subject handed to both {@code revokeSingleToken} calls
 *         (presumably it receives their responses; confirm in revokeSingleToken)
 */
private Observable<JsonElement> revokeToken(AuthToken token, String clientSecret) {
    final Subject<JsonElement> results = PublishSubject.create();
    final RevokeReqBody refreshBody =
            RevokeReqBody.fromRefreshToken(token.getRefreshToken(), clientSecret);

    revokeSingleToken(token.getAuthHeader(), refreshBody, results)
            .doOnComplete(() -> {
                // Step 2: only now is it safe to revoke the access token itself.
                final RevokeReqBody accessBody =
                        RevokeReqBody.fromAccessToken(token.getAccessToken(), clientSecret);
                revokeSingleToken(token.getAuthHeader(), accessBody, results).subscribe();
            })
            .subscribe();

    return results;
}
项目:Learning-RxJava    文件:Ch4_3.java   
/**
 * Merges five two-element Observables supplied as a List and prints every emitted item
 * prefixed with "RECEIVED: ".
 */
public static void main(String[] args) {
    List<Observable<String>> sources = Arrays.asList(
            Observable.just("Alpha", "Beta"),
            Observable.just("Gamma", "Delta"),
            Observable.just("Epsilon", "Zeta"),
            Observable.just("Eta", "Theta"),
            Observable.just("Iota", "Kappa"));

    Observable.merge(sources)
            .subscribe(item -> System.out.println("RECEIVED: " + item));
}
项目:GitHub    文件:ProductDetailsPresenter.java   
/**
 * Wires the view's intents to the interactor.
 *
 * The add-to-cart and remove-from-cart intents are subscribed for their side effects only;
 * the load-details intent produces the view state stream that is rendered by the view.
 */
@Override protected void bindIntents() {

  // Add to shopping cart: fire-and-forget.
  intent(ProductDetailsView::addToShoppingCartIntent)
      .doOnNext(item -> Timber.d("intent: add to shopping cart %s", item))
      .flatMap(item -> interactor.addToShoppingCart(item).toObservable())
      .subscribe();

  // Remove from shopping cart: fire-and-forget.
  intent(ProductDetailsView::removeFromShoppingCartIntent)
      .doOnNext(item -> Timber.d("intent: remove from shopping cart %s", item))
      .flatMap(item -> interactor.removeFromShoppingCart(item).toObservable())
      .subscribe();

  // Load details: the resulting view states are delivered on the main thread.
  Observable<ProductDetailsViewState> detailsStates =
      intent(ProductDetailsView::loadDetailsIntent)
          .doOnNext(id -> Timber.d("intent: load details for product id = %s", id))
          .flatMap(interactor::getDetails)
          .observeOn(AndroidSchedulers.mainThread());

  subscribeViewState(detailsStates, ProductDetailsView::render);
}
项目:Reactive-Programming-With-Java-9    文件:Modern_Testing.java   
/**
 * Verifies that {@code Observable.just(12, 34, 6)} emits exactly those three values in order,
 * completes, and signals no error.
 */
@Test
public void test_just_new() {
    Observable<Integer> observable = Observable.just(12, 34, 6);
    TestObserver<Integer> testObserver = new TestObserver<>();

    observable.subscribe(testObserver);

    testObserver.assertComplete();
    testObserver.assertResult(12, 34, 6);
    testObserver.assertValueCount(3);
    testObserver.assertNoErrors();
    // Indices are zero-based: 34 is the second emission, i.e. index 1 (index 2 holds 6).
    testObserver.assertValueAt(1, value -> value == 34);
}
项目:Reactive-Programming-With-Java-9    文件:Modern_Testing.java   
/**
 * Drives a one-second interval (limited to five items) with a TestScheduler and checks the
 * emissions before any time has passed, after one second, and at the six-second mark.
 */
@Test
public void test_interval() {
    TestScheduler testScheduler = new TestScheduler();
    TestObserver<Long> testObserver = new TestObserver<>();

    Observable.interval(1, TimeUnit.SECONDS, testScheduler)
            .take(5)
            .subscribeOn(testScheduler)
            .subscribe(testObserver);

    // Virtual time has not moved yet, so nothing may have been emitted.
    testObserver.assertNoValues()
            .assertNotComplete()
            .assertNoErrors();

    // After one virtual second the first tick (0) arrives.
    testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    testObserver.assertValueCount(1)
            .assertValues(0L);

    // By second six all five ticks allowed by take(5) have been delivered.
    testScheduler.advanceTimeTo(6, TimeUnit.SECONDS);
    testObserver.assertValueCount(5)
            .assertValues(0L, 1L, 2L, 3L, 4L);
}
项目:ObjectBoxRxJava    文件:RxQuery.java   
/**
 * Wraps a Query in an Observable that emits the query results as Lists.
 * The Observable never calls onComplete, so subscribers keep receiving the data delivered
 * by the query's data observer until they dispose; disposing cancels the data subscription.
 *
 * @param query the query to observe
 * @return an Observable of result lists
 */
public static <T> Observable<List<T>> observable(final Query<T> query) {
    return Observable.create(new ObservableOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final ObservableEmitter<List<T>> emitter) throws Exception {
            // Forwards every data callback to the emitter unless it was already disposed.
            final DataObserver<List<T>> forwarder = new DataObserver<List<T>>() {
                @Override
                public void onData(List<T> data) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(data);
                }
            };
            final DataSubscription subscription = query.subscribe().observer(forwarder);

            // Tie the lifetime of the data subscription to the Rx subscription.
            emitter.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    subscription.cancel();
                }
            });
        }
    });
}
项目:buckaroo    文件:CommonTasks.java   
/**
 * Downloads a remote archive next to the target directory and then unpacks it.
 *
 * The archive is saved as a sibling of {@code targetDirectory} named
 * {@code <directory name>.zip}; after the download finishes it is unzipped into
 * {@code targetDirectory}, replacing existing files. The whole sequence is subscribed on
 * the IO scheduler.
 *
 * @param fs              the file system passed to the download task
 * @param remoteArchive   the archive to fetch; its {@code subPath} is passed to the unzip step
 * @param targetDirectory the directory to unpack into
 * @return the events of the download followed by the (event-less) unzip step
 * @throws NullPointerException if any argument is null
 */
public static Observable<Event> downloadRemoteArchive(final FileSystem fs, final RemoteArchive remoteArchive, final Path targetDirectory) {

    Preconditions.checkNotNull(fs);
    Preconditions.checkNotNull(remoteArchive);
    Preconditions.checkNotNull(targetDirectory);

    // resolveSibling also works when targetDirectory has no parent (e.g. a single-name
    // relative path), where getParent() would return null and resolve() would throw.
    final Path zipFilePath = targetDirectory.resolveSibling(targetDirectory.getFileName() + ".zip");

    return Observable.concat(

        // Download the file
        CommonTasks.downloadRemoteFile(fs, remoteArchive.asRemoteFile(), zipFilePath),

        // Unpack the zip
        MoreCompletables.fromRunnable(() -> {
            EvenMoreFiles.unzip(
                zipFilePath,
                targetDirectory,
                remoteArchive.subPath,
                StandardCopyOption.REPLACE_EXISTING);
        }).toObservable()).subscribeOn(Schedulers.io());
}
项目:android-arch-mvvm    文件:ModuleCall.java   
/**
 * Starts this call and registers the callback.
 *
 * A call may be enqueued only once. If the call was already canceled or is already done,
 * the method returns without subscribing. Otherwise the wrapped reactive source is
 * subscribed according to its runtime type (Observable, Single, Flowable, or Maybe as the
 * fallback).
 *
 * @param callback the callback stored in {@code mModuleCallback}
 * @throws IllegalStateException if this call has been enqueued before
 */
public void enqueue(final ModuleCallback<T> callback) {
    // Flip the "executed" flag atomically so a second enqueue is always rejected.
    synchronized (this) {
        if (!mExecuted) {
            mExecuted = true;
        } else {
            throw new IllegalStateException("每个ModuleCall只能enqueue一次");
        }
    }

    if (mCanceled || mDone) {
        return;
    }

    mModuleCallback = callback;

    // Dispatch on the concrete reactive type; anything else is treated as a Maybe.
    if (mObservable instanceof Observable) {
        subscribeObservable((Observable<T>) mObservable);
        return;
    }
    if (mObservable instanceof Single) {
        subscribeSingle((Single<T>) mObservable);
        return;
    }
    if (mObservable instanceof Flowable) {
        subscribeFlowable((Flowable<T>) mObservable);
        return;
    }
    subscribeMaybe((Maybe<T>) mObservable);
}
项目:rx-progress-dialog    文件:MainActivity.java   
/**
 * Sets up the screen: creates the disposable container, prepares one Observable and one
 * Flowable that each emit a "User id is ..." string after TIME_DELAY milliseconds on the
 * main thread, and registers this activity as click listener for both buttons.
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
  super.onCreate(savedInstanceState);
  setContentView(R.layout.activity_main);

  mCompositeDisposable = new CompositeDisposable();

  // Both sources are cold: nothing happens until they are subscribed.
  mLoginObservable = Observable
      .timer(TIME_DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
      .map(tick -> "User id is " + UUID.randomUUID().toString());
  mLoginFlowable = Flowable
      .timer(TIME_DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
      .map(tick -> "User id is " + UUID.randomUUID().toString());

  findViewById(R.id.button_observable).setOnClickListener(this);
  findViewById(R.id.button_flowable).setOnClickListener(this);
}
项目:RxEasyHttp    文件:MainActivity.java   
/**
 * Calls a custom API through EasyHttp.
 *
 * Note: when request signing is enabled, watch out for "/" in the path, for example
 * https://www.xxx.com/v1/account/login (correct)
 * https://www.xxx.com//v1/account/login (wrong, may cause the signature check to fail)
 */
public void onCustomCall(View view) {
    final String account = "18688994275";
    final String password = "123456";

    // Build a signed, time-stamped request carrying the login parameters.
    final CustomRequest request = EasyHttp.custom()
            .addConverterFactory(GsonConverterFactory.create(new Gson()))
            .sign(true)
            .timeStamp(true)
            .params(ComParamContact.Login.ACCOUNT, account)
            .params(ComParamContact.Login.PASSWORD, MD5.encrypt4login(password, AppConstant.APP_SECRET))
            .build();

    LoginService loginService = request.create(LoginService.class);
    Observable<ApiResult<AuthModel>> loginCall =
            request.call(loginService.login("v1/account/login", request.getParams().urlParamsMap));

    // Show the result, or the error message, in a toast.
    Disposable disposable = loginCall.subscribe(
            new Consumer<ApiResult<AuthModel>>() {
                @Override
                public void accept(@NonNull ApiResult<AuthModel> result) throws Exception {
                    showToast(result.toString());
                }
            },
            new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    showToast(throwable.getMessage());
                }
            });
    //EasyHttp.cancelSubscription(disposable);// cancel the subscription
}
项目:GSB-2017-Android    文件:ClientsNetworkCalls.java   
/**
 * Requests all clients and converts the JSON response into a list of Client objects.
 *
 * A null response element yields an empty list, a JSON array is parsed into clients, and
 * any other JSON element results in an error. Results are observed on the main thread.
 *
 * @return an Observable emitting the parsed client list
 */
public static Observable<List<Client>> getAllClients() {
    ClientsService service = ServiceGenerator.createService(ClientsService.class);
    return service.getAllClients(UrlManager.getAllClientsURL())
            .flatMap(new Function<JsonElement, Observable<List<Client>>>() {
                @Override
                public Observable<List<Client>> apply(JsonElement jsonElement) throws Exception {
                    if (jsonElement == null) {
                        return Observable.just((List<Client>) new ArrayList<Client>());
                    }

                    Log.i("Get All Clients" , "JSON: "+jsonElement.toString());

                    if (!jsonElement.isJsonArray()) {
                        return Observable.error(new Exception("Expected a JSON Array"));
                    }

                    List<Client> parsed =
                            Client.ClientsListParser.fromJsonArray(jsonElement.getAsJsonArray());
                    return Observable.just(parsed);
                }
            })
            .observeOn(AndroidSchedulers.mainThread());
}
项目:LifecycleAwareRx    文件:LifecycleTest.java   
/**
 * Baseline check: a plain 1 ms interval subscription sets {@code methodOnViewCalled}
 * although the lifecycle has only received ON_CREATE.
 */
@Test
public void viewsAreCalledBeforeLifecycleIsReadyWithoutLifecycleAwareRx() throws Exception {
    // Lifecycle is "active" once it is STARTED, it's not ready yet at INITIALIZED or CREATED.
    lifecycleOwner.handleLifecycleEvent(Lifecycle.Event.ON_CREATE);

    // Observer that records that the "view" was touched; errors and completion are ignored.
    final DisposableObserver<Long> viewCaller = new DisposableObserver<Long>() {
        @Override
        public void onNext(final Long tick) {
            LifecycleTest.this.methodOnViewCalled = true;
        }

        @Override
        public void onError(final Throwable error) {
        }

        @Override
        public void onComplete() {
        }
    };
    Observable.interval(1, TimeUnit.MILLISECONDS).subscribeWith(viewCaller);

    // Need to wait to give it time to potentially fail
    TimeUnit.MILLISECONDS.sleep(100);
    assertEquals(true, methodOnViewCalled);
}
项目:MBEStyle    文件:IconShowPresenter.java   
/**
 * Builds an IconBean for every name in the {@code whatsnew} string array and hands the
 * complete list to the view.
 *
 * The mapping is subscribed on the IO scheduler; the list is delivered on the main thread.
 *
 * @return the Disposable of the subscription
 */
public Disposable getWhatsNewIcons() {
    // Turns a drawable name into a bean holding the name and its resource id.
    final Function<String, IconBean> toIconBean = new Function<String, IconBean>() {
        @Override
        public IconBean apply(@NonNull String iconName) throws Exception {
            IconBean icon = new IconBean();
            icon.id = mView.getResources().getIdentifier(iconName, "drawable", BuildConfig.APPLICATION_ID);
            icon.name = iconName;
            return icon;
        }
    };

    return Observable.fromArray(mView.getResources().getStringArray(R.array.whatsnew))
            .map(toIconBean)
            .toList()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<IconBean>>() {
                @Override
                public void accept(List<IconBean> icons) throws Exception {
                    mView.onLoadData(icons);
                }
            });
}
项目:RxFamilyUsage-Android    文件:RxJava2CombineTest.java   
/**
 * Checks that startWith puts -1 in front of the items of {@code nums}.
 * NOTE(review): the expected count of 7 implies nums holds six elements; nums is declared
 * outside this method, so confirm there.
 */
@Test
public void startWithTest() throws Exception {
    // Variant 1: prepend a single value.
    Observable.fromArray(nums)
            .startWith(-1)
            .test()
            .assertValueCount(7)
            .assertValueAt(0, -1);
    // Variant 2: the same can be done by prepending another Observable.
    Observable.fromArray(nums)
            .startWith(Observable.just(-1))
            .test()
            .assertValueCount(7)
            .assertValueAt(0, -1);
}
项目:Rx-Android-Samples    文件:FilterOperatorFragment.java   
/**
 * Looks up the views, then loads the user's repositories and forwards only the ones whose
 * language is "Java" to {@code mBaseObserver}.
 *
 * The request is subscribed on the IO scheduler and observed on the main thread.
 */
@Override
public void onViewCreated(View view, Bundle savedInstanceState) {
    super.onViewCreated(view, savedInstanceState);

    mRepoListView = view.findViewById(R.id.repo_list_view);
    mObserverLog = view.findViewById(R.id.observer_log);
    mObserverLog.setMovementMethod(new ScrollingMovementMethod());

    mApi.getObservableRepositories(Utils.USER)
            // Flatten the list response into a stream of single repositories.
            .flatMap(new Function<List<RepositoryResponse>, ObservableSource<RepositoryResponse>>() {
                @Override
                public ObservableSource<RepositoryResponse> apply(List<RepositoryResponse> repositoryResponses) throws Exception {

                    return Observable.fromIterable(repositoryResponses);
                }
            })
            .filter(new Predicate<RepositoryResponse>() {
                @Override
                public boolean test(RepositoryResponse repositoryResponse) throws Exception {
                    //Filter the stream so only Java repositories are emitted.
                    // Constant-first comparison: a repository whose language is null is
                    // simply filtered out instead of crashing the stream with an NPE.
                    return "Java".equals(repositoryResponse.language);
                }
            })
            //Subscribe the Network call in io Thread.
            .subscribeOn(Schedulers.io())
            //Subscribe the Observer in MainThread so it can updates the UI with the result.
            .observeOn(AndroidSchedulers.mainThread())
            //Choose the subscribed Observer for items emitted by this observable.
            .subscribe(mBaseObserver);
}
项目:RxJava2-Android-Sample    文件:CombineLatestExampleActivity.java   
/**
 * Combines the latest values of two string sources ("A1".."A4" and "B1".."B3") into
 * "a-b" strings and delivers them to the observer returned by {@code getObserver()}.
 */
private void doSomeWork() {
    final Observable<String> aObservable = Observable.fromArray("A1", "A2", "A3", "A4");
    final Observable<String> bObservable = Observable.fromArray("B1", "B2", "B3");

    // Joins the most recent item of each source with a dash.
    final BiFunction<String, String, String> joinWithDash = new BiFunction<String, String, String>() {
        @Override
        public String apply(@NonNull String left, @NonNull String right) throws Exception {
            return left + "-" + right;
        }
    };

    Observable.combineLatest(aObservable, bObservable, joinWithDash)
            .subscribe(getObserver());
}
项目:AssistantBySDK    文件:NaviSetLinePresenter.java   
/**
 * Callback for the end of yawing (off-route) route planning.
 *
 * If the synthesizer is initialised, a speech message is submitted to it. Afterwards the
 * cached route plans and route models are cleared and rebuilt from the routes currently
 * reported by BNRoutePlanerProxy, stored under {@code calculatePreference}.
 **/
@Override
public void onYawingRequestSuccess() {
    Log.i(TAG, "onYawingRequestSuccess");
    if (SynthesizerBase.isInited()) {
        // Build the announcement and hand it to the synthesizer.
        SpeechMsgBuilder msgBuilder = new SpeechMsgBuilder("路线规划完毕")
                .setForceLocalEngine(true)
                .setOrigin(SpeechMsg.ORIGIN_COMMON);
        Observable<SpeechMsg> msgObservable = SynthesizerBase.get().addMessageWaitSpeak(msgBuilder.build());
        // addMessageWaitSpeak may return null, hence the guard; the result itself is ignored.
        if (msgObservable != null) {
            msgObservable.subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.computation())
                    .subscribe();
        }
    }
    // Drop the previously cached plans/models before rebuilding them.
    BNRoutePlanerProxy.getInstance().routePlans.clear();
    RouteModel.getLastRouteModels().clear();
    Vector<RoutePlanModelProxy> v = new Vector<RoutePlanModelProxy>();
    ArrayList<RouteModel> routeModels = new ArrayList<RouteModel>();
    int l = BNRoutePlanerProxy.getInstance().getRouteCnt();
    if (l > 0) {
        // One Bundle per route: it is filled by getRouteInfo and then parsed twice,
        // once into a RoutePlanModelProxy and once into a RouteModel.
        for (int i = 0; i < l; i++) {
            Bundle bundle = new Bundle();
            RoutePlanModelProxy rp = new RoutePlanModelProxy();
            BNRoutePlanerProxy.getInstance().getRouteInfo(i, bundle);
            rp.parseRouteResult(mContext, bundle);
            routeModels.add(new RouteModel(bundle));
            v.add(rp);
        }
        // Publish the rebuilt collections under the current calculation preference.
        RouteModel.put(calculatePreference, routeModels);
        BNRoutePlanerProxy.getInstance().routePlans.put(calculatePreference, v);
    }
}
项目:GifEmoji    文件:EmoticonDrawable.java   
/**
 * Starts the frame ticker: every {@code delay} milliseconds {@code position} is advanced
 * by one on the main thread and wraps back to 0 once it reaches {@code frameNum}.
 * Nothing is started unless both {@code delay} and {@code frameNum} are positive.
 */
void animation() {
    if (!(delay > 0 && frameNum > 0)) {
        return;
    }

    // Advances the frame index on every tick; all other callbacks are ignored.
    final Observer<Long> frameAdvancer = new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(Long tick) {
            position++;
            if (position >= frameNum) {
                position = 0;
            }
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    };

    Observable
            .interval(delay, TimeUnit.MILLISECONDS)
            .flatMap(new Function<Long, ObservableSource<Long>>() {
                @Override
                public ObservableSource<Long> apply(Long tick) throws Exception {
                    return Observable.just(tick);
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(frameAdvancer);
}
项目:Reactive-Android-Programming    文件:Sandbox.java   
/**
 * Emits the numbers 1..1000 as strings, logs each one before and after the hop to the
 * computation scheduler, then blocks on WAIT_LATCH.
 */
private static void demo8() throws InterruptedException {
    Observable<String> numbers = Observable.range(1, 1000)
            .map(Objects::toString);

    numbers
            .doOnNext(value -> log("doOnNext", value))
            .observeOn(Schedulers.computation())
            .subscribe(value -> log("subscribe", value));

    WAIT_LATCH.await();
}
项目:rxjavatraining    文件:OpenQuestionsTest.java   
/**
 * Uses flatMap to route items onto different schedulers: even inputs become x + 1 observed
 * on the IO scheduler, odd inputs become x + 3 observed on the computation scheduler. Each
 * result is printed together with the name of the thread that delivered it.
 */
@Test
public void usingFlatMapToJumpConditionallyOntoAnotherThread() throws Exception {
    Observable.just(1, 2, 3, 4)
            .flatMap(number -> {
                if (number % 2 == 0) {
                    return Observable.just(number + 1).observeOn(Schedulers.io());
                } else {
                    return Observable.just(number + 3).observeOn(Schedulers.computation());
                }
            })
            .subscribe(result -> {
                System.out.print(Thread.currentThread().getName());
                System.out.println(": " + result);
            });
}
项目:TootApp    文件:Mastodon.java   
/**
 * Requests the public (federated) timeline.
 *
 * The call is made with the bearer value from {@code Toot.buildBearer()} and a null second
 * argument.
 *
 * @return an Observable of the response wrapping an array of Status objects
 */
public Observable<Response<Status[]>> getPublicTimeline() {
    API api = buildRxRetrofit().create(API.class);
    return api.getPublicTimeline(Toot.buildBearer(), null);
}
项目:Rx2Animations    文件:RxAnimations.java   
/**
 * Fades in the given views one after another: the view at index i starts after
 * {@code i * delay} and fades in over {@code duration}, using a linear interpolator.
 *
 * @param delay    the stagger between consecutive views
 * @param duration the duration of each fade-in
 * @param views    the views to fade in, in order
 * @return a Completable combining the animations of all views
 */
public static Completable fadeInWithDelay(final int delay, final int duration, final View... views) {
    return Observable.range(0, views.length)
            .flatMapCompletable(index -> {
                final View target = views[index];
                return animate(target, new LinearInterpolator())
                        .duration(duration)
                        .delay(index * delay)
                        .fadeIn()
                        .schedule();
            });
}
项目:NullAway    文件:NullAwayRxSupportNegativeCases.java   
// Filters out containers whose get() returns null using a block-bodied lambda with an
// explicit if/else, then maps the remaining containers to the length of their string.
// NOTE(review): this looks like a static-analysis fixture where the exact lambda shape
// is what is being exercised, so do not simplify the if/else. Confirm before refactoring.
private Observable<Integer> filterThenMapNullableContainerLambdas2(
    Observable<NullableContainer<String>> observable) {
  return observable
      .filter(
          c -> {
            if (c.get() == null) {
              return false;
            } else {
              return true;
            }
          })
      // Only containers that passed the null check above reach this dereference.
      .map(c -> c.get().length());
}
项目:AssistantBySDK    文件:LingjuAudioPlayer.java   
/**
 * Plays the entry at the given position of the current playlist.
 *
 * Before playing, the play mode of the list found for {@code playlistType} is set to this
 * player's {@code playMode} if the two differ.
 *
 * @param position index passed to the playlist's getAndMark
 * @param preLoad  forwarded unchanged to the overloaded play method
 * @return whatever the overloaded play method returns
 */
@Override
public Observable<PlayMusic> play(int position, boolean preLoad) {
    // NOTE(review): findByListType is looked up again for every access; whether the
    // result could be cached depends on the repository. Confirm before changing.
    if (repository.findByListType(playlistType).getPlayMode() != this.playMode) {
        repository.findByListType(playlistType).setPlayMode(this.playMode);
    }
    return play(repository.findByListType(playlistType).getAndMark(position), preLoad);
}
项目:XSnow    文件:ApiTransformer.java   
/**
 * Creates a transformer that subscribes and unsubscribes on the IO scheduler, observes on
 * the Android main thread and applies a retry policy built from the given arguments.
 *
 * @param retryCount       passed to ApiRetryFunc (presumably the maximum number of retries)
 * @param retryDelayMillis passed to ApiRetryFunc (presumably the delay between retries in ms)
 * @return the transformer
 */
public static <T> ObservableTransformer<T, T> norTransformer(final int retryCount, final int retryDelayMillis) {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            // Threading first, then the retry policy on top of it.
            final Observable<T> scheduled = upstream
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
            return scheduled.retryWhen(new ApiRetryFunc(retryCount, retryDelayMillis));
        }
    };
}
项目:ZhaZhaShop    文件:HotMovieListManager.java   
/**
 * Fetches the list of hot movies.
 *
 * The request is subscribed on the IO scheduler and observed on the Android main thread.
 *
 * @param limit passed as the second argument of the API call; the first argument is fixed
 *              at 20 (NOTE(review): the meaning of both arguments is defined by the API
 *              interface, confirm there)
 * @return an Observable emitting the hot movie response
 */
public Observable<HotMovieBean> getHotMovieList(int limit) {
    Observable<HotMovieBean> request = RetrofitClient
            .getInstance()
            .apiServer()
            .getHotMovieList(20, limit);

    return request
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
项目:Learning-RxJava    文件:Ch5_21.java   
/**
 * Feeds two interval sources (one ticking every second, one every 300 ms) into a single
 * PublishSubject whose items are printed, then keeps the program alive for three seconds.
 */
public static void main(String[] args) {
    Observable<String> everySecond = Observable
            .interval(1, TimeUnit.SECONDS)
            .map(tick -> (tick + 1) + " seconds");
    Observable<String> every300Millis = Observable
            .interval(300, TimeUnit.MILLISECONDS)
            .map(tick -> ((tick + 1) * 300) + " milliseconds");

    // The printer is attached before the sources so that no emission is missed.
    Subject<String> subject = PublishSubject.create();
    subject.subscribe(System.out::println);

    everySecond.subscribe(subject);
    every300Millis.subscribe(subject);

    sleep(3000);
}
项目:store2realm    文件:MemoryDao.java   
/**
 * Inserts or updates every item and returns the results in the same order.
 *
 * @param items the items to insert or update
 * @return an Observable emitting one list containing the result of
 *         {@code insertObjectOrUpdate} for each input item
 */
@Override
public Observable<List<TestModel>> insertOrUpdate(List<TestModel> items) {
    // The constructor argument is only a capacity hint: the list starts empty, so elements
    // must be appended with add(); set(i, ...) would throw IndexOutOfBoundsException.
    List<TestModel> output = new ArrayList<>(items.size());

    for (TestModel item : items) {
        output.add(insertObjectOrUpdate(item));
    }
    return Observable.just(output);
}
项目:csdn-retrofit    文件:GankApi.java   
/**
 * Sends a form-encoded POST request to {@code api/add2gank}.
 *
 * @param url   value of the {@code url} form field
 * @param desc  value of the {@code desc} form field
 * @param who   value of the {@code who} form field
 * @param type  value of the {@code type} form field
 * @param debug value of the {@code debug} form field
 * @return an Observable emitting the response body as an untyped Object
 */
@FormUrlEncoded
@POST("api/add2gank")
Observable<Object> postDataByRx(@Field("url") String url,
                                @Field("desc") String desc,
                                @Field("who") String who,
                                @Field("type") String type,
                                @Field("debug") String debug);
项目:Java-EX    文件:GenericUtilTest.java   
/**
 * Checks the generic types reported by GenericUtil for class C1 with respect to the
 * interfaces I1 and I2. handleNull, NULL and getTV are helpers defined outside this method.
 */
@Test
public void test1() throws Exception {
  // C1 vs. I1: exactly one entry is expected, and it is the NULL placeholder.
  Observable.fromArray(handleNull(GenericUtil.getGenericTypes(C1.class, I1.class)))
      .test()
      .assertValueCount(1)
      .assertValues(NULL);
  // C1 vs. I2: two entries are expected, the value of getTV(C1.class, 0) followed by O1.
  Observable.fromArray(handleNull(GenericUtil.getGenericTypes(C1.class, I2.class)))
      .test()
      .assertValueCount(2)
      .assertValues(getTV(C1.class, 0), O1.class);
}
项目:java-rxjava    文件:TracingConsumerTest.java   
/**
 * Subscribes a TracingConsumer to a sequential observable and checks that five items are
 * collected, exactly one span is finished, and no scope remains active afterwards.
 */
@Test
public void sequential() {
  Observable<Integer> observable = createSequentialObservable(mockTracer);
  List<Integer> result = new ArrayList<>();
  // consumer(result) is a helper defined outside this method; presumably it appends each
  // received item to result (the size assertion below relies on that).
  Consumer<Integer> onNext = consumer(result);

  observable.subscribe(new TracingConsumer<>(onNext, "sequential", mockTracer));
  assertEquals(5, result.size());

  // The whole subscription is expected to be covered by a single finished span.
  List<MockSpan> spans = mockTracer.finishedSpans();
  assertEquals(1, spans.size());
  checkSpans(spans);

  // No scope may remain active once the stream is done.
  assertNull(mockTracer.scopeManager().active());
}
项目:Learning-RxJava    文件:Ch5_7.java   
/**
 * Shares three random integers between two observers: the source is published and
 * connects automatically once the second observer has subscribed. The first observer
 * prints each number, the second prints their sum.
 */
public static void main(String[] args) {
    Observable<Integer> sharedRandoms = Observable.range(1, 3)
            .map(index -> randomInt())
            .publish()
            .autoConnect(2);

    // Observer 1 - print each random integer
    sharedRandoms.subscribe(value -> System.out.println("Observer 1: " + value));

    // Observer 2 - sum the random integers, then print
    sharedRandoms
            .reduce(0, Integer::sum)
            .subscribe(sum -> System.out.println("Observer 2: " + sum));
}
项目:GifEmoticon    文件:EmoticonDrawable.java   
/**
 * Starts the frame ticker when both {@code delay} and {@code frameNum} are positive:
 * on every {@code delay}-millisecond tick, delivered on the main thread, {@code position}
 * moves to the next frame and wraps to 0 after the last one.
 */
void animation() {
    if (delay > 0 && frameNum > 0) {
        // Identity mapping kept from the original pipeline: each tick is re-emitted as-is.
        final Function<Long, ObservableSource<Long>> passThrough =
                new Function<Long, ObservableSource<Long>>() {
                    @Override
                    public ObservableSource<Long> apply(Long tick) throws Exception {
                        return Observable.just(tick);
                    }
                };

        Observable
                .interval(delay, TimeUnit.MILLISECONDS)
                .flatMap(passThrough)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Long tick) {
                        // Advance to the next frame, wrapping around at the end.
                        position++;
                        if (position >= frameNum) {
                            position = 0;
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                    }
                });
    }
}
项目:RxJava2-Android-Sample    文件:RxSchedulers.java   
/**
 * Creates a transformer that moves the observation of a stream to the Android main thread.
 *
 * @return a transformer applying {@code observeOn(AndroidSchedulers.mainThread())}
 */
public <T> ObservableTransformer<T, T> applyObservableMainThread() {
    final ObservableTransformer<T, T> toMainThread = new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            return upstream.observeOn(AndroidSchedulers.mainThread());
        }
    };
    return toMainThread;
}
项目:Learning-RxJava    文件:Ch10_4.java   
/**
 * Verifies that the first four-letter word in the source is "Beta", using blockingFirst
 * to obtain the value synchronously.
 */
@Test
public void testFirst() {
    String firstWithLengthFour =
            Observable.just("Alpha", "Beta", "Gamma", "Delta", "Zeta")
                    .filter(word -> word.length() == 4)
                    .blockingFirst();

    assertTrue(firstWithLengthFour.equals("Beta"));
}
项目:Weather-Guru-MVP    文件:ManageCityPresenterTest.java   
/**
 * Deleting a selected city: with both data-manager calls stubbed to succeed, the presenter
 * must notify the view through onCityDelete.
 */
@Test public void deleteSelectedCity() {
  CityDetailsModel cityDetailsModel = TestModels.newCityModel();
  cityDetailsModel.setIsSelected(true);

  when(dataManager.deleteCity(cityDetailsModel)).thenReturn(Observable.just(true));
  when(dataManager.selectFirstCity()).thenReturn(Observable.just(true));

  // Pass a real position: Mockito matchers such as anyInt() are only valid inside
  // when(...)/verify(...). Used as a plain argument, anyInt() just evaluates to 0 and
  // leaves a stray matcher recorded, so the literal 0 keeps the same argument value.
  presenter.deleteLocation(0, cityDetailsModel);
  testScheduler.triggerActions();

  verify(view).onCityDelete(anyInt());
}
项目:Reactive-Android-Programming    文件:MainActivity.java   
/**
 * Sets up the activity: binds the views, runs demo5, configures the RecyclerView with a
 * linear layout manager and the stock adapter, shows the greeting text and feeds three
 * sample stock updates into the adapter.
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    ButterKnife.bind(this);

    // Earlier demos, currently disabled: demo(), demo1(), demo2(), demo3(), demo4()
    demo5();

    // RecyclerView wiring.
    recyclerView.setHasFixedSize(true);
    layoutManager = new LinearLayoutManager(this);
    recyclerView.setLayoutManager(layoutManager);
    stockDataAdapter = new StockDataAdapter();
    recyclerView.setAdapter(stockDataAdapter);

    // Greeting text.
    Observable.just("Please use this app responsibly!")
            .subscribe(greeting -> helloText.setText(greeting));

    // Sample data: each update is logged and appended to the adapter.
    Observable
            .just(
                    new StockUpdate("GOOGLE", 12.43, new Date()),
                    new StockUpdate("APPL", 645.1, new Date()),
                    new StockUpdate("TWTR", 1.43, new Date()))
            .subscribe(update -> {
                Log.d("APP", "New update " + update.getStockSymbol());
                stockDataAdapter.add(update);
            });
}
项目:Reactive-Programming-With-Java-9    文件:Demo_take_no_threading.java   
/**
 * Takes the first five items of the range 1..50 without any scheduler and prints, for every
 * callback, the name of the thread it runs on.
 */
public static void main(String[] args) {
    Observable.range(1, 50)
            .take(5)
            .subscribe(new Observer<Integer>() {

                @Override
                public void onSubscribe(Disposable disposable) {
                    // Nothing to do on subscription.
                }

                @Override
                public void onNext(Integer item) {
                    System.out.println(Thread.currentThread().getName() + " read item:-" + item);
                }

                @Override
                public void onError(Throwable error) {
                    System.out.println(Thread.currentThread().getName() + " finished with exception");
                }

                @Override
                public void onComplete() {
                    System.out.println(Thread.currentThread().getName() + " finished reading of items");
                }
            });
}
项目:MVPArmsTest1    文件:UserItemHolder.java   
/**
 * Binds a user to this holder: shows the login name and loads the avatar image.
 *
 * The image is loaded with the current activity when one is available and with the
 * application object otherwise.
 *
 * @param data     the user to display
 * @param position not used by this method
 */
@Override
public void setData(User data, int position) {
    Observable.just(data.getLogin())
            .subscribe(login -> mName.setText(login));

    mImageLoader.loadImage(
            mAppComponent.appManager().getCurrentActivity() != null
                    ? mAppComponent.appManager().getCurrentActivity()
                    : mAppComponent.Application(),
            GlideImageConfig
                    .builder()
                    .url(data.getAvatarUrl())
                    .imageView(mAvater)
                    .build());
}