public static void main(String[] args) { //创建客户端连接池 ClientConnectionPool clientConnectionPool = createConnectionPool(); //创建异步调用回调执行器 EventExecutor eventExecutor = createEventExecutor(); //创建同步客户端 IRobotProtocol robotProtocol = RpcClient.createService("RobotService", IRobotProtocol.class, clientConnectionPool, eventExecutor, true); for(int i = 0; i < 10000; i++) { String msg = robotProtocol.sendMsg("hello world! request: " + i); System.out.println("get response: " + msg); } clientConnectionPool.close(); eventExecutor.shutdownGracefully(); //TODO 创建异步客户端,添加回调! }
private EventLoop mapToThread(int affinity, HandlerRegistration handler) { EventLoopGroup group; // Check if a dedicated thread pool is defined for this protocol. if (handler.config().getEventLoop() == null) { // Use core thread pool. group = coreEventLoopGroup; } else { // Use dedicated thread pool. group = handler.config().getEventLoop(); } List<EventLoop> eventLoops = new ArrayList<>(); // Assumes that the same group always returns its event loops in the same order. for (Iterator<EventExecutor> it = group.iterator(); it.hasNext(); ) { eventLoops.add((EventLoop)it.next()); } return eventLoops.get(Utils.mod(affinity, eventLoops.size())); }
/**
 * Collects simple statistics about an event loop group.
 *
 * <p>Every executor in the group counts toward the pool size. Only executors that are
 * {@code SingleThreadEventLoop} instances contribute their pending task count and are checked
 * for a {@code RUNNABLE} thread state.
 *
 * @param executors the group to inspect
 * @return a map with the keys {@code poolSize(workThread)}, {@code activeCount(workingThread)}
 *         and {@code queueSize(blockedTask)}
 */
public static Map<String, Object> getPoolInfo(EventLoopGroup executors) {
    int poolSize = 0;
    int queueSize = 0;
    int activeCount = 0;

    for (EventExecutor member : executors) {
        poolSize++;
        if (!(member instanceof SingleThreadEventLoop)) {
            continue;
        }
        SingleThreadEventLoop loop = (SingleThreadEventLoop) member;
        queueSize += loop.pendingTasks();
        if (loop.threadProperties().state() == Thread.State.RUNNABLE) {
            activeCount++;
        }
    }

    Map<String, Object> info = new HashMap<>(3);
    info.put("poolSize(workThread)", poolSize);
    info.put("activeCount(workingThread)", activeCount);
    info.put("queueSize(blockedTask)", queueSize);
    return info;
}
@Override public void unrecoverableErrorOccurred(Throwable error, boolean guaranteesBrokenDownstreamResponse) { // Cancel request streaming so it stops trying to send data downstream and releases any chunks we've been // holding onto. This holds true no matter the value of guaranteesBrokenDownstreamResponse // (i.e. we want to stop sending data downstream no matter what). Note that this does not stop the // downstream call's response, and that is intentional to support use cases where the downstream // system can still successfully send a full response even though the request wasn't fully sent. proxyRouterProcessingState.cancelRequestStreaming(error, ctx); setDownstreamCallTimeOnRequestAttributesIfNotAlreadyDone(); EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { sendUnrecoverableErrorDownPipeline(error, guaranteesBrokenDownstreamResponse); } else { executor.execute(() -> sendUnrecoverableErrorDownPipeline(error, guaranteesBrokenDownstreamResponse)); } }
private void initialize(ChannelHandlerContext ctx) { // Avoid the case where destroy() is called before scheduling timeouts. // See: https://github.com/netty/netty/issues/143 switch (state) { case 1: case 2: return; } state = 1; EventExecutor loop = ctx.executor(); lastWriteTime = System.nanoTime(); writerIdleTimeout = loop.schedule( new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); }
/**
 * Validates successful {@link WebSessionResources#close()} with valid CloseFuture and other parameters.
 *
 * @throws Exception if anything in the close sequence fails unexpectedly; it is propagated
 *                   (rather than converted to a bare {@code fail()}) so the report shows the cause
 */
@Test
public void testChannelPromiseWithValidExecutor() throws Exception {
    EventExecutor mockExecutor = mock(EventExecutor.class);
    ChannelPromise closeFuture = new DefaultChannelPromise(null, mockExecutor);
    webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class),
            mock(UserSession.class), closeFuture);

    webSessionResources.close();

    // The allocator and session handed to the resources must both be closed.
    verify(webSessionResources.getAllocator()).close();
    verify(webSessionResources.getSession()).close();
    // The promise's executor must have been consulted and given a task.
    verify(mockExecutor).inEventLoop();
    verify(mockExecutor).execute(any(Runnable.class));
    assertTrue(webSessionResources.getCloseFuture() == null);
    assertTrue(!listenerComplete);
}
/**
 * Validates double call to {@link WebSessionResources#close()} doesn't throw any exception.
 *
 * @throws Exception if either close call fails; it is propagated (rather than converted to a
 *                   bare {@code fail()}) so the report shows the cause
 */
@Test
public void testDoubleClose() throws Exception {
    ChannelPromise closeFuture = new DefaultChannelPromise(null, mock(EventExecutor.class));
    webSessionResources = new WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class),
            mock(UserSession.class), closeFuture);

    webSessionResources.close();

    verify(webSessionResources.getAllocator()).close();
    verify(webSessionResources.getSession()).close();
    assertTrue(webSessionResources.getCloseFuture() == null);

    // A second close must complete without throwing.
    webSessionResources.close();
}
/**
 * Performs TLS renegotiation.
 *
 * @param promise the promise handed to {@code handshake}; also the return value
 * @return the given {@code promise}
 * @throws NullPointerException  if {@code promise} is {@code null}
 * @throws IllegalStateException if no handler context is set
 */
