Java 类io.netty.util.concurrent.EventExecutor 实例源码

项目:nettyRpc    文件:RpcClient.java   
public static void main(String[] args) {
    //创建客户端连接池
    ClientConnectionPool clientConnectionPool = createConnectionPool();
    //创建异步调用回调执行器
    EventExecutor eventExecutor = createEventExecutor();
    //创建同步客户端
    IRobotProtocol robotProtocol = RpcClient.createService("RobotService", IRobotProtocol.class, clientConnectionPool, eventExecutor, true);
    for(int i = 0; i < 10000; i++) {
        String msg = robotProtocol.sendMsg("hello world! request: " + i);
        System.out.println("get response: " + msg);
    }

    clientConnectionPool.close();
    eventExecutor.shutdownGracefully();
    //TODO 创建异步客户端,添加回调!

}
项目:hekate    文件:NettyServerClient.java   
private EventLoop mapToThread(int affinity, HandlerRegistration handler) {
    EventLoopGroup group;

    // Check if a dedicated thread pool is defined for this protocol.
    if (handler.config().getEventLoop() == null) {
        // Use core thread pool.
        group = coreEventLoopGroup;
    } else {
        // Use dedicated thread pool.
        group = handler.config().getEventLoop();
    }

    List<EventLoop> eventLoops = new ArrayList<>();

    // Assumes that the same group always returns its event loops in the same order.
    for (Iterator<EventExecutor> it = group.iterator(); it.hasNext(); ) {
        eventLoops.add((EventLoop)it.next());
    }

    return eventLoops.get(Utils.mod(affinity, eventLoops.size()));
}
项目:mpush    文件:Utils.java   
public static Map<String, Object> getPoolInfo(EventLoopGroup executors) {
    Map<String, Object> info = new HashMap<>(3);
    int poolSize = 0, queueSize = 0, activeCount = 0;
    for (EventExecutor e : executors) {
        poolSize++;
        if (e instanceof SingleThreadEventLoop) {
            SingleThreadEventLoop executor = (SingleThreadEventLoop) e;
            queueSize += executor.pendingTasks();
            ThreadProperties tp = executor.threadProperties();
            if (tp.state() == Thread.State.RUNNABLE) {
                activeCount++;
            }
        }
    }
    info.put("poolSize(workThread)", poolSize);
    info.put("activeCount(workingThread)", activeCount);
    info.put("queueSize(blockedTask)", queueSize);
    return info;
}
项目:riposte    文件:ProxyRouterEndpointExecutionHandler.java   
@Override
public void unrecoverableErrorOccurred(Throwable error, boolean guaranteesBrokenDownstreamResponse) {
    // Cancel request streaming so it stops trying to send data downstream and releases any chunks we've been
    //      holding onto. This holds true no matter the value of guaranteesBrokenDownstreamResponse
    //      (i.e. we want to stop sending data downstream no matter what). Note that this does not stop the
    //      downstream call's response, and that is intentional to support use cases where the downstream
    //      system can still successfully send a full response even though the request wasn't fully sent.
    proxyRouterProcessingState.cancelRequestStreaming(error, ctx);

    setDownstreamCallTimeOnRequestAttributesIfNotAlreadyDone();

    EventExecutor executor = ctx.executor();
    if (executor.inEventLoop()) {
        sendUnrecoverableErrorDownPipeline(error, guaranteesBrokenDownstreamResponse);
    }
    else {
        executor.execute(() -> sendUnrecoverableErrorDownPipeline(error, guaranteesBrokenDownstreamResponse));
    }
}
项目:jim    文件:AutoFlushHandler.java   
private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    switch (state) {
        case 1:
        case 2:
            return;
    }

    state = 1;

    EventExecutor loop = ctx.executor();

    lastWriteTime = System.nanoTime();
    writerIdleTimeout = loop.schedule(
            new WriterIdleTimeoutTask(ctx),
            writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
项目:drill    文件:WebSessionResourcesTest.java   
/**
 * Validates successful {@link WebSessionResources#close()} with valid CloseFuture and other parameters.
 * @throws Exception
 */
@Test
public void testChannelPromiseWithValidExecutor() throws Exception {
  try {
    EventExecutor mockExecutor = mock(EventExecutor.class);
    ChannelPromise closeFuture = new DefaultChannelPromise(null, mockExecutor);
    webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock
        (UserSession.class), closeFuture);
    webSessionResources.close();
    verify(webSessionResources.getAllocator()).close();
    verify(webSessionResources.getSession()).close();
    verify(mockExecutor).inEventLoop();
    verify(mockExecutor).execute(any(Runnable.class));
    assertTrue(webSessionResources.getCloseFuture() == null);
    assertTrue(!listenerComplete);
  } catch (Exception e) {
    fail();
  }
}
项目:drill    文件:WebSessionResourcesTest.java   
/**
 * Validates double call to {@link WebSessionResources#close()} doesn't throw any exception.
 * @throws Exception
 */
@Test
public void testDoubleClose() throws Exception {
  try {
    ChannelPromise closeFuture = new DefaultChannelPromise(null, mock(EventExecutor.class));
    webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock
        (UserSession.class), closeFuture);
    webSessionResources.close();

    verify(webSessionResources.getAllocator()).close();
    verify(webSessionResources.getSession()).close();
    assertTrue(webSessionResources.getCloseFuture() == null);

    webSessionResources.close();
  } catch (Exception e) {
    fail();
  }
}
项目:netty4.0.27Learn    文件:SslHandler.java   
/**
 * Performs TLS renegotiation.
 */
public Future<Channel> renegotiate(final Promise<Channel> promise) {
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    ChannelHandlerContext ctx = this.ctx;
    if (ctx == null) {
        throw new IllegalStateException();
    }

    EventExecutor executor = ctx.executor();
    if (!executor.inEventLoop()) {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                handshake(promise);
            }
        });
        return promise;
    }

    handshake(promise);
    return promise;
}
项目:nedis    文件:PromiseConverter.java   
public static PromiseConverter<Long> toLong(EventExecutor executor) {
    return new PromiseConverter<Long>(executor) {

        @Override
        public FutureListener<Object> newListener(final Promise<Long> promise) {
            return new FutureListener<Object>() {

                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (future.isSuccess()) {
                        Object resp = future.getNow();
                        if (resp instanceof RedisResponseException) {
                            promise.tryFailure((RedisResponseException) resp);
                        } else if (resp == RedisResponseDecoder.NULL_REPLY) {
                            promise.trySuccess(null);
                        } else {
                            promise.trySuccess((Long) resp);
                        }
                    } else {
                        promise.tryFailure(future.cause());
                    }
                }
            };
        }
    };
}
项目:netty4.0.27Learn    文件:AbstractChannelHandlerContext.java   
@Override
public ChannelHandlerContext fireChannelRegistered() {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
    return this;
}
项目:netty4.0.27Learn    文件:AbstractChannelHandlerContext.java   
@Override
public ChannelHandlerContext fireChannelUnregistered() {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelUnregistered();
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelUnregistered();
            }
        });
    }
    return this;
}
项目:netty4.0.27Learn    文件:AbstractChannelHandlerContext.java   
@Override
public ChannelHandlerContext fireChannelActive() {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
    return this;
}
项目:netty4.0.27Learn    文件:AbstractChannelHandlerContext.java   
@Override
public ChannelHandlerContext fireChannelInactive() {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelInactive();
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelInactive();
            }
        });
    }
    return this;
}
项目:netty4.0.27Learn    文件:AbstractChannelHandlerContext.java   
@Override
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
    if (event == null) {
        throw new NullPointerException("event");
    }

    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeUserEventTriggered(event);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeUserEventTriggered(event);
            }
        });
    }
    return this;
}
项目:nedis    文件:PromiseConverter.java   
public static PromiseConverter<byte[]> toBytes(EventExecutor executor) {
    return new PromiseConverter<byte[]>(executor) {

        @Override
        public FutureListener<Object> newListener(final Promise<byte[]> promise) {
            return new FutureListener<Object>() {

                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (future.isSuccess()) {
                        Object resp = future.getNow();
                        if (resp instanceof RedisResponseException) {
                            promise.tryFailure((RedisResponseException) resp);
                        } else if (resp == RedisResponseDecoder.NULL_REPLY) {
                            promise.trySuccess(null);
                        } else {
                            promise.trySuccess((byte[]) resp);
                        }
                    } else {
                        promise.tryFailure(future.cause());
                    }
                }
            };
        }
    };
}
项目:netty4.0.27Learn    文件:AbstractChannelHandlerContext.java   
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelWritabilityChanged();
    } else {
        Runnable task = next.invokeChannelWritableStateChangedTask;
        if (task == null) {
            next.invokeChannelWritableStateChangedTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelWritabilityChanged();
                }
            };
        }
        executor.execute(task);
    }
    return this;
}
项目:netty4.0.27Learn    文件:AbstractChannelHandlerContext.java   
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (!validatePromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new OneTimeTask() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }

    return promise;
}
项目:netty4.0.27Learn    文件:AbstractChannelHandlerContext.java   
@Override
public ChannelFuture close(final ChannelPromise promise) {
    if (!validatePromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeClose(promise);
    } else {
        safeExecute(executor, new OneTimeTask() {
            @Override
            public void run() {
                next.invokeClose(promise);
            }
        }, promise, null);
    }

    return promise;
}
项目:netty4.0.27Learn    文件:AbstractChannelHandlerContext.java   
@Override
public ChannelFuture deregister(final ChannelPromise promise) {
    if (!validatePromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeDeregister(promise);
    } else {
        safeExecute(executor, new OneTimeTask() {
            @Override
            public void run() {
                next.invokeDeregister(promise);
            }
        }, promise, null);
    }

    return promise;
}
项目:netty4.0.27Learn    文件:AbstractChannelHandlerContext.java   
@Override
public ChannelHandlerContext read() {
    invokedPrevRead = true;
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeRead();
    } else {
        Runnable task = next.invokeReadTask;
        if (task == null) {
            next.invokeReadTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeRead();
                }
            };
        }
        executor.execute(task);
    }
    return this;
}
项目:netty4.0.27Learn    文件:AbstractChannelHandlerContext.java   
@Override
public ChannelHandlerContext flush() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeFlush();
    } else {
        Runnable task = next.invokeFlushTask;
        if (task == null) {
            next.invokeFlushTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeFlush();
                }
            };
        }
        safeExecute(executor, task, channel.voidPromise(), null);
    }

    return this;
}
项目:netty4.0.27Learn    文件:DefaultChannelPipeline.java   
private void destroyUp(AbstractChannelHandlerContext ctx) {
    final Thread currentThread = Thread.currentThread();
    final AbstractChannelHandlerContext tail = this.tail;
    for (;;) {
        if (ctx == tail) {
            destroyDown(currentThread, tail.prev);
            break;
        }

        final EventExecutor executor = ctx.executor();
        if (!executor.inEventLoop(currentThread)) {
            final AbstractChannelHandlerContext finalCtx = ctx;
            executor.execute(new OneTimeTask() {
                @Override
                public void run() {
                    destroyUp(finalCtx);
                }
            });
            break;
        }

        ctx = ctx.next;
    }
}
项目:netty4.0.27Learn    文件:JZlibEncoder.java   
@Override
public ChannelFuture close(final ChannelPromise promise) {
    ChannelHandlerContext ctx = ctx();
    EventExecutor executor = ctx.executor();
    if (executor.inEventLoop()) {
        return finishEncode(ctx, promise);
    } else {
        final ChannelPromise p = ctx.newPromise();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                ChannelFuture f = finishEncode(ctx(), p);
                f.addListener(new ChannelPromiseNotifier(promise));
            }
        });
        return p;
    }
}
项目:netty4.0.27Learn    文件:JdkZlibEncoder.java   
@Override
public ChannelFuture close(final ChannelPromise promise) {
    ChannelHandlerContext ctx = ctx();
    EventExecutor executor = ctx.executor();
    if (executor.inEventLoop()) {
        return finishEncode(ctx, promise);
    } else {
        final ChannelPromise p = ctx.newPromise();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                ChannelFuture f = finishEncode(ctx(), p);
                f.addListener(new ChannelPromiseNotifier(promise));
            }
        });
        return p;
    }
}
项目:nedis    文件:PromiseConverter.java   
public static PromiseConverter<Double> toDouble(EventExecutor executor) {
    return new PromiseConverter<Double>(executor) {

        @Override
        public FutureListener<Object> newListener(final Promise<Double> promise) {
            return new FutureListener<Object>() {

                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (future.isSuccess()) {
                        Object resp = future.getNow();
                        if (resp instanceof RedisResponseException) {
                            promise.tryFailure((RedisResponseException) resp);
                        } else if (resp == RedisResponseDecoder.NULL_REPLY) {
                            promise.trySuccess(null);
                        } else {
                            promise.trySuccess(bytesToDouble((byte[]) resp));
                        }
                    } else {
                        promise.tryFailure(future.cause());
                    }
                }
            };
        }
    };
}
项目:nedis    文件:PromiseConverter.java   
public static PromiseConverter<Object> toObject(EventExecutor executor) {
    return new PromiseConverter<Object>(executor) {

        @Override
        public FutureListener<Object> newListener(final Promise<Object> promise) {
            return new FutureListener<Object>() {

                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (future.isSuccess()) {
                        Object resp = future.getNow();
                        if (resp instanceof RedisResponseException) {
                            promise.tryFailure((RedisResponseException) resp);
                        } else if (resp == RedisResponseDecoder.NULL_REPLY) {
                            promise.trySuccess(null);
                        } else {
                            promise.trySuccess(resp);
                        }
                    } else {
                        promise.tryFailure(future.cause());
                    }
                }
            };
        }
    };
}
项目:armeria    文件:AbstractStreamMessageDuplicator.java   
/**
 * Creates a new instance wrapping a {@code publisher} and publishing to multiple subscribers.
 * @param publisher the publisher who will publish data to subscribers
 * @param signalLengthGetter the signal length getter that produces the length of signals
 * @param executor the executor to use for upstream signals
 * @param maxSignalLength the maximum length of signals. {@code 0} disables the length limit
 */
protected AbstractStreamMessageDuplicator(
        U publisher, SignalLengthGetter<? super T> signalLengthGetter,
        @Nullable EventExecutor executor, long maxSignalLength) {
    requireNonNull(publisher, "publisher");
    requireNonNull(signalLengthGetter, "signalLengthGetter");
    checkArgument(maxSignalLength >= 0,
                  "maxSignalLength: %s (expected: >= 0)", maxSignalLength);
    if (executor != null) {
        duplicatorExecutor = executor;
    } else {
        duplicatorExecutor = RequestContext.mapCurrent(
                RequestContext::eventLoop, () -> CommonPools.workerGroup().next());
    }

    processor = new StreamMessageProcessor<>(publisher, signalLengthGetter,
                                             duplicatorExecutor, maxSignalLength);
}
项目:nedis    文件:PromiseConverter.java   
public static PromiseConverter<String> toString(EventExecutor executor) {
    return new PromiseConverter<String>(executor) {

        @Override
        public FutureListener<Object> newListener(final Promise<String> promise) {
            return new FutureListener<Object>() {

                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (future.isSuccess()) {
                        Object resp = future.getNow();
                        if (resp instanceof RedisResponseException) {
                            promise.tryFailure((RedisResponseException) resp);
                        } else if (resp == RedisResponseDecoder.NULL_REPLY) {
                            promise.trySuccess(null);
                        } else {
                            promise.trySuccess(resp.toString());
                        }
                    } else {
                        promise.tryFailure(future.cause());
                    }
                }
            };
        }
    };
}
项目:drift    文件:ThriftClientHandler.java   
void registerRequestTimeout(EventExecutor executor)
{
    try {
        timeout.set(executor.schedule(
                () -> onChannelError(new TTransportException("Timed out waiting " + requestTimeout + " to receive response")),
                requestTimeout.toMillis(),
                MILLISECONDS));
    }
    catch (Throwable throwable) {
        onChannelError(new TTransportException("Unable to schedule request timeout", throwable));
        throw throwable;
    }
}
项目:centraldogma    文件:Polyfills.java   
public static Future<?> setTimeout(EventExecutor executor, Runnable task, long delay) {
    if (delay <= 0) {
        return executor.submit(task);
    }

    return executor.schedule(task, delay, TimeUnit.MILLISECONDS);
}
项目:nettyRpc    文件:BenchmarkClient.java   
public BenchmarkClient(int threadCount, int requestCount){
    this.threadCount = threadCount;
    this.requestCount = requestCount;
    this.stop = new CountDownLatch(threadCount);
    //创建客户端连接池
    ClientConnectionPool clientConnectionPool = RpcClient.createConnectionPool();
    //创建异步调用回调执行器
    EventExecutor eventExecutor = RpcClient.createEventExecutor();
    //创建同步客户端
    robotProtocol = RpcClient.createService("RobotService", IRobotProtocol.class, clientConnectionPool, eventExecutor, true);
}
项目:hashsdn-controller    文件:AutoCloseableEventExecutor.java   
public static AutoCloseableEventExecutor forwardingEventExecutor(final EventExecutor eventExecutor,
        final AutoCloseable closeable) {
    return createCloseableProxy(new CloseableEventExecutorMixin(eventExecutor) {
        @Override
        public void close() throws Exception {
            // Intentional no-op.
            closeable.close();
        }
    });
}
项目:hashsdn-controller    文件:GlobalEventExecutorModule.java   
@Override
public AutoCloseable createInstance() {
    // The service is provided via blueprint so wait for and return it here for backwards compatibility.
    final WaitingServiceTracker<EventExecutor> tracker = WaitingServiceTracker.create(
            EventExecutor.class, bundleContext, "(type=global-event-executor)");
    EventExecutor eventExecutor = tracker.waitForService(WaitingServiceTracker.FIVE_MINUTES);
    return CloseableEventExecutorMixin.forwardingEventExecutor(eventExecutor, tracker);
}
项目:hashsdn-controller    文件:GlobalEventExecutorModuleTest.java   
@SuppressWarnings({ "rawtypes", "unchecked" })
@Before
public void setUp() throws Exception {
    factory = new GlobalEventExecutorModuleFactory();
    super.initConfigTransactionManagerImpl(new HardcodedModuleFactoriesResolver(mockedContext,factory));

    Filter mockFilter = mock(Filter.class);
    doReturn("mock").when(mockFilter).toString();
    doReturn(mockFilter).when(mockedContext).createFilter(anyString());
    doNothing().when(mockedContext).addServiceListener(any(ServiceListener.class), anyString());
    ServiceReference mockServiceRef = mock(ServiceReference.class);
    doReturn(new ServiceReference[]{mockServiceRef}).when(mockedContext).
            getServiceReferences(anyString(), anyString());
    doReturn(mock(EventExecutor.class)).when(mockedContext).getService(mockServiceRef);
}
项目:riposte    文件:NonblockingEndpointExecutionHandler.java   
protected void asyncCallback(ChannelHandlerContext ctx, ResponseInfo<?> responseInfo) {
    HttpProcessingState state = ChannelAttributes.getHttpProcessingStateForChannel(ctx).get();

    if (responseInfo.isChunkedResponse()) {
        // Whoops, chunked responses are not allowed for this endpoint type.
        asyncErrorCallback(
            ctx,
            new Exception("NonblockingEndpoint execution resulted in a chunked ResponseInfo, when only full "
                          + "ResponseInfos are allowed. offending_endpoint_class=" +
                          state.getEndpointForExecution().getClass().getName())
        );
    }
    else {
        executeOnlyIfChannelIsActive(
            ctx, "NonblockingEndpointExecutionHandler-asyncCallback",
            () -> {
                // We have to set the ResponseInfo on the state and fire the event while in the
                //      channel's EventLoop. Otherwise there could be a race condition with an error
                //      that was fired down the pipe that sets the ResponseInfo on the state first, then
                //      this comes along and replaces the ResponseInfo (or vice versa).
                EventExecutor executor = ctx.executor();
                if (executor.inEventLoop()) {
                    setResponseInfoAndActivatePipelineForResponse(state, responseInfo, ctx);
                }
                else {
                    executor.execute(() -> setResponseInfoAndActivatePipelineForResponse(state, responseInfo, ctx));
                }
            }
        );
    }
}
项目:riposte    文件:NonblockingEndpointExecutionHandlerTest.java   
@Before
public void beforeMethod() {
    stateMock = mock(HttpProcessingState.class);
    proxyRouterStateMock = mock(ProxyRouterProcessingState.class);
    ctxMock = mock(ChannelHandlerContext.class);
    channelMock = mock(Channel.class);
    stateAttrMock = mock(Attribute.class);
    proxyRouterStateAttrMock = mock(Attribute.class);
    requestInfo = RequestInfoImpl.dummyInstanceForUnknownRequests();
    endpointMock = mock(StandardEndpoint.class);
    longRunningTaskExecutorMock = mock(Executor.class);
    responseFuture = new CompletableFuture<>();
    stateWorkChainFutureSpy = spy(CompletableFuture.completedFuture(null));
    eventLoopMock = mock(EventLoop.class);
    eventExecutorMock = mock(EventExecutor.class);

    doReturn(channelMock).when(ctxMock).channel();
    doReturn(stateAttrMock).when(channelMock).attr(ChannelAttributes.HTTP_PROCESSING_STATE_ATTRIBUTE_KEY);
    doReturn(stateMock).when(stateAttrMock).get();
    doReturn(false).when(stateMock).isRequestHandled();
    doReturn(proxyRouterStateAttrMock).when(channelMock).attr(ChannelAttributes.PROXY_ROUTER_PROCESSING_STATE_ATTRIBUTE_KEY);
    doReturn(proxyRouterStateMock).when(proxyRouterStateAttrMock).get();
    doReturn(endpointMock).when(stateMock).getEndpointForExecution();
    doReturn(requestInfo).when(stateMock).getRequestInfo();
    doReturn(responseFuture).when(endpointMock).execute(any(RequestInfo.class), any(Executor.class), any(ChannelHandlerContext.class));
    doReturn(eventLoopMock).when(channelMock).eventLoop();
    doReturn(eventExecutorMock).when(ctxMock).executor();
    doReturn(true).when(eventExecutorMock).inEventLoop();
    doReturn(true).when(channelMock).isActive();
    doAnswer(invocation -> {
        CompletableFuture actualFutureForAttaching = (CompletableFuture) invocation.callRealMethod();
        futureThatWillBeAttachedToSpy = spy(actualFutureForAttaching);
        return futureThatWillBeAttachedToSpy;
    }).when(stateWorkChainFutureSpy).thenCompose(any(Function.class));
    doReturn(stateWorkChainFutureSpy).when(stateMock).getPreEndpointExecutionWorkChain();

    handlerSpy = spy(new NonblockingEndpointExecutionHandler(longRunningTaskExecutorMock, defaultCompletableFutureTimeoutMillis));
}
项目:megaphone    文件:NettyReactiveStreamsBody.java   
@Override
protected void complete() {
    EventExecutor executor = channel.eventLoop();
    executor.execute(new Runnable() {
        @Override
        public void run() {
            channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    removeFromPipeline();
                }
            });
        }
    });
}
项目:reactor-netty    文件:ColocatedEventLoopGroup.java   
ColocatedEventLoopGroup(EventLoopGroup eventLoopGroup) {
    this.eventLoopGroup = eventLoopGroup;
    for (EventExecutor ex : eventLoopGroup) {
        if (ex instanceof EventLoop) {
            ex.submit(() -> {
                if (!localLoop.isSet()) {
                    localLoop.set((EventLoop) ex);
                }
            });
        }
    }
}
项目:netty4.0.27Learn    文件:IdleStateHandler.java   
private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    switch (state) {
    case 1:
    case 2:
        return;
    }

    state = 1;

    EventExecutor loop = ctx.executor();

    lastReadTime = lastWriteTime = System.nanoTime();
    if (readerIdleTimeNanos > 0) {
        readerIdleTimeout = loop.schedule(
                new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = loop.schedule(
                new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = loop.schedule(
                new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}
项目:netty4.0.27Learn    文件:SslHandler.java   
@Override
protected EventExecutor executor() {
    if (ctx == null) {
        throw new IllegalStateException();
    }
    return ctx.executor();
}