/**
 * Provides the transformation that applies the configured backpressure
 * handling to a stream of tweets.
 *
 * <ul>
 *   <li>{@code BUFFER} - buffers up to 50 tweets, logs when the buffer
 *       overflows and uses {@link BackpressureOverflowStrategy#DROP_OLDEST}.</li>
 *   <li>{@code DROP} - applies {@code onBackpressureDrop()}.</li>
 *   <li>{@code LATEST} - applies {@code onBackpressureLatest()}.</li>
 *   <li>anything else - returns the stream unchanged.</li>
 * </ul>
 *
 * @return a function mapping the source flowable to its backpressure-aware variant
 */
@ActivityScope
@Provides
public Function<Flowable<Tweet>, Flowable<Tweet>> provideBackPressureStrategy() {
    // Each branch returns immediately; no intermediate holder variable is needed.
    switch (backPressureStrategy) {
        case BUFFER:
            return tweets -> tweets.onBackpressureBuffer(
                    50,
                    () -> Log.d("", "Buffer full, dropping tweets"),
                    BackpressureOverflowStrategy.DROP_OLDEST);
        case DROP:
            return tweets -> tweets.onBackpressureDrop();
        case LATEST:
            return tweets -> tweets.onBackpressureLatest();
        default:
            // Unknown or "no strategy" values leave the stream untouched.
            return tweets -> tweets;
    }
}
/**
 * Applies the given backpressure strategy to {@code source}.
 *
 * <p>The strategy type is checked in a fixed order: latest, drop, buffer,
 * none. A {@code null} strategy and a {@code BackpressureNoneStrategy} both
 * return {@code source} unchanged.
 *
 * @param source               the stream to decorate
 * @param backpressureStrategy the strategy to apply, may be {@code null}
 * @param <T>                  the element type of the stream
 * @return the stream with the matching backpressure operator applied
 * @throws IllegalArgumentException if the strategy, or the overflow mode of a
 *                                  buffer strategy, is not recognised
 */
private <T> Flowable<T> applyBackpressureStrategy(Flowable<T> source, BackpressureStrategy backpressureStrategy) {
    if (backpressureStrategy == null) {
        return source;
    } else if (backpressureStrategy instanceof BackpressureLatestStrategy) {
        return source.onBackpressureLatest();
    } else if (backpressureStrategy instanceof BackpressureDropStrategy) {
        // Dropped items are deliberately ignored.
        return source.onBackpressureDrop(dropped -> {});
    } else if (backpressureStrategy instanceof BackpressureBufferStrategy) {
        return applyBufferOverflowStrategy(source, (BackpressureBufferStrategy) backpressureStrategy);
    } else if (backpressureStrategy instanceof BackpressureNoneStrategy) {
        return source;
    } else {
        throw new IllegalArgumentException("Cannot determine the specified backpressure strategy: " + backpressureStrategy);
    }
}

/** Applies a bounded buffer to {@code source} using the overflow mode of {@code bufferStrategy}. */
private <T> Flowable<T> applyBufferOverflowStrategy(Flowable<T> source, BackpressureBufferStrategy bufferStrategy) {
    if (bufferStrategy.overflowStrategy() == BackpressureBufferStrategy.BackpressureBufferOverflowStrategy.DROP_LATEST) {
        return source.onBackpressureBuffer(bufferStrategy.bufferSize(), NOOP, BackpressureOverflowStrategy.DROP_LATEST);
    }
    if (bufferStrategy.overflowStrategy() == BackpressureBufferStrategy.BackpressureBufferOverflowStrategy.DROP_OLDEST) {
        return source.onBackpressureBuffer(bufferStrategy.bufferSize(), NOOP, BackpressureOverflowStrategy.DROP_OLDEST);
    }
    throw new IllegalArgumentException("Cannot determine the specified buffer overflow strategy: " + bufferStrategy);
}
/**
 * Attaches the backpressure operator that corresponds to the configured
 * overflow strategy.
 *
 * <p>{@code Strategy.DropNew} maps to {@code onBackpressureDrop}; every other
 * strategy is translated through {@link RxOverflowStrategyBridge#toRxStrategy}
 * and applied as a buffer of {@code pendingMaxMessages} entries. In both cases
 * {@code metricsCallback(1)} is registered as the drop/overflow callback.
 *
 * @param flowable the stream of sending tasks to protect
 * @return the stream with the backpressure operator applied
 */