public Future<Channel> renegotiate(final Promise<Channel> promise) {
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    ChannelHandlerContext context = this.ctx;
    if (context == null) {
        throw new IllegalStateException();
    }

    EventExecutor contextExecutor = context.executor();
    if (contextExecutor.inEventLoop()) {
        handshake(promise);
    } else {
        // Not on the event loop: run the handshake there instead.
        contextExecutor.execute(new OneTimeTask() {
            @Override
            public void run() {
                handshake(promise);
            }
        });
    }
    return promise;
}
/**
 * Creates a converter whose listeners relay a reply to a {@code Promise<Long>}.
 *
 * <p>A failed future fails the promise with its cause. A {@code RedisResponseException} reply
 * fails the promise, {@code RedisResponseDecoder.NULL_REPLY} completes it with {@code null},
 * and any other reply is cast to {@code Long}.
 *
 * @param executor executor passed to the {@code PromiseConverter} constructor
 * @return the converter
 */
public static PromiseConverter<Long> toLong(EventExecutor executor) {
    return new PromiseConverter<Long>(executor) {

        @Override
        public FutureListener<Object> newListener(final Promise<Long> promise) {
            return new FutureListener<Object>() {

                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (!future.isSuccess()) {
                        promise.tryFailure(future.cause());
                        return;
                    }
                    Object reply = future.getNow();
                    if (reply instanceof RedisResponseException) {
                        promise.tryFailure((RedisResponseException) reply);
                        return;
                    }
                    if (reply == RedisResponseDecoder.NULL_REPLY) {
                        promise.trySuccess(null);
                        return;
                    }
                    promise.trySuccess((Long) reply);
                }
            };
        }
    };
}
/**
 * Invokes {@code invokeChannelRegistered()} on the next inbound context, on that context's
 * executor.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext fireChannelRegistered() {
    final AbstractChannelHandlerContext target = findContextInbound();
    EventExecutor targetExecutor = target.executor();
    if (!targetExecutor.inEventLoop()) {
        // Called from another thread: hand the invocation to the target's executor.
        targetExecutor.execute(new OneTimeTask() {
            @Override
            public void run() {
                target.invokeChannelRegistered();
            }
        });
        return this;
    }
    target.invokeChannelRegistered();
    return this;
}
/**
 * Invokes {@code invokeChannelUnregistered()} on the next inbound context, on that context's
 * executor.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext fireChannelUnregistered() {
    final AbstractChannelHandlerContext target = findContextInbound();
    EventExecutor targetExecutor = target.executor();
    if (!targetExecutor.inEventLoop()) {
        // Called from another thread: hand the invocation to the target's executor.
        targetExecutor.execute(new OneTimeTask() {
            @Override
            public void run() {
                target.invokeChannelUnregistered();
            }
        });
        return this;
    }
    target.invokeChannelUnregistered();
    return this;
}
/**
 * Invokes {@code invokeChannelActive()} on the next inbound context, on that context's executor.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext fireChannelActive() {
    final AbstractChannelHandlerContext target = findContextInbound();
    EventExecutor targetExecutor = target.executor();
    if (!targetExecutor.inEventLoop()) {
        // Called from another thread: hand the invocation to the target's executor.
        targetExecutor.execute(new OneTimeTask() {
            @Override
            public void run() {
                target.invokeChannelActive();
            }
        });
        return this;
    }
    target.invokeChannelActive();
    return this;
}
/**
 * Invokes {@code invokeChannelInactive()} on the next inbound context, on that context's
 * executor.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext fireChannelInactive() {
    final AbstractChannelHandlerContext target = findContextInbound();
    EventExecutor targetExecutor = target.executor();
    if (!targetExecutor.inEventLoop()) {
        // Called from another thread: hand the invocation to the target's executor.
        targetExecutor.execute(new OneTimeTask() {
            @Override
            public void run() {
                target.invokeChannelInactive();
            }
        });
        return this;
    }
    target.invokeChannelInactive();
    return this;
}
/**
 * Invokes {@code invokeUserEventTriggered(event)} on the next inbound context, on that context's
 * executor.
 *
 * @param event the user event to pass on; must not be {@code null}
 * @return this context
 * @throws NullPointerException if {@code event} is {@code null}
 */
