/**
 * When the use case fails with an error, the view is expected to go through the loading and
 * refresh toggles, show the error state (not the empty state), and not report a networking error.
 */
@Test
public void shouldPresent_AnotherError_IntoView() throws Exception {
    IllegalAccessError unexpected = new IllegalAccessError("WTF!");
    Flowable<FactAboutNumber> failingFacts = Flowable.error(unexpected);
    when(usecase.fetchTrivia()).thenReturn(failingFacts);

    presenter.fetchRandomFacts();

    // View-side expectations.
    BehavioursRobot.with(view)
            .showLoadingFirstHideLoadingAfter()
            .disableRefreshFirstAndEnableAfter()
            .shouldShowErrorState()
            .shouldNotShowEmptyState()
            .shouldNotReportNetworkingError();

    // Stream-side expectations.
    DataFlowWatcher.with(onNext, onError, onCompleted).shouldFinishWithError();
    verify(strategist, oneTimeOnly()).applyStrategy(any());
}
/**
 * Exception-handling transformer.
 *
 * <p>Each emitted model is inspected and either passed through unchanged or replaced by an
 * error signal carrying a {@link NetError}:
 * <ul>
 *   <li>{@code null} model or {@code isNull()} - {@code NetError.NoDataError}</li>
 *   <li>{@code isAuthError()} - {@code NetError.AuthError}</li>
 *   <li>{@code isBizError()} - {@code NetError.BusinessError}</li>
 * </ul>
 *
 * @param <T> the model type flowing through the stream
 * @return a transformer that turns failed models into {@link NetError} error signals
 */
public static <T extends IModel> FlowableTransformer<T, T> getApiTransformer() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            return upstream.flatMap(new Function<T, Publisher<T>>() {
                @Override
                public Publisher<T> apply(T model) throws Exception {
                    if (model == null || model.isNull()) {
                        // A null model has no message to read; calling getErrorMsg() on it
                        // would throw NullPointerException instead of reporting NoDataError.
                        return Flowable.error(new NetError(
                                model == null ? null : model.getErrorMsg(),
                                NetError.NoDataError));
                    } else if (model.isAuthError()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.AuthError));
                    } else if (model.isBizError()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.BusinessError));
                    } else {
                        return Flowable.just(model);
                    }
                }
            });
        }
    };
}
/**
 * Groups review payloads by their "course" field, averages the "rating" values collected in
 * each one-minute buffer, and prints every (course, average) pair.
 */