private Flowable<SendingTask<M>> initBackpressurePolicy(Flowable<SendingTask<M>> flowable) {
    final Strategy configured = this.overflowStrategy;

    if (configured == Strategy.DropNew) {
        return flowable.onBackpressureDrop(droppedTask -> metricsCallback(1));
    }

    // Translate first so an unsupported strategy fails before the operator is built.
    final BackpressureOverflowStrategy rxStrategy = RxOverflowStrategyBridge.toRxStrategy(configured);
    return flowable.onBackpressureBuffer(pendingMaxMessages, () -> metricsCallback(1), rxStrategy);
}
/**
 * Demonstrates a bounded backpressure buffer: a source ticking every
 * millisecond feeds a buffer of 10 items configured with
 * {@link BackpressureOverflowStrategy#DROP_LATEST}, while a consumer on the
 * io scheduler calls {@code sleep(5)} before printing each item. Prints
 * "overflow!" whenever the overflow callback fires.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Flowable<Long> ticks = Flowable.interval(1, TimeUnit.MILLISECONDS);

    Flowable<Long> buffered = ticks.onBackpressureBuffer(
            10,
            () -> System.out.println("overflow!"),
            BackpressureOverflowStrategy.DROP_LATEST);

    buffered
            .observeOn(Schedulers.io())
            .subscribe(tick -> {
                sleep(5);
                System.out.println(tick);
            });

    // Keep the main thread around while the pipeline runs on other threads.
    sleep(5000);
}
/**
 * Converts an {@link OverflowStrategy.Strategy} to RxJava's
 * {@link BackpressureOverflowStrategy}.
 *
 * <p>Mapping: {@code Fail} to {@code ERROR}, {@code DropTail} to
 * {@code DROP_LATEST}, {@code DropHead} to {@code DROP_OLDEST}.
 *
 * @param strategy the strategy to translate
 * @return the equivalent RxJava overflow strategy
 * @throws UnsupportedOperationException if {@code strategy} is any other value
 */
static BackpressureOverflowStrategy toRxStrategy(OverflowStrategy.Strategy strategy) {
    final BackpressureOverflowStrategy rxStrategy;
    switch (strategy) {
        case Fail:
            rxStrategy = BackpressureOverflowStrategy.ERROR;
            break;
        case DropTail:
            rxStrategy = BackpressureOverflowStrategy.DROP_LATEST;
            break;
        case DropHead:
            rxStrategy = BackpressureOverflowStrategy.DROP_OLDEST;
            break;
        default:
            throw new UnsupportedOperationException(strategy + " not supported using rx-java.");
    }
    return rxStrategy;
}
/**
 * Pairs the given data publisher with this object's error stream.
 *
 * <p>The error stream is serialized and given a backpressure buffer of
 * {@link Flowable#bufferSize()} entries configured with
 * {@link BackpressureOverflowStrategy#DROP_OLDEST}; an error is logged each
 * time the overflow callback fires.
 *
 * @param dataPublisher the publisher supplying the data side of the pair
 * @param <T>           the data element type
 * @return the data/error stream pair
 */
public <T> ErrorStreamPair<T> stream(Publisher<T> dataPublisher) {
    final int errorBufferCapacity = Flowable.bufferSize();
    return ErrorStreamPair.ofDataError(
            dataPublisher,
            errorStream
                    .toSerialized()
                    .onBackpressureBuffer(
                            errorBufferCapacity,
                            () -> LOGGER.error("Discarding exception due to backpressure buffer limit"),
                            BackpressureOverflowStrategy.DROP_OLDEST));
}