@Override
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
    if (event == null) {
        throw new NullPointerException("event");
    }

    final AbstractChannelHandlerContext target = findContextInbound();
    EventExecutor targetExecutor = target.executor();
    if (!targetExecutor.inEventLoop()) {
        // Called from another thread: hand the invocation to the target's executor.
        targetExecutor.execute(new OneTimeTask() {
            @Override
            public void run() {
                target.invokeUserEventTriggered(event);
            }
        });
        return this;
    }
    target.invokeUserEventTriggered(event);
    return this;
}
/**
 * Creates a converter whose listeners relay a reply to a {@code Promise<byte[]>}.
 *
 * <p>A failed future fails the promise with its cause. A {@code RedisResponseException} reply
 * fails the promise, {@code RedisResponseDecoder.NULL_REPLY} completes it with {@code null},
 * and any other reply is cast to {@code byte[]}.
 *
 * @param executor executor passed to the {@code PromiseConverter} constructor
 * @return the converter
 */
public static PromiseConverter<byte[]> toBytes(EventExecutor executor) {
    return new PromiseConverter<byte[]>(executor) {

        @Override
        public FutureListener<Object> newListener(final Promise<byte[]> promise) {
            return new FutureListener<Object>() {

                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (!future.isSuccess()) {
                        promise.tryFailure(future.cause());
                        return;
                    }
                    Object reply = future.getNow();
                    if (reply instanceof RedisResponseException) {
                        promise.tryFailure((RedisResponseException) reply);
                        return;
                    }
                    if (reply == RedisResponseDecoder.NULL_REPLY) {
                        promise.trySuccess(null);
                        return;
                    }
                    promise.trySuccess((byte[]) reply);
                }
            };
        }
    };
}
/**
 * Invokes {@code invokeChannelWritabilityChanged()} on the next inbound context, on that
 * context's executor. The task used for the off-loop case is created once and stored on the
 * target context for reuse.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
    final AbstractChannelHandlerContext target = findContextInbound();
    EventExecutor targetExecutor = target.executor();
    if (targetExecutor.inEventLoop()) {
        target.invokeChannelWritabilityChanged();
        return this;
    }

    // Reuse the task stored on the target context; create and store it on first use.
    Runnable cachedTask = target.invokeChannelWritableStateChangedTask;
    if (cachedTask == null) {
        cachedTask = new Runnable() {
            @Override
            public void run() {
                target.invokeChannelWritabilityChanged();
            }
        };
        target.invokeChannelWritableStateChangedTask = cachedTask;
    }
    targetExecutor.execute(cachedTask);
    return this;
}
@Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { if (localAddress == null) { throw new NullPointerException("localAddress"); } if (!validatePromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new OneTimeTask() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; }
@Override public ChannelFuture close(final ChannelPromise promise) { if (!validatePromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeClose(promise); } else { safeExecute(executor, new OneTimeTask() { @Override public void run() { next.invokeClose(promise); } }, promise, null); } return promise; }
@Override public ChannelFuture deregister(final ChannelPromise promise) { if (!validatePromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeDeregister(promise); } else { safeExecute(executor, new OneTimeTask() { @Override public void run() { next.invokeDeregister(promise); } }, promise, null); } return promise; }
/**
 * Marks that a read was requested and invokes {@code invokeRead()} on the next outbound context,
 * on that context's executor. The task used for the off-loop case is created once and stored on
 * the target context for reuse.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext read() {
    invokedPrevRead = true;

    final AbstractChannelHandlerContext target = findContextOutbound();
    EventExecutor targetExecutor = target.executor();
    if (targetExecutor.inEventLoop()) {
        target.invokeRead();
        return this;
    }

    // Reuse the task stored on the target context; create and store it on first use.
    Runnable cachedTask = target.invokeReadTask;
    if (cachedTask == null) {
        cachedTask = new Runnable() {
            @Override
            public void run() {
                target.invokeRead();
            }
        };
        target.invokeReadTask = cachedTask;
    }
    targetExecutor.execute(cachedTask);
    return this;
}
/**
 * Invokes {@code invokeFlush()} on the next outbound context, on that context's executor. The
 * task used for the off-loop case is created once and stored on the target context for reuse;
 * it is submitted through {@code safeExecute} with the channel's void promise.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext flush() {
    final AbstractChannelHandlerContext target = findContextOutbound();
    EventExecutor targetExecutor = target.executor();
    if (targetExecutor.inEventLoop()) {
        target.invokeFlush();
        return this;
    }

    // Reuse the task stored on the target context; create and store it on first use.
    Runnable cachedTask = target.invokeFlushTask;
    if (cachedTask == null) {
        cachedTask = new Runnable() {
            @Override
            public void run() {
                target.invokeFlush();
            }
        };
        target.invokeFlushTask = cachedTask;
    }
    safeExecute(targetExecutor, cachedTask, channel.voidPromise(), null);
    return this;
}
/**
 * Walks the context chain forward from {@code ctx} toward the tail. When the tail is reached,
 * {@code destroyDown} is started from the context before it. If a context's executor is not
 * running on the current thread, the walk is rescheduled on that executor and this call returns.
 *
 * @param ctx the context to start walking from
 */