@Transformation
public void computeStatistics() {
    reviews
            .transformPayloadFlow(payloads -> payloads
                    .groupBy(review -> review.getString("course"))
                    .flatMap(byCourse -> byCourse
                            .map(review -> review.getInteger("rating"))
                            // one buffer of ratings per minute and per course
                            .buffer(1, TimeUnit.MINUTES)
                            .map(Flowable::fromIterable)
                            .flatMap(MathFlowable::averageDouble)
                            .map(average -> Pair.pair(byCourse.getKey(), average))))
            .to(Sink.forEachPayload(stat ->
                    System.out.println("Window rating of " + stat.left() + " : " + stat.right())));
}
/**
 * Sets up the screen: prepares the delayed "login" Observable and Flowable, creates the
 * disposable container and registers this activity as click listener on both buttons.
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    mCompositeDisposable = new CompositeDisposable();

    // Both sources fire once after TIME_DELAY ms on the main thread and map the tick to a message.
    mLoginObservable = Observable
            .timer(TIME_DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
            .map(tick -> "User id is " + UUID.randomUUID());
    mLoginFlowable = Flowable
            .timer(TIME_DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
            .map(tick -> "User id is " + UUID.randomUUID());

    findViewById(R.id.button_observable).setOnClickListener(this);
    findViewById(R.id.button_flowable).setOnClickListener(this);
}
/**
 * Demonstrates {@code compose()}: the same five integers are emitted as an Observable and as a
 * Flowable, each composed with the transformer supplied by {@code schedulers}.
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_compose_operator_example);

    // compose() keeps the reusable transformer logic out of the individual chains.
    Observable<Integer> observableNumbers = Observable.just(1, 2, 3, 4, 5);
    observableNumbers
            .compose(schedulers.<Integer>applyObservableAsync())
            .subscribe(/* */);

    Flowable<Integer> flowableNumbers = Flowable.just(1, 2, 3, 4, 5);
    flowableNumbers
            .compose(schedulers.<Integer>applyFlowableAsysnc())
            .subscribe(/* */);
}
@Test public void bodyRespectsBackpressure() { server.enqueue(new MockResponse().setBody("Hi")); RecordingSubscriber<String> subscriber = subscriberRule.createWithInitialRequest(0); Flowable<String> o = service.body(); o.subscribe(subscriber); assertThat(server.getRequestCount()).isEqualTo(1); subscriber.assertNoEvents(); subscriber.request(1); subscriber.assertAnyValue().assertComplete(); subscriber.request(Long.MAX_VALUE); // Subsequent requests do not trigger HTTP or notifications. assertThat(server.getRequestCount()).isEqualTo(1); }
/**
 * Zips a 100 ms interval with a 200 ms interval and prints the first few combined values,
 * each prefixed with a coarse timestamp (current millis / 100).
 *
 * <p>{@code interval} emits asynchronously, so a plain {@code subscribe} would let the test
 * method return before anything is printed. The stream is therefore bounded with
 * {@code take(5)} and consumed with a blocking call.
 */
@Test
public void zip() throws Exception {
    Consumer<Object> consumer = v -> System.out.println("[" + System.currentTimeMillis() / 100 + "] " + v);
    Flowable<Long> f1 = Flowable.interval(100, TimeUnit.MILLISECONDS);
    Flowable<Long> f2 = Flowable.interval(200, TimeUnit.MILLISECONDS);
    Flowable<Long> f3 = Flowable.zip(f1, f2, (x, y) -> x * 10000 + y);
    // Block the test thread until five zipped values have been consumed.
    f3.take(5).blockingForEach(consumer);
}
/**
 * Multipart POST to {@code user/regist/person}.
 *
 * @param param the multipart fields sent as a part map
 * @param front a multipart part (presumably the front-side image of a document; confirm with callers)
 * @param back  a multipart part (presumably the back-side image of a document; confirm with callers)
 * @return a stream emitting the response wrapper around the {@link User}
 */
@POST("user/regist/person")
@Multipart
Flowable<ResponseDto<User>> regPerson(
        @PartMap Map<String, RequestBody> param,
        @Part MultipartBody.Part front,
        @Part MultipartBody.Part back
);
@Override public <E> Flowable<RealmList<E>> from(Realm realm, final RealmList<E> list) { final RealmConfiguration realmConfig = realm.getConfiguration(); return Flowable.create(new FlowableOnSubscribe<RealmList<E>>() { @Override public void subscribe(final FlowableEmitter<RealmList<E>> emitter) throws Exception { // Gets instance to make sure that the Realm is open for as long as the // Observable is subscribed to it. final Realm observableRealm = Realm.getInstance(realmConfig); listRefs.get().acquireReference(list); final RealmChangeListener<RealmList<E>> listener = new RealmChangeListener<RealmList<E>>() { @Override public void onChange(RealmList<E> results) { if (!emitter.isCancelled()) { emitter.onNext(list); } } }; list.addChangeListener(listener); // Cleanup when stream is disposed emitter.setDisposable(Disposables.fromRunnable(new Runnable() { @Override public void run() { list.removeChangeListener(listener); observableRealm.close(); listRefs.get().releaseReference(list); } })); // Emit current value immediately emitter.onNext(list); } }, BACK_PRESSURE_STRATEGY); }
/**
 * Cancels listening: removes {@code flowable} from the processors registered under
 * {@code tag}, and drops the tag entry once no processors remain for it.
 *
 * @param tag      the key the flowable was registered under
 * @param flowable the flowable to remove; a {@code null} value is ignored
 * @return the bus instance obtained from {@code getInstance()}
 */
@SuppressWarnings("rawtypes")
public RxBus unregister(@NonNull Object tag, @NonNull Flowable<?> flowable) {
    if (null == flowable) {
        return getInstance();
    }
    List<FlowableProcessor> processors = mProcessorMapper.get(tag);
    if (null != processors) {
        // List.remove(Object) matches by equals(), so no downcast is required. The former
        // cast to FlowableProcessor threw ClassCastException for any other Flowable.
        processors.remove(flowable);
        if (isEmpty(processors)) {
            mProcessorMapper.remove(tag);
        }
    }
    return getInstance();
}
private void definedFlowable() { Flowable.interval(1, TimeUnit.MICROSECONDS) .onBackpressureDrop() //加上背压策略 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Long>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); mSubscription = s; s.request(Long.MAX_VALUE); } @Override public void onNext(Long aLong) { Log.d(TAG, "onNext: " + aLong); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
/**
 * Converts {@code Flowable.range(0, 10)} to an iterator and checks that it yields exactly
 * 0..9 in order.
 */
@Test
public void testFlowable() throws Exception {
    Iterator<Integer> iterator = Flowable.range(0, 10)
            .to(RxIterator.flowableIterator());
    int i = 0;
    while (iterator.hasNext()) {
        assertEquals(i++, iterator.next().intValue());
    }
    // Without this check an empty or truncated iterator would pass the loop vacuously.
    assertEquals(10, i);
}
/**
 * Calls the response-streaming endpoint and expects the stream to terminate with a
 * {@link StatusRuntimeException} whose status code is {@code INTERNAL}.
 */
@Test
public void oneToMany() {
    RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
    Flowable<HelloResponse> resp = stub.sayHelloRespStream(Single.just(HelloRequest.getDefaultInstance()));
    TestSubscriber<HelloResponse> test = resp
            .doOnNext(msg -> System.out.println(msg))
            .doOnError(throwable -> System.out.println(throwable.getMessage()))
            .doOnComplete(() -> System.out.println("Completed"))
            .doOnCancel(() -> System.out.println("Client canceled"))
            .test();

    test.awaitTerminalEvent(3, TimeUnit.SECONDS);
    test.assertError(t -> t instanceof StatusRuntimeException);
    // Compare status codes, not Status instances: a Status carrying a description or cause
    // is a different object from the Status.INTERNAL constant, so '==' is unreliable.
    test.assertError(t -> ((StatusRuntimeException) t).getStatus().getCode() == Status.Code.INTERNAL);
}
/**
 * Test double for insertion: emits the wrapped item after a one-second delay, or fails
 * immediately when {@code shouldThrowError} is set.
 *
 * @param item the item to wrap and emit
 * @return a stream with the wrapped item, or an error stream
 */
@Override
public Flowable<Optional<TestModel>> insert(TestModel item) {
    if (!shouldThrowError) {
        return Flowable.just(Optional.wrap(item)).delay(1, TimeUnit.SECONDS);
    }
    return Flowable.error(new Exception("insertSingle.error"));
}
/**
 * Registers for {@code eventName} on the bus, remembers the resulting flowable under that
 * name, and delivers events to {@code consumer} on the Android main thread.
 *
 * @param eventName the event to listen for
 * @param consumer  receives each event; stream errors only have their stack trace printed
 */
public <T> void on(String eventName, Consumer<T> consumer) {
    Flowable<T> events = mRxBus.register(eventName);
    mProcessorMap.put(eventName, events);

    Flowable<T> onMainThread = events.observeOn(AndroidSchedulers.mainThread());
    mDisposable.add(onMainThread.subscribe(consumer, error -> error.printStackTrace()));
}
/**
 * Broadcasts a list of quotes to two branches: one collects the distinct authors, the other
 * the distinct words of the quote texts. Waits until four authors have been cached.
 */
@Test
public void testComplexShaping() {
    List<Quote> quotes = new ArrayList<>();
    quotes.add(new Quote("Attitude is everything", "Diane Von Furstenberg"));
    quotes.add(new Quote("Life is short, heels shouldn't be", "Brian Atwood"));
    quotes.add(new Quote("Red is the color for fall", "Piera Gelardi"));
    quotes.add(new Quote("Rhinestones make everything better", "Piera Gelardi"));
    quotes.add(new Quote("Design is so simple, that's why it's so complicated", "Paul Rand"));

    // Shaping functions for the two branches.
    Function<Flowable<Quote>, Flowable<String>> toAuthor =
            quoteFlow -> quoteFlow.map(quote -> quote.author);
    Function<Flowable<Quote>, Flowable<String>> toWords =
            quoteFlow -> quoteFlow.concatMap(quote -> Flowable.fromArray(quote.quote.split(" ")));

    CacheSink<String> authors = new CacheSink<>();
    CacheSink<String> words = new CacheSink<>();

    List<DataStream<Quote>> branches = Source.from(quotes.stream().map(Data::new)).broadcast(2);

    DataStream<Quote> authorBranch = branches.get(0);
    authorBranch
            .transformPayloadFlow(toAuthor)
            .transformPayloadFlow(Flowable::distinct)
            .to(authors);

    DataStream<Quote> wordBranch = branches.get(1);
    wordBranch
            .transformPayloadFlow(toWords)
            .transformPayloadFlow(Flowable::distinct)
            .to(words);

    await().until(() -> authors.cache().size() == 4);
    assertThat(authors.cache()).hasSize(4);
    assertThat(words.cache()).isNotEmpty();
}
/** * Get请求的Rxjava方式. * @param url * @param requestParams * @param cacheType * @return */ public <T> Flowable<T> get(String url, EasyRequestParams requestParams, int cacheType, RxEasyConverter<T> rxEasyConverter) { final Request request = new Request.Builder().url(EasyHttpClientManager.getInstance().buildUrl(url, requestParams)).build(); // 接口没有单独设定缓存类型,使用全局缓存类型. if (cacheType == EasyCacheType.CACHE_TYPE_NO_SETTING) { cacheType = EasyHttpClientManager.getInstance().getConfig().getGlobalCacheType(); } Call call = EasyHttpClientManager.getInstance().getOkHttpClient(cacheType).newCall(request); return Flowable.create(new CallFlowableOnSubscribe(call, rxEasyConverter), BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.io()); }
private void loadStatistics() { mStatisticsView.setProgressIndicator(true); // The network request might be handled in a different thread so make sure Espresso knows // that the app is busy until the response is handled. EspressoIdlingResource.increment(); // App is busy until further notice Flowable<Task> tasks = mTasksRepository .getTasks() .flatMap(Flowable::fromIterable); Flowable<Long> completedTasks = tasks.filter(Task::isCompleted).count().toFlowable(); Flowable<Long> activeTasks = tasks.filter(Task::isActive).count().toFlowable(); Disposable disposable = Flowable .zip(completedTasks, activeTasks, (completed, active) -> Pair.create(active, completed)) .subscribeOn(mSchedulerProvider.computation()) .observeOn(mSchedulerProvider.ui()) .doFinally(() -> { if (!EspressoIdlingResource.getIdlingResource().isIdleNow()) { EspressoIdlingResource.decrement(); // Set app as idle. } }) .subscribe( // onNext stats -> mStatisticsView.showStatistics(Ints.saturatedCast(stats.first), Ints.saturatedCast(stats.second)), // onError throwable -> mStatisticsView.showLoadingStatisticsError(), // onCompleted () -> mStatisticsView.setProgressIndicator(false)); mCompositeDisposable.add(disposable); }
/**
 * Creates an interval stream for an {@link IntervalStreamId}; any other id yields an empty pair.
 *
 * @param id               the stream id to create a stream for
 * @param discoveryService not used by this factory
 * @return the data/error stream pair produced by a fresh {@link ErrorDeflector}, or
 *         {@code ErrorStreamPair.empty()} when {@code id} is not an {@link IntervalStreamId}
 */
@SuppressWarnings("unchecked")
@Override
public <T> ErrorStreamPair<T> create(StreamId<T> id, DiscoveryService discoveryService) {
    if (id instanceof IntervalStreamId) {
        IntervalStreamId intervalId = (IntervalStreamId) id;
        ErrorDeflector deflector = ErrorDeflector.create();

        // Periodic ticks, each shifted by the configured initial delay.
        Publisher<Long> ticks = Flowable
                .interval(intervalId.getPeriod(), intervalId.getPeriodTimeUnit())
                .delay(intervalId.getInitialDelay(), intervalId.getInitialDelayTimeUnit());

        return deflector.stream((Publisher<T>) ticks);
    }
    return ErrorStreamPair.empty();
}
@Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); Toolbar toolbar = (Toolbar) findViewById(R.id.toolbar); setSupportActionBar(toolbar); toolbar.setSubtitle("By soussidev"); FloatingActionButton fab = (FloatingActionButton) findViewById(R.id.fab); fab.setOnClickListener(view -> { //Set Default Text to Button btn_loading_observe.setText("Show Loading Observe"); btn_loadig_flowable.setText("Show Loading Flowable"); }); InitView(); //Init Observe for btn observe user_Observable = Observable .timer(TIME_DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()) .doOnTerminate(() -> btn_loading_observe.setText("Observing Again")) .doOnComplete(() -> AnimationView()) //if complete show animation .map(aLong -> getMessageResult()); //Call function messageresult() //Init Flowable for btn Floable user_Flowable = Flowable .timer(TIME_DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()) .doOnComplete(() -> btn_loadig_flowable.setText("Flowable Again")) .doOnComplete(() -> AnimationView()) //if complete show animation .map(aLong -> getMessageResult()); //Call function messageresult() }
/**
 * Builds a transformer that retries with the given handler, falls back to
 * {@code ServerResultErrorFunc2} on error, subscribes on the IO scheduler and observes on the
 * Android main thread.
 *
 * @param retryWhenHandler the handler passed to {@code retryWhen}
 * @param <T>              the element type of the stream
 * @return the composed transformer
 */
public static <T> FlowableTransformer<T, T> getFlowableScheduler(final Function<? super Flowable<Throwable>, ? extends Publisher<?>> retryWhenHandler) {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            Flowable<T> retried = upstream.retryWhen(retryWhenHandler);
            Flowable<T> recovered = retried.onErrorResumeNext(new ServerResultErrorFunc2<T>());
            return recovered
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
private void flowable() { Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception { Log.e(TAG, "start send data "); for (int i = 0; i < 100; i++) { e.onNext(i); } e.onComplete(); } }, BackpressureStrategy.DROP)//指定背压策略 .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new FlowableSubscriber<Integer>() { @Override public void onSubscribe(@NonNull Subscription s) { //1, onSubscribe 是2.x新添加的方法,在发射数据前被调用,相当于1.x的onStart方法 //2, 参数为 Subscription ,Subscription 可用于向上游请求发射多少个元素,也可用于取笑请求 //3, 必须要调用Subscription 的request来请求发射数据,不然上游是不会发射数据的。 Log.e(TAG, "onSubscribe..."); s.request(10); } @Override public void onNext(Integer integer) { Log.e(TAG, "onNext:" + integer); } @Override public void onError(Throwable t) { Log.e(TAG, "onError..." + t); } @Override public void onComplete() { Log.e(TAG, "onComplete..."); } }); }
/**
 * Sorting 0..9 with the {@code startWith(3, 5, 7)} comparator is expected to put 3, 5 and 7
 * first, followed by the remaining values in their original order.
 */
@Test
public void testStartWith() throws Exception {
    Flowable<Integer> firstFive = Flowable.range(0, 10)
            .sorted(startWith(3, 5, 7))
            .take(5);

    firstFive.test().assertResult(3, 5, 7, 0, 1);
}
@Override public Flowable<List<Article>> getAllArticlesByCategory() { ArticleDao dao = mDatabase.articleDao(); return dao.getAllArticlesByCategory()//query the local db for article list .map(articles -> { if(articles.size() > 0) return articles; //if the list size is > 0, return it else return fetchFromNetwork(dao); //if list is empty fetch from network }); }
/**
 * Simulates a network call: after a two-second delay, emits ten items labelled
 * "Item N" for the requested page.
 *
 * @param page the page index; item numbers run from {@code page * 10 + 1} to {@code page * 10 + 10}
 * @return a stream emitting one list with the page's items
 */
private Flowable<List<String>> dataFromNetwork(final int page) {
    return Flowable.just(true)
            .delay(2, TimeUnit.SECONDS)
            .map(new Function<Boolean, List<String>>() {
                @Override
                public List<String> apply(@NonNull Boolean ignored) throws Exception {
                    List<String> pageItems = new ArrayList<>();
                    int offset = 1;
                    while (offset <= 10) {
                        pageItems.add("Item " + (page * 10 + offset));
                        offset++;
                    }
                    return pageItems;
                }
            });
}
private Processor<V, V> attachSource(K key) { _writeLock.lock(); try { // if our source is being attached, we expect that all existing sources have been // cleaned up properly. If not, this is a serious issue assert(!_weakSources.containsKey(key)); Processor<V, V> value = BehaviorProcessor.create(); WeakReference<Flowable<V>> weakConnector = _weakCache.get(key); // if an observable is being attached then it must have been added to the weak cache // and it must still be referenced Flowable<V> connector = weakConnector.get(); // the observable must be retained by someone since it is being attached assert(connector != null); // strongly retain the observable and add the subject so future next // calls will be piped through the subject _weakSources.put(key, new WeakReference<>(value)); _cache.put(key, connector); return value; } finally { _writeLock.unlock(); } }
public static void main(String[] args) { // TODO Auto-generated method stub CompositeDisposable disposable = new CompositeDisposable(); disposable.add(Flowable.rangeLong(10, 5).subscribe(System.out::println)); disposable.add(Flowable.rangeLong(1, 5).subscribe(item -> System.out.println("two" + item))); disposable.add(Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { // TODO Auto-generated method stub try { String[] monthArray = { "Jan", "Feb", "Mar", "Apl", "May", "Jun", "July", "Aug", "Sept", "Oct", "Nov", "Dec" }; List<String> months = Arrays.asList(monthArray); for (String month : months) { emitter.onNext(month); } emitter.onComplete(); } catch (Exception e) { // TODO: handle exception emitter.onError(e); } } }).subscribe(s -> System.out.println(s))); disposable.dispose(); }
private static void checkReturnType(Method method1, Method method2) { Class<?> returnType; Type returnType1, returnType2; if (ModuleCall.class.equals(method1.getReturnType())) { // 异步回调的方法 returnType = method2.getReturnType(); if (returnType.equals(Observable.class) || returnType.equals(Single.class) || returnType.equals(Flowable.class) || returnType.equals(Maybe.class)) { returnType1 = method1.getGenericReturnType(); returnType2 = method2.getGenericReturnType(); if (returnType1 instanceof ParameterizedType && returnType2 instanceof ParameterizedType) { // 都带泛型 // 检查泛型的类型是否一样 if (!((ParameterizedType) returnType1).getActualTypeArguments()[0].equals(((ParameterizedType) returnType2).getActualTypeArguments()[0])) { throw new IllegalArgumentException(method1.getName() + "方法的返回值类型的泛型的须一样"); } } else if (!(returnType1 instanceof Class && returnType2 instanceof Class)) { throw new IllegalArgumentException(method1.getName() + "方法的返回值类型的泛型的须一样"); } } else { throw new IllegalArgumentException(String.format("%s::%s的返回值类型必须是Observable,Single,Flowable,Maybe之一", method2.getDeclaringClass().getSimpleName(), method2.getName())); } } else { if (!method1.getGenericReturnType().equals(method2.getGenericReturnType())) { //同步调用的返回值必须一样 throw new IllegalArgumentException(method1.getName() + "方法的返回值类型不一样"); } } }
/**
 * Exposes calendar change notifications as a stream. A change listener is added on
 * subscription and removed again when the stream is cancelled.
 *
 * @return a Flowable of change sets, created with {@code BackpressureStrategy.BUFFER}
 */
public Flowable<ChangeSet> observeChangesOnCalendar() {
    return Flowable.create((FlowableEmitter<ChangeSet> emitter) -> {
        // Each change received by the listener is forwarded to the emitter.
        CalendarObjectChangeListener forwarder = changes -> emitter.onNext(changes);
        addChangeListener(forwarder);
        emitter.setCancellable(() -> removeChangeListener(forwarder));
    }, BackpressureStrategy.BUFFER);
}
/**
 * Subscribes to a fresh stream hook and checks that no stream id has been emitted after
 * waiting up to one second for a terminal event.
 */
@Test
public void noStreamIdEmittedIfNoStreamIsProvided() {
    TestSubscriber<StreamId<?>> probe = new TestSubscriber<>();

    Flowable.fromPublisher(newStreamHook()).subscribe(probe);

    probe.awaitTerminalEvent(1, SECONDS);
    probe.assertNoValues();
}
/**
 * Applies all revert events from a stream of events and returns only the valid forward events.
 *
 * <p>The events are collected into a list and processed from the last element backwards.
 * A revert event removes the first remaining event whose id equals its reverted id; any
 * other event is kept as a forward event, preserving the original order.
 *
 * @param events the events, reverts included
 * @return a Flowable of forward-only events; it fails with {@code GroovesException} when a
 *         revert refers to an event that is not in the unapplied list
 */
@NotNull
@Override
public Flowable<EventT> applyReverts(@NotNull Flowable<EventT> events) {
    return events.toList().toFlowable().flatMap(pending -> {
        log.debug(" Event Ids (includes reverts that won't be applied): {}", ids(pending));

        List<EventT> forwardEvents = new ArrayList<>();
        while (!pending.isEmpty()) {
            // Always take the newest unprocessed event.
            EventT newest = pending.remove(pending.size() - 1);

            if (!(newest instanceof RevertEvent)) {
                forwardEvents.add(0, newest);
                continue;
            }

            final EventIdT revertedEventId = (EventIdT) ((RevertEvent) newest).getRevertedEventId();
            final Optional<EventT> target = pending.stream()
                    .filter(candidate -> Objects.equals(candidate.getId(), revertedEventId))
                    .findFirst();

            if (!target.isPresent()) {
                throw new GroovesException(String.format(
                        "Cannot revert event that does not exist in unapplied list - %s",
                        String.valueOf(revertedEventId)));
            }
            pending.remove(target.get());
        }

        assert forwardEvents.stream().noneMatch(it -> it instanceof RevertEvent);

        return fromIterable(forwardEvents);
    });
}
/**
 * Searches repositories through the remote search service.
 *
 * <p>The remote call is wrapped with {@code RestHelper.createRemoteSourceMapper}; the callback
 * passed to it hands the items of the page to {@code dao.addAllAsync}.
 * NOTE(review): when exactly the helper invokes the callback is not visible here.
 *
 * @param query the search query
 * @param page  the page to request
 * @return the stream produced by {@code RestHelper.createRemoteSourceMapper}
 */
public Flowable<Resource<Pageable<Repo>>> searchRemote(String query, int page) {
    return RestHelper.createRemoteSourceMapper(searchService.searchRepositories(query, page), repoPageable -> {
        dao.addAllAsync(repoPageable.getItems());
    });
}
/**
 * Convenience overload of {@code getAll(filter, sortingMode)} that uses
 * {@code SortingMode.DEFAULT}.
 *
 * @param filter the filter forwarded to the two-argument overload
 * @return whatever the two-argument overload returns for the default sorting mode
 */
public final Flowable<Optional<List<T>>> getAll(final Filter filter) {
    return getAll(filter, SortingMode.DEFAULT);
}
/**
 * Requests the Zhihu news list from {@code mZhihuService} and composes the result with
 * {@code RxTransformerUtil.normalTransformer()}.
 *
 * @return the transformed news stream
 */
@Override
public Flowable<ZhihuNewsEntity> getZhihuNewsList() {
    return mZhihuService.getZhihuNewsList()
            .compose(RxTransformerUtil.normalTransformer());
}
/**
 * Deletes the given items. This base implementation always throws.
 *
 * @param items the items to delete
 * @return the number of items deleted
 * @throws UnsupportedOperationException always, in this implementation; the message states
 *         that the method has not been implemented in the child class
 */
public Flowable<Integer> delete(final List<T> items) {
    throw new UnsupportedOperationException("This method has not been implemented in the child class");
}
/**
 * GET {@code user/{email}}.
 *
 * @param email the value substituted for the {@code email} path segment
 * @return a stream emitting the {@link User}
 */
@GET("user/{email}")
Flowable<User> getUser(@Path("email") String email);
public Flowable<T> createTask(T task) { return Flowable.interval(0, task.repeatIntervalMillis(), MILLISECONDS) // prevent configurations from running too often .onBackpressureDrop() .map(i -> task) // stop if shutting down .takeWhile(t -> !shutdownHook.isShutdown()) // stop after the first task if only running once .takeUntil(ignored -> runOnce) // remove subscriptions that are disposed or completed .doFinally(() -> ACTIVE_SUBSCRIPTIONS.remove(task)) // lifecycle management .doOnSubscribe(subscription -> Optional.ofNullable(ACTIVE_SUBSCRIPTIONS.put(task, subscription)).ifPresent(Subscription::cancel)) .doOnNext(disposableTask -> HAS_STARTED_PROCESSING.countDown()) .delay((item) -> { // get last execution time Instant lastRun = LAST_EXECUTED.computeIfAbsent(task, // or trigger a run by creating an Instant at the point in time where the configuration should have run last t -> Instant.now().minusMillis(task.repeatIntervalMillis())); // compute duration between lastRun and now; D=lastRun-lastRun Duration duration = Duration.between(Instant.now(), lastRun) // substract the repeat interval; D=repeatInterval-(lastRun-now) .plusMillis(task.repeatIntervalMillis()); // D<=0; need to run now if (duration.isNegative() || duration.isZero()) { return Flowable.just(0L); // D>0; need to run in D } else { return Flowable.timer(duration.toMillis(), TimeUnit.MILLISECONDS); } }) // mark the time at which we ran last .doOnNext(results -> LAST_EXECUTED.put(task, Instant.now())); }
/**
 * Exposes the events subject as a stream of UI models: the subject is converted to a
 * Flowable with {@code BackpressureStrategy.BUFFER} and composed with
 * {@code uiModelsTransformer(initialState)}.
 *
 * @param initialState the state handed to the UI-models transformer
 * @return a {@link Flowable} of {@link UIModel}s
 */
@NonNull
public Flowable<UIModel<S>> uiModels(S initialState) {
    return eventsSubject.toFlowable(BackpressureStrategy.BUFFER)
            .compose(uiModelsTransformer(initialState));
}
/**
 * POST {@code categories/{username}/{category}/feeds}.
 *
 * @param username the value substituted for the {@code username} path segment
 * @param category the value substituted for the {@code category} path segment
 * @param feed     the feed sent as the request body
 * @return a stream emitting a {@link Feed}
 */
@POST("categories/{username}/{category}/feeds")
Flowable<Feed> newFeed(
        @Path("username") String username,
        @Path("category") String category,
        @Body Feed feed);