@SuppressWarnings({"unchecked", "rawtypes"}) @Override public ProgressivePromise<T> addListeners( GenericFutureListener<? extends Future<? super T>>... listeners) { return delegate.addListeners(Stream.of(listeners) .map(context::makeContextAware) .toArray(GenericFutureListener[]::new)); }
/** * Create a new {@link ProgressivePromise}. */ public <V> ProgressivePromise<V> newProgressivePromise() { return next().newProgressivePromise(); }
@Override public <V> ProgressivePromise<V> newProgressivePromise() { return new RequestContextAwareProgressivePromise<>(context(), delegate().newProgressivePromise()); }
RequestContextAwareProgressivePromise(RequestContext context, ProgressivePromise<T> delegate) { this.context = context; this.delegate = delegate; }
@Override public ProgressivePromise<T> setProgress(long progress, long total) { return delegate.setProgress(progress, total); }
@Override public ProgressivePromise<T> setSuccess(T result) { return delegate.setSuccess(result); }
@Override public ProgressivePromise<T> setFailure(Throwable cause) { return delegate.setFailure(cause); }
@Override public ProgressivePromise<T> addListener( GenericFutureListener<? extends Future<? super T>> listener) { return delegate.addListener(context.makeContextAware(listener)); }
@Override public ProgressivePromise<T> removeListener( GenericFutureListener<? extends Future<? super T>> listener) { return delegate.removeListener(listener); }
@Override @SafeVarargs public final ProgressivePromise<T> removeListeners( GenericFutureListener<? extends Future<? super T>>... listeners) { return delegate.removeListeners(listeners); }
@Override public ProgressivePromise<T> sync() throws InterruptedException { return delegate.sync(); }
@Override public ProgressivePromise<T> syncUninterruptibly() { return delegate.syncUninterruptibly(); }
@Override public ProgressivePromise<T> await() throws InterruptedException { return delegate.await(); }
@Override public ProgressivePromise<T> awaitUninterruptibly() { return delegate.awaitUninterruptibly(); }
@Test public void contextAwareEventExecutor() throws Exception { when(channel.eventLoop()).thenReturn(eventLoop); RequestContext context = createContext(); Set<Integer> callbacksCalled = Collections.newSetFromMap(new ConcurrentHashMap<>()); EventExecutor executor = context.contextAwareEventLoop(); CountDownLatch latch = new CountDownLatch(18); executor.execute(() -> checkCallback(1, context, callbacksCalled, latch)); executor.schedule(() -> checkCallback(2, context, callbacksCalled, latch), 0, TimeUnit.SECONDS); executor.schedule(() -> { checkCallback(2, context, callbacksCalled, latch); return "success"; }, 0, TimeUnit.SECONDS); executor.scheduleAtFixedRate(() -> checkCallback(3, context, callbacksCalled, latch), 0, 1000, TimeUnit.SECONDS); executor.scheduleWithFixedDelay(() -> checkCallback(4, context, callbacksCalled, latch), 0, 1000, TimeUnit.SECONDS); executor.submit(() -> checkCallback(5, context, callbacksCalled, latch)); executor.submit(() -> checkCallback(6, context, callbacksCalled, latch), "success"); executor.submit(() -> { checkCallback(7, context, callbacksCalled, latch); return "success"; }); executor.invokeAll(makeTaskList(8, 10, context, callbacksCalled, latch)); executor.invokeAll(makeTaskList(11, 12, context, callbacksCalled, latch), 10000, TimeUnit.SECONDS); executor.invokeAny(makeTaskList(13, 13, context, callbacksCalled, latch)); executor.invokeAny(makeTaskList(14, 14, context, callbacksCalled, latch), 10000, TimeUnit.SECONDS); Promise<String> promise = executor.newPromise(); promise.addListener(f -> checkCallback(15, context, callbacksCalled, latch)); promise.setSuccess("success"); executor.newSucceededFuture("success") .addListener(f -> checkCallback(16, context, callbacksCalled, latch)); executor.newFailedFuture(new IllegalArgumentException()) .addListener(f -> checkCallback(17, context, callbacksCalled, latch)); ProgressivePromise<String> progressivePromise = executor.newProgressivePromise(); progressivePromise.addListener(f -> checkCallback(18, context, callbacksCalled, latch)); progressivePromise.setSuccess("success"); latch.await(); eventLoop.shutdownGracefully().sync(); assertThat(callbacksCalled).containsExactlyElementsOf(IntStream.rangeClosed(1, 18).boxed()::iterator); }