private void destroyUp(AbstractChannelHandlerContext ctx) {
    final Thread callerThread = Thread.currentThread();
    final AbstractChannelHandlerContext end = this.tail;

    AbstractChannelHandlerContext cursor = ctx;
    while (cursor != end) {
        final EventExecutor cursorExecutor = cursor.executor();
        if (!cursorExecutor.inEventLoop(callerThread)) {
            // Continue the walk from this context on its own executor.
            final AbstractChannelHandlerContext resumeFrom = cursor;
            cursorExecutor.execute(new OneTimeTask() {
                @Override
                public void run() {
                    destroyUp(resumeFrom);
                }
            });
            return;
        }
        cursor = cursor.next;
    }

    // Reached the tail: start the downward pass from its predecessor.
    destroyDown(callerThread, end.prev);
}
/**
 * Finishes encoding on the handler context's executor.
 *
 * <p>On the event loop, {@code finishEncode} runs inline with the caller's promise. Otherwise it
 * is scheduled with a fresh promise, which is returned; a {@code ChannelPromiseNotifier} for the
 * caller's promise is added to the future {@code finishEncode} produces.
 *
 * @param promise the caller's promise
 * @return the future of the finish operation (a new promise in the off-loop case)
 */
@Override
public ChannelFuture close(final ChannelPromise promise) {
    ChannelHandlerContext context = ctx();
    EventExecutor contextExecutor = context.executor();
    if (!contextExecutor.inEventLoop()) {
        final ChannelPromise pending = context.newPromise();
        contextExecutor.execute(new Runnable() {
            @Override
            public void run() {
                ChannelFuture finished = finishEncode(ctx(), pending);
                finished.addListener(new ChannelPromiseNotifier(promise));
            }
        });
        return pending;
    }
    return finishEncode(context, promise);
}
/**
 * Creates a converter whose listeners relay a reply to a {@code Promise<Double>}.
 *
 * <p>A failed future fails the promise with its cause. A {@code RedisResponseException} reply
 * fails the promise, {@code RedisResponseDecoder.NULL_REPLY} completes it with {@code null},
 * and any other reply is cast to {@code byte[]} and converted with {@code bytesToDouble}.
 *
 * @param executor executor passed to the {@code PromiseConverter} constructor
 * @return the converter
 */
