Java 类io.reactivex.FlowableOperator 实例源码

项目:Learning-RxJava    文件:Ch9_9.java   
/**
 * Creates an operator that runs {@code action} when the upstream completes without
 * having emitted any item; every other signal is relayed to the downstream unchanged.
 *
 * <p>If {@code action} throws an {@link Exception}, that exception is delivered to the
 * downstream through {@code onError} and {@code onComplete} is not signalled.
 *
 * @param action callback to run when the sequence completes empty
 * @param <T>    element type flowing through the operator
 * @return an operator that can be passed to {@code Flowable.lift}
 */
public static <T> FlowableOperator<T, T> doOnEmpty(Action action) {
    return new FlowableOperator<T, T>() {
        @Override
        public Subscriber<? super T> apply(Subscriber<? super T> subscriber) throws Exception {
            return new Subscriber<T>() {
                // Flipped to false by the first onNext; read once in onComplete.
                boolean isEmpty = true;

                @Override
                public void onSubscribe(org.reactivestreams.Subscription upstream) {
                    // Hand the upstream Subscription straight to the downstream so it
                    // receives onSubscribe before any other signal and its request()/
                    // cancel() calls reach the source. (Fully qualified because the
                    // enclosing file's imports are not known to include Subscription.)
                    subscriber.onSubscribe(upstream);
                }

                @Override
                public void onNext(T value) {
                    isEmpty = false;
                    subscriber.onNext(value);
                }

                @Override
                public void onError(Throwable t) {
                    subscriber.onError(t);
                }

                @Override
                public void onComplete() {
                    if (isEmpty) {
                        try {
                            action.run();
                        } catch (Exception e) {
                            // A failing callback replaces the completion with an error;
                            // returning here keeps the downstream at one terminal signal.
                            subscriber.onError(e);
                            return;
                        }
                    }
                    subscriber.onComplete();
                }
            };
        }
    };
}
项目:Java-EX    文件:RandomOperator.java   
/**
 * Builds the operator using {@code DEFAULT_CACHE} as the cache size.
 *
 * @param <T> element type flowing through the operator
 * @return the result of {@code flowable(DEFAULT_CACHE)}
 */
public static <T> FlowableOperator<T, T> flowable() {
  final FlowableOperator<T, T> withDefaultCache = flowable(DEFAULT_CACHE);
  return withDefaultCache;
}
项目:Java-EX    文件:RandomOperator.java   
/**
 * Builds an operator that wraps each downstream subscriber in a new
 * {@code RandomSubscriber} constructed with that subscriber and {@code cacheSize}.
 *
 * @param cacheSize value passed through to the {@code RandomSubscriber} constructor
 *                  (presumably the size of its internal buffer; confirm against
 *                  {@code RandomSubscriber})
 * @param <T>       element type flowing through the operator
 * @return the wrapping operator
 */
public static <T> FlowableOperator<T, T> flowable(int cacheSize) {
  return downstream -> {
    return new RandomSubscriber<>(downstream, cacheSize);
  };
}
项目:Java-EX    文件:RandomOperator.java   
/**
 * Builds the operator using {@code DEFAULT_CACHE} as the cache size.
 *
 * @param <T> element type flowing through the operator
 * @return the result of {@code flowable(DEFAULT_CACHE)}
 */
public static <T> FlowableOperator<T, T> flowable() {
  final FlowableOperator<T, T> withDefaultCache = flowable(DEFAULT_CACHE);
  return withDefaultCache;
}
项目:Java-EX    文件:RandomOperator.java   
/**
 * Builds an operator that wraps each downstream subscriber in a new
 * {@code RandomSubscriber} constructed with that subscriber and {@code cacheSize}.
 *
 * @param cacheSize value passed through to the {@code RandomSubscriber} constructor
 *                  (presumably the size of its internal buffer; confirm against
 *                  {@code RandomSubscriber})
 * @param <T>       element type flowing through the operator
 * @return the wrapping operator
 */
public static <T> FlowableOperator<T, T> flowable(int cacheSize) {
  return downstream -> {
    return new RandomSubscriber<>(downstream, cacheSize);
  };
}