public static void main(String[] args) { MathFlowable.averageDouble(Flowable.range(10, 9)).subscribe(new FlowableSubscriber() { @Override public void onComplete() { // TODO Auto-generated method stub System.out.println("completed successfully"); } @Override public void onError(Throwable arg0) { // TODO Auto-generated method stub } @Override public void onNext(Object value) { // TODO Auto-generated method stub System.out.println("average:-" + value); } @Override public void onSubscribe(Subscription subscription) { // TODO Auto-generated method stub subscription.request(1); } }); }
private void flowable() { Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception { Log.e(TAG, "start send data "); for (int i = 0; i < 100; i++) { e.onNext(i); } e.onComplete(); } }, BackpressureStrategy.DROP)//指定背压策略 .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new FlowableSubscriber<Integer>() { @Override public void onSubscribe(@NonNull Subscription s) { //1, onSubscribe 是2.x新添加的方法,在发射数据前被调用,相当于1.x的onStart方法 //2, 参数为 Subscription ,Subscription 可用于向上游请求发射多少个元素,也可用于取笑请求 //3, 必须要调用Subscription 的request来请求发射数据,不然上游是不会发射数据的。 Log.e(TAG, "onSubscribe..."); s.request(10); } @Override public void onNext(Integer integer) { Log.e(TAG, "onNext:" + integer); } @Override public void onError(Throwable t) { Log.e(TAG, "onError..." + t); } @Override public void onComplete() { Log.e(TAG, "onComplete..."); } }); }
/** * @param flowable * @param doWithRegistrySpec * @param <T> * @return * @see RxRatpack#forkEach(Observable, Action) */ public static <T> Flowable<T> forkEach(Flowable<T> flowable, Action<? super RegistrySpec> doWithRegistrySpec) { return flowable.lift(downstream -> new FlowableSubscriber<T>() { private final AtomicInteger wip = new AtomicInteger(1); private final AtomicBoolean closed = new AtomicBoolean(); private Subscription subscription; @Override public void onSubscribe(Subscription s) { this.subscription = s; s.request(1); downstream.onSubscribe(s); } @Override public void onComplete() { maybeDone(); } @Override public void onError(final Throwable e) { terminate(() -> downstream.onError(e)); } private void maybeDone() { if (wip.decrementAndGet() == 0) { terminate(downstream::onComplete); } } private void terminate(Runnable runnable) { if (closed.compareAndSet(false, true)) { subscription.cancel(); runnable.run(); } } @Override public void onNext(final T t) { // Avoid the overhead of creating executions if downstream is no longer interested if (closed.get()) { return; } wip.incrementAndGet(); Execution.fork() .register(doWithRegistrySpec) .onComplete(e -> this.maybeDone()) .onError(this::onError) .start(e -> { if (!closed.get()) { subscription.request(1); downstream.onNext(t); } }); } }); }
@OnClick(R.id.go) public void go() { Editable url = urlView.getText(); if (TextUtils.isEmpty(url)) { Toast.makeText(this, "url is empty", Toast.LENGTH_SHORT); return; } RxEasyHttp.get(url.toString(), new RxEasyStringConverter()) .doOnSubscribe(new Consumer<Subscription>() { @Override public void accept(@NonNull Subscription subscription) throws Exception { dialog.show(); body.setText(""); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new FlowableSubscriber<String>() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); dialog.show(); body.setText(""); } @Override public void onNext(String response) { body.setText(response); } @Override public void onError(Throwable t) { body.setText(t.toString()); } @Override public void onComplete() { dialog.cancel(); } }); }
@OnClick(R.id.submit) public void submit() { Editable content = comment.getText(); if (TextUtils.isEmpty(content)) { Toast.makeText(this, "comment is empty", Toast.LENGTH_SHORT); return; } EasyRequestParams params = new EasyRequestParams(); params.put("content", content.toString()); String url = "http://book.km.com/app/index.php?c=version&a=feedback"; RxEasyHttp.post(url, params, new RxEasyCustomConverter<PostEntity>() { @Override public void doNothing() { // 防止范型类型擦除引起范型类型不能正确获取问题. } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new FlowableSubscriber<PostEntity>() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); dialog.show(); } @Override public void onNext(PostEntity entity) { Toast.makeText(RxPostActivity.this, "提交成功", Toast.LENGTH_LONG).show(); result.setText("status : " + entity.getStatus() + "\n" + "message : " + entity.getMessage()); } @Override public void onError(Throwable t) { Toast.makeText(RxPostActivity.this, "提交失败", Toast.LENGTH_LONG).show(); result.setText(t.getMessage()); dialog.cancel(); } @Override public void onComplete() { dialog.cancel(); } }); }