public static PromiseConverter<Double> toDouble(EventExecutor executor) {
    return new PromiseConverter<Double>(executor) {

        @Override
        public FutureListener<Object> newListener(final Promise<Double> promise) {
            return new FutureListener<Object>() {

                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (!future.isSuccess()) {
                        promise.tryFailure(future.cause());
                        return;
                    }
                    Object reply = future.getNow();
                    if (reply instanceof RedisResponseException) {
                        promise.tryFailure((RedisResponseException) reply);
                        return;
                    }
                    if (reply == RedisResponseDecoder.NULL_REPLY) {
                        promise.trySuccess(null);
                        return;
                    }
                    promise.trySuccess(bytesToDouble((byte[]) reply));
                }
            };
        }
    };
}
/**
 * Creates a converter whose listeners relay a reply to a {@code Promise<Object>}.
 *
 * <p>A failed future fails the promise with its cause. A {@code RedisResponseException} reply
 * fails the promise, {@code RedisResponseDecoder.NULL_REPLY} completes it with {@code null},
 * and any other reply is passed through unchanged.
 *
 * @param executor executor passed to the {@code PromiseConverter} constructor
 * @return the converter
 */
public static PromiseConverter<Object> toObject(EventExecutor executor) {
    return new PromiseConverter<Object>(executor) {

        @Override
        public FutureListener<Object> newListener(final Promise<Object> promise) {
            return new FutureListener<Object>() {

                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (!future.isSuccess()) {
                        promise.tryFailure(future.cause());
                        return;
                    }
                    Object reply = future.getNow();
                    if (reply instanceof RedisResponseException) {
                        promise.tryFailure((RedisResponseException) reply);
                        return;
                    }
                    if (reply == RedisResponseDecoder.NULL_REPLY) {
                        promise.trySuccess(null);
                        return;
                    }
                    promise.trySuccess(reply);
                }
            };
        }
    };
}
/**
 * Creates a new instance wrapping a {@code publisher} and publishing to multiple subscribers.
 *
 * @param publisher the publisher who will publish data to subscribers
 * @param signalLengthGetter the signal length getter that produces the length of signals
 * @param executor the executor to use for upstream signals; when {@code null}, one is resolved
 *                 through {@code RequestContext.mapCurrent} with a
 *                 {@code CommonPools.workerGroup()} fallback
 * @param maxSignalLength the maximum length of signals. {@code 0} disables the length limit
 */
protected AbstractStreamMessageDuplicator(
        U publisher, SignalLengthGetter<? super T> signalLengthGetter,
        @Nullable EventExecutor executor, long maxSignalLength) {
    requireNonNull(publisher, "publisher");
    requireNonNull(signalLengthGetter, "signalLengthGetter");
    checkArgument(maxSignalLength >= 0,
                  "maxSignalLength: %s (expected: >= 0)", maxSignalLength);

    if (executor == null) {
        // No executor supplied by the caller: resolve one.
        duplicatorExecutor = RequestContext.mapCurrent(
                RequestContext::eventLoop, () -> CommonPools.workerGroup().next());
    } else {
        duplicatorExecutor = executor;
    }

    processor = new StreamMessageProcessor<>(
            publisher, signalLengthGetter, duplicatorExecutor, maxSignalLength);
}
/**
 * Creates a converter whose listeners relay a reply to a {@code Promise<String>}.
 *
 * <p>A failed future fails the promise with its cause. A {@code RedisResponseException} reply
 * fails the promise, {@code RedisResponseDecoder.NULL_REPLY} completes it with {@code null},
 * and any other reply is converted with {@code toString()}.
 *
 * @param executor executor passed to the {@code PromiseConverter} constructor
 * @return the converter
 */
public static PromiseConverter<String> toString(EventExecutor executor) {
    return new PromiseConverter<String>(executor) {

        @Override
        public FutureListener<Object> newListener(final Promise<String> promise) {
            return new FutureListener<Object>() {

                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (!future.isSuccess()) {
                        promise.tryFailure(future.cause());
                        return;
                    }
                    Object reply = future.getNow();
                    if (reply instanceof RedisResponseException) {
                        promise.tryFailure((RedisResponseException) reply);
                        return;
                    }
                    if (reply == RedisResponseDecoder.NULL_REPLY) {
                        promise.trySuccess(null);
                        return;
                    }
                    promise.trySuccess(reply.toString());
                }
            };
        }
    };
}
/**
 * Schedules a task that reports a timeout through {@code onChannelError} after
 * {@code requestTimeout}, and stores the resulting future in {@code timeout}.
 *
 * <p>If scheduling fails, the failure is reported through {@code onChannelError} and then
 * rethrown.
 *
 * @param executor the executor on which the timeout task is scheduled
 */
void registerRequestTimeout(EventExecutor executor) {
    try {
        Runnable reportTimeout = () -> onChannelError(
                new TTransportException("Timed out waiting " + requestTimeout + " to receive response"));
        timeout.set(executor.schedule(reportTimeout, requestTimeout.toMillis(), MILLISECONDS));
    } catch (Throwable schedulingFailure) {
        onChannelError(new TTransportException("Unable to schedule request timeout", schedulingFailure));
        throw schedulingFailure;
    }
}
/**
 * Runs a task on the executor, either immediately or after a delay.
 *
 * @param executor the executor that runs the task
 * @param task     the task to run
 * @param delay    delay in milliseconds; a value of zero or less submits the task right away
 * @return the future returned by {@code submit} or {@code schedule}
 */
public static Future<?> setTimeout(EventExecutor executor, Runnable task, long delay) {
    if (delay > 0) {
        return executor.schedule(task, delay, TimeUnit.MILLISECONDS);
    }
    return executor.submit(task);
}
public BenchmarkClient(int threadCount, int requestCount){ this.threadCount = threadCount; this.requestCount = requestCount; this.stop = new CountDownLatch(threadCount); //创建客户端连接池 ClientConnectionPool clientConnectionPool = RpcClient.createConnectionPool(); //创建异步调用回调执行器 EventExecutor eventExecutor = RpcClient.createEventExecutor(); //创建同步客户端 robotProtocol = RpcClient.createService("RobotService", IRobotProtocol.class, clientConnectionPool, eventExecutor, true); }
public static AutoCloseableEventExecutor forwardingEventExecutor(final EventExecutor eventExecutor, final AutoCloseable closeable) { return createCloseableProxy(new CloseableEventExecutorMixin(eventExecutor) { @Override public void close() throws Exception { // Intentional no-op. closeable.close(); } }); }
@Override public AutoCloseable createInstance() { // The service is provided via blueprint so wait for and return it here for backwards compatibility. final WaitingServiceTracker<EventExecutor> tracker = WaitingServiceTracker.create( EventExecutor.class, bundleContext, "(type=global-event-executor)"); EventExecutor eventExecutor = tracker.waitForService(WaitingServiceTracker.FIVE_MINUTES); return CloseableEventExecutorMixin.forwardingEventExecutor(eventExecutor, tracker); }
/**
 * Prepares the module factory under test and stubs the mocked bundle context so that filter
 * creation, listener registration and service lookup all succeed with mocks.
 *
 * @throws Exception if initialization of the config transaction manager fails
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Before
public void setUp() throws Exception {
    factory = new GlobalEventExecutorModuleFactory();
    HardcodedModuleFactoriesResolver resolver = new HardcodedModuleFactoriesResolver(mockedContext, factory);
    super.initConfigTransactionManagerImpl(resolver);

    // Any filter expression yields the same mock filter.
    Filter filter = mock(Filter.class);
    doReturn("mock").when(filter).toString();
    doReturn(filter).when(mockedContext).createFilter(anyString());

    // Registering a service listener does nothing.
    doNothing().when(mockedContext).addServiceListener(any(ServiceListener.class), anyString());

    // Any service lookup resolves to a single reference backed by a mock EventExecutor.
    ServiceReference serviceReference = mock(ServiceReference.class);
    doReturn(new ServiceReference[]{serviceReference}).when(mockedContext).
            getServiceReferences(anyString(), anyString());
    doReturn(mock(EventExecutor.class)).when(mockedContext).getService(serviceReference);
}
protected void asyncCallback(ChannelHandlerContext ctx, ResponseInfo<?> responseInfo) { HttpProcessingState state = ChannelAttributes.getHttpProcessingStateForChannel(ctx).get(); if (responseInfo.isChunkedResponse()) { // Whoops, chunked responses are not allowed for this endpoint type. asyncErrorCallback( ctx, new Exception("NonblockingEndpoint execution resulted in a chunked ResponseInfo, when only full " + "ResponseInfos are allowed. offending_endpoint_class=" + state.getEndpointForExecution().getClass().getName()) ); } else { executeOnlyIfChannelIsActive( ctx, "NonblockingEndpointExecutionHandler-asyncCallback", () -> { // We have to set the ResponseInfo on the state and fire the event while in the // channel's EventLoop. Otherwise there could be a race condition with an error // that was fired down the pipe that sets the ResponseInfo on the state first, then // this comes along and replaces the ResponseInfo (or vice versa). EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { setResponseInfoAndActivatePipelineForResponse(state, responseInfo, ctx); } else { executor.execute(() -> setResponseInfoAndActivatePipelineForResponse(state, responseInfo, ctx)); } } ); } }
/**
 * Builds the mocks, spies and stubbing shared by the tests, then creates the handler spy.
 */
@Before
public void beforeMethod() {
    // Create the collaborators.
    stateMock = mock(HttpProcessingState.class);
    proxyRouterStateMock = mock(ProxyRouterProcessingState.class);
    ctxMock = mock(ChannelHandlerContext.class);
    channelMock = mock(Channel.class);
    stateAttrMock = mock(Attribute.class);
    proxyRouterStateAttrMock = mock(Attribute.class);
    requestInfo = RequestInfoImpl.dummyInstanceForUnknownRequests();
    endpointMock = mock(StandardEndpoint.class);
    longRunningTaskExecutorMock = mock(Executor.class);
    responseFuture = new CompletableFuture<>();
    stateWorkChainFutureSpy = spy(CompletableFuture.completedFuture(null));
    eventLoopMock = mock(EventLoop.class);
    eventExecutorMock = mock(EventExecutor.class);

    // Wire context -> channel -> attributes -> processing states.
    doReturn(channelMock).when(ctxMock).channel();
    doReturn(stateAttrMock).when(channelMock).attr(ChannelAttributes.HTTP_PROCESSING_STATE_ATTRIBUTE_KEY);
    doReturn(stateMock).when(stateAttrMock).get();
    doReturn(false).when(stateMock).isRequestHandled();
    doReturn(proxyRouterStateAttrMock).when(channelMock).attr(ChannelAttributes.PROXY_ROUTER_PROCESSING_STATE_ATTRIBUTE_KEY);
    doReturn(proxyRouterStateMock).when(proxyRouterStateAttrMock).get();

    // The state exposes the endpoint and request; the endpoint returns the test-controlled future.
    doReturn(endpointMock).when(stateMock).getEndpointForExecution();
    doReturn(requestInfo).when(stateMock).getRequestInfo();
    doReturn(responseFuture).when(endpointMock).execute(any(RequestInfo.class), any(Executor.class), any(ChannelHandlerContext.class));

    // By default the channel is active and the caller is reported as being in the event loop.
    doReturn(eventLoopMock).when(channelMock).eventLoop();
    doReturn(eventExecutorMock).when(ctxMock).executor();
    doReturn(true).when(eventExecutorMock).inEventLoop();
    doReturn(true).when(channelMock).isActive();

    // Capture (as a spy) the future produced by thenCompose on the work-chain spy so tests can inspect it.
    doAnswer(invocation -> {
        CompletableFuture actualFutureForAttaching = (CompletableFuture) invocation.callRealMethod();
        futureThatWillBeAttachedToSpy = spy(actualFutureForAttaching);
        return futureThatWillBeAttachedToSpy;
    }).when(stateWorkChainFutureSpy).thenCompose(any(Function.class));
    doReturn(stateWorkChainFutureSpy).when(stateMock).getPreEndpointExecutionWorkChain();

    // Handler under test, wrapped in a spy.
    handlerSpy = spy(new
NonblockingEndpointExecutionHandler(longRunningTaskExecutorMock, defaultCompletableFutureTimeoutMillis));
}
/**
 * On the channel's event loop, writes and flushes {@code LastHttpContent.EMPTY_LAST_CONTENT};
 * a listener on that write calls {@code removeFromPipeline()} once the operation completes.
 */
@Override
protected void complete() {
    Runnable writeLastContent = new Runnable() {
        @Override
        public void run() {
            ChannelFutureListener removeAfterWrite = new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    removeFromPipeline();
                }
            };
            channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(removeAfterWrite);
        }
    };
    channel.eventLoop().execute(writeLastContent);
}
ColocatedEventLoopGroup(EventLoopGroup eventLoopGroup) { this.eventLoopGroup = eventLoopGroup; for (EventExecutor ex : eventLoopGroup) { if (ex instanceof EventLoop) { ex.submit(() -> { if (!localLoop.isSet()) { localLoop.set((EventLoop) ex); } }); } } }
private void initialize(ChannelHandlerContext ctx) { // Avoid the case where destroy() is called before scheduling timeouts. // See: https://github.com/netty/netty/issues/143 switch (state) { case 1: case 2: return; } state = 1; EventExecutor loop = ctx.executor(); lastReadTime = lastWriteTime = System.nanoTime(); if (readerIdleTimeNanos > 0) { readerIdleTimeout = loop.schedule( new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (writerIdleTimeNanos > 0) { writerIdleTimeout = loop.schedule( new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (allIdleTimeNanos > 0) { allIdleTimeout = loop.schedule( new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); } }
/**
 * Returns the executor of the handler context.
 *
 * @return {@code ctx.executor()}
 * @throws IllegalStateException if {@code ctx} is {@code null}
 */
@Override
protected EventExecutor executor() {
    if (ctx != null) {
        return ctx.executor();
    }
    throw new IllegalStateException();
}