/**
 * Creates an operator that runs {@code action} when the upstream completes without
 * having emitted any item.
 *
 * <p>Items and errors are passed through to the downstream unchanged. If
 * {@code action} throws, the exception is delivered to the downstream through
 * {@code onError} and {@code onComplete} is not signalled.
 *
 * <p>The downstream receives a {@code Subscription} whose requests are forwarded to
 * the upstream and whose cancellation disposes the upstream subscription.
 *
 * @param action the callback to run when the sequence completes empty
 * @param <T> the element type
 * @return the operator
 */
public static <T> FlowableOperator<T, T> doOnEmpty(Action action) {
    return downstream -> new DisposableSubscriber<T>() {
        /** Stays true until the first item has been observed. */
        boolean isEmpty = true;

        @Override
        protected void onStart() {
            // Deliberately does not call super.onStart(): that would request
            // Long.MAX_VALUE from upstream regardless of downstream demand.
            // Instead, give the downstream its mandatory onSubscribe signal with
            // a Subscription that relays demand and cancellation to upstream.
            downstream.onSubscribe(new org.reactivestreams.Subscription() {
                @Override
                public void request(long n) {
                    requestUpstream(n);
                }

                @Override
                public void cancel() {
                    dispose();
                }
            });
        }

        /**
         * Bridges to the protected {@code request(long)} of the base class; a distinct
         * name is needed because the nested Subscription declares its own
         * {@code request(long)}.
         */
        void requestUpstream(long n) {
            request(n);
        }

        @Override
        public void onNext(T value) {
            isEmpty = false;
            downstream.onNext(value);
        }

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            if (isEmpty) {
                try {
                    action.run();
                } catch (Exception e) {
                    // A failing callback replaces the completion signal with an error.
                    onError(e);
                    return;
                }
            }
            downstream.onComplete();
        }
    };
}
/**
 * Creates the operator returned by {@link #flowable(int)} using {@code DEFAULT_CACHE}
 * as the cache size.
 *
 * @param <T> the element type
 * @return the operator configured with the default cache size
 */
public static <T> FlowableOperator<T, T> flowable() {
    final int defaultCacheSize = DEFAULT_CACHE;
    return flowable(defaultCacheSize);
}
/**
 * Creates an operator that wraps each downstream subscriber in a
 * {@code RandomSubscriber} constructed with the given cache size.
 *
 * @param cacheSize the cache size handed to every {@code RandomSubscriber} created
 * @param <T> the element type
 * @return the operator
 */
public static <T> FlowableOperator<T, T> flowable(int cacheSize) {
    return downstream -> {
        return new RandomSubscriber<>(downstream, cacheSize);
    };
}