/**
 * Builds an IMAP client around an existing channel.
 *
 * <p>Stores the supplied collaborators, creates the per-client state, codec, pending-write queue
 * and lifecycle flags, and finally calls {@code configureChannel()}.
 *
 * @param configuration   client configuration
 * @param channel         channel this client operates on
 * @param sslContext      SSL context kept for later use by this client
 * @param promiseExecutor executor group handed to the client state
 * @param clientName      name used for the logger and the client state
 */
public ImapClient(ImapClientConfiguration configuration,
                  Channel channel,
                  SslContext sslContext,
                  EventExecutorGroup promiseExecutor,
                  String clientName) {
    // Collaborators supplied by the caller.
    this.configuration = configuration;
    this.channel = channel;
    this.sslContext = sslContext;
    this.promiseExecutor = promiseExecutor;
    this.logger = LogUtils.loggerWithName(ImapClient.class, clientName);

    // The codec is constructed from the client state, so the state has to exist first.
    this.clientState = new ImapClientState(clientName, promiseExecutor);
    this.codec = new ImapCodec(this.clientState);

    // Mutable bookkeeping, all starting out "empty" / "false".
    this.capabilities = new AtomicReference<>(null);
    this.connectionClosed = new AtomicBoolean(false);
    this.connectionShutdown = new AtomicBoolean(false);
    this.pendingWriteQueue = new ConcurrentLinkedQueue<>();

    // Runs last, after every field above has been assigned.
    configureChannel();
}
/**
 * Creates a channel group made of two {@link DefaultExchangeChannelGroup}s: one negotiated as
 * {@code ChannelType.read} and one as {@code ChannelType.write}.
 *
 * <p>The write group is given {@code writeConnections} connections and the read group the
 * remaining {@code connections - writeConnections}. Both share one {@link Tracer} and pass this
 * group as their parent.
 *
 * @throws SailfishException if either underlying group cannot be created
 */
public ReadWriteExchangeChannelGroup(
        MsgHandler<Protocol> msgHandler,
        Address address,
        int connectTimeout,
        int reconnectInterval,
        byte idleTimeout,
        byte maxIdleTimeOut,
        boolean lazy,
        short connections,
        short writeConnections,
        boolean reverseIndex,
        EventLoopGroup loopGroup,
        EventExecutorGroup executorGroup) throws SailfishException {
    super(UUID.randomUUID());
    this.msgHandler = msgHandler;
    this.tracer = new Tracer();

    // Read side: whatever is left once the write connections are subtracted.
    final short readConnections = (short) (connections - writeConnections);
    final NegotiateConfig readNegotiation = new NegotiateConfig(
            idleTimeout, maxIdleTimeOut, id(), ChannelType.read.code(),
            connections, writeConnections, (short) 0, reverseIndex);
    this.readGroup = new DefaultExchangeChannelGroup(
            tracer, msgHandler, address, readConnections,
            connectTimeout, reconnectInterval, idleTimeout, maxIdleTimeOut,
            lazy, reverseIndex, readNegotiation, this, loopGroup, executorGroup);

    // Write side.
    final NegotiateConfig writeNegotiation = new NegotiateConfig(
            idleTimeout, maxIdleTimeOut, id(), ChannelType.write.code(),
            connections, writeConnections, (short) 0, reverseIndex);
    this.writeGroup = new DefaultExchangeChannelGroup(
            tracer, msgHandler, address, writeConnections,
            connectTimeout, reconnectInterval, idleTimeout, maxIdleTimeOut,
            lazy, reverseIndex, writeNegotiation, this, loopGroup, executorGroup);
}
private ChannelInitializer<SocketChannel> newChannelInitializer(final NegotiateConfig config, final ExchangeChannelGroup channelGroup, final EventExecutorGroup executorGroup) { return new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); ch.attr(ChannelAttrKeys.maxIdleTimeout).set(config.maxIdleTimeout()); ch.attr(ChannelAttrKeys.channelGroup).set(channelGroup); ch.attr(ChannelAttrKeys.clientSide).set(true); ch.attr(OneTime.awaitNegotiate).set(new CountDownLatch(1)); ch.attr(OneTime.channelConfig).set(config); // TODO should increase ioRatio when every ChannelHandler bind to executorGroup? pipeline.addLast(executorGroup, RemotingEncoder.INSTANCE, new RemotingDecoder(), new IdleStateHandler(config.idleTimeout(), 0, 0), HeartbeatChannelHandler.INSTANCE, NegotiateChannelHandler.INSTANCE, ConcreteRequestHandler.INSTANCE); } }; }
public void shutdown() throws InterruptedException { try { logger.info("Shutting down Riposte..."); List<ChannelFuture> channelCloseFutures = new ArrayList<>(); for (Channel ch : channels) { // execute shutdown hooks if (serverConfig.serverShutdownHooks() != null) { for (ServerShutdownHook hook : serverConfig.serverShutdownHooks()) { hook.executeServerShutdownHook(serverConfig, ch); } } channelCloseFutures.add(ch.close()); } for (ChannelFuture chf : channelCloseFutures) { chf.sync(); } } finally { eventLoopGroups.forEach(EventExecutorGroup::shutdownGracefully); logger.info("...Riposte shutdown complete"); } }
/**
 * Prepares (but does not bind) the server bootstrap for a lobby.
 *
 * <p>Sets the boss/worker groups, NIO server channel, socket options, pooled allocator, a fixed
 * receive-buffer allocator, the child handler and the local address to bind to.
 *
 * @param nLobby        lobby whose IP and port are used for the local address
 * @param bossGroup     parent event loop group of the bootstrap
 * @param workerGroup   child event loop group of the bootstrap
 * @param executorGroup executor group handed to the {@code ServerHandler}
 */
public NomadServer(NomadLobby nLobby, EventLoopGroup bossGroup, EventLoopGroup workerGroup,
                   EventExecutorGroup executorGroup) {
    final int maxClients = 2000;
    final int bufferPerClient = Packet.MAX_PACKET_LENGTH * 4;

    sb = new ServerBootstrap();
    sb.group(bossGroup, workerGroup)
      .channel(NioServerSocketChannel.class);

    // Options of the listening socket.
    sb.option(ChannelOption.SO_BACKLOG, maxClients)
      .option(ChannelOption.SO_REUSEADDR, true);

    // Options applied to each accepted connection.
    sb.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
      .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferPerClient))
      .childOption(ChannelOption.SO_SNDBUF, 65535)
      .childOption(ChannelOption.SO_RCVBUF, 65535);

    sb.childHandler(new ServerHandler(nLobby, executorGroup));

    // Bind on the wildcard address when requested, otherwise on the lobby's own IP.
    final String bindIp;
    if (Nomad.BIND_ON_ALL) {
        bindIp = "0.0.0.0";
    } else {
        bindIp = nLobby.getLobby().getIp();
    }
    sb.localAddress(bindIp, nLobby.getLobby().getPort());
}
/**
 * When writing the identify command fails, {@code identify} is expected to surface the
 * {@link PrematureChannelClosureException} carried by the failed write future.
 */
@Test(expected = PrematureChannelClosureException.class)
public void testIdentifyCloseChannelOnFailure() throws Exception {
    // given: a mocked channel plus one succeeded and one failed promise
    Channel channel = mock(Channel.class, Answers.RETURNS_SMART_NULLS.get());
    mockWriteHandler = mock(ChannelHandler.class);

    DefaultChannelPromise succeeded = new DefaultChannelPromise(channel);
    succeeded.setSuccess();
    DefaultChannelPromise failed = new DefaultChannelPromise(channel);
    failed.setFailure(new PrematureChannelClosureException("test"));

    // the pipeline mock returns itself from both addLast variants
    ChannelPipeline pipelineMock = mock(ChannelPipeline.class);
    when(pipelineMock.addLast(anyString(), any(ChannelHandler.class)))
            .thenReturn(pipelineMock);
    when(pipelineMock.addLast(any(EventExecutorGroup.class), anyString(), any(ChannelHandler.class)))
            .thenReturn(pipelineMock);

    // connecting and closing succeed, writing fails
    when(channel.pipeline()).thenReturn(pipelineMock);
    when(channel.isActive()).thenReturn(true);
    when(channel.writeAndFlush(any())).thenReturn(failed);
    when(channel.close()).thenReturn(succeeded);
    when(bootstrap.connect(anyString(), anyInt())).thenReturn(succeeded);

    // when
    ClientSessionConfiguration sessionConfiguration = new ClientSessionConfiguration();
    jannelClient.identify(sessionConfiguration, null);
}
/**
 * Application entry point: initializes the application, registers the HTTP routes and starts
 * the HTTP server on the port given by {@code Config.APP_PORT}.
 *
 * <p>The previous version also created a 16-thread {@code NioEventLoopGroup} that was never used
 * or shut down; it has been removed because it only allocated event loops for nothing.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if initialization or server start-up fails
 */
public static void main(String[] args) throws Exception {
    init();

    // Supported routes:
    //   POST  /login
    //   GET   /foods
    //   POST  /carts
    //   PATCH /carts/cart_id   (registered under "/carts" — presumably matched by prefix; verify
    //                           against HttpServerUrlHandler)
    //   POST  /orders
    //   GET   /orders
    //   GET   /admin/orders
    // Requests matching no route go to the default handler, which is built with BAD_GATEWAY.
    HttpServerUrlHandler httpServerUrlHandler =
            new HttpServerUrlHandler(new DefaultHandler(HttpResponseStatus.BAD_GATEWAY))
                    .register(HttpMethod.POST, "/login", new LoginHandler())
                    .register(HttpMethod.GET, "/foods", new GetStockHandler())
                    .register(HttpMethod.POST, "/carts", new AddCartHandler())
                    .register(HttpMethod.PATCH, "/carts", new AddFoodHandler())
                    .register(HttpMethod.POST, "/orders", new MakeOrderHandler())
                    .register(HttpMethod.GET, "/orders", new GetOrderHandler())
                    .register(HttpMethod.GET, "/admin/orders", new AdminGetOrderHandler());

    // NOTE(review): the meaning of 128 is defined by HttpServer's constructor — confirm there.
    new HttpServer(128, ch -> ch.pipeline().addLast(httpServerUrlHandler))
            .start(Integer.parseInt(Config.APP_PORT));
}
/**
 * Inserts the given handlers at the front of the pipeline so that they end up in the same
 * relative order as in {@code handlers}.
 *
 * <p>Only the leading run of non-{@code null} handlers is used; the first {@code null} element
 * ends the list. Each handler is registered under a generated name.
 *
 * @param executor executor group passed through for each handler
 * @param handlers handlers to insert; must not be {@code null}
 * @return this pipeline
 * @throws NullPointerException if {@code handlers} is {@code null}
 */
@Override
public ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
        throw new NullPointerException("handlers");
    }

    // Length of the leading non-null prefix (0 for an empty array or a null first element).
    int usable = 0;
    while (usable < handlers.length && handlers[usable] != null) {
        usable++;
    }

    // Walk the prefix backwards: each addFirst pushes earlier handlers ahead of later ones.
    for (int idx = usable - 1; idx >= 0; idx--) {
        ChannelHandler current = handlers[idx];
        addFirst(executor, generateName(current), current);
    }
    return this;
}
/**
 * Adds several handlers at the head of the pipeline, preserving their order in the array.
 *
 * <p>Processing stops at the first {@code null} element; an empty array or a {@code null} first
 * element leaves the pipeline untouched. Names are produced by {@code generateName}.
 *
 * @param executor executor group forwarded with every handler
 * @param handlers handlers to add; the array itself must not be {@code null}
 * @return this pipeline
 * @throws NullPointerException if {@code handlers} is {@code null}
 */
@Override
public ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
        throw new NullPointerException("handlers");
    }

    // Find the index of the first null entry (or the array length when there is none).
    int end = 0;
    for (ChannelHandler candidate : handlers) {
        if (candidate == null) {
            break;
        }
        end++;
    }

    // Insert from the last usable handler down to the first so the final order matches the input.
    int position = end;
    while (position > 0) {
        position--;
        ChannelHandler handler = handlers[position];
        addFirst(executor, generateName(handler), handler);
    }
    return this;
}
/**
 * Resources built with explicit pool sizes (4/4) and a disabled latency collector must report
 * those settings and shut down successfully.
 */
@Test
public void testBuilder() throws Exception {
    DefaultClientResources.Builder builder = new DefaultClientResources.Builder();
    builder.ioThreadPoolSize(4)
            .computationThreadPoolSize(4)
            .commandLatencyCollectorOptions(DefaultCommandLatencyCollectorOptions.disabled());
    DefaultClientResources sut = builder.build();

    EventExecutorGroup computationGroup = sut.eventExecutorGroup();
    NioEventLoopGroup ioGroup = sut.eventLoopGroupProvider().allocate(NioEventLoopGroup.class);

    // thread pool sizes
    assertThat(computationGroup.iterator()).hasSize(4);
    assertThat(ioGroup.executorCount()).isEqualTo(4);
    assertThat(sut.ioThreadPoolSize()).isEqualTo(4);

    // latency collector exists but is switched off
    assertThat(sut.commandLatencyCollector()).isNotNull();
    assertThat(sut.commandLatencyCollector().isEnabled()).isFalse();

    assertThat(sut.shutdown(0, 0, TimeUnit.MILLISECONDS).get()).isTrue();
}
@Test public void reuseClientConnections() throws Exception { // given DefaultClientResources clientResources = DefaultClientResources.create(); Map<Class<? extends EventExecutorGroup>, EventExecutorGroup> eventLoopGroups = getExecutors(clientResources); RedisClient redisClient1 = newClient(clientResources); RedisClient redisClient2 = newClient(clientResources); connectAndClose(redisClient1); connectAndClose(redisClient2); // when EventExecutorGroup executor = eventLoopGroups.values().iterator().next(); redisClient1.shutdown(0, 0, TimeUnit.MILLISECONDS); // then connectAndClose(redisClient2); clientResources.shutdown(0, 0, TimeUnit.MILLISECONDS).get(); assertThat(eventLoopGroups).isEmpty(); assertThat(executor.isShuttingDown()).isTrue(); assertThat(clientResources.eventExecutorGroup().isShuttingDown()).isTrue(); }
@Test public void managedClientResources() throws Exception { // given RedisClient redisClient1 = RedisClient.create(RedisURI.create(TestSettings.host(), TestSettings.port())); ClientResources clientResources = redisClient1.getResources(); Map<Class<? extends EventExecutorGroup>, EventExecutorGroup> eventLoopGroups = getExecutors(clientResources); connectAndClose(redisClient1); // when EventExecutorGroup executor = eventLoopGroups.values().iterator().next(); redisClient1.shutdown(0, 0, TimeUnit.MILLISECONDS); // then assertThat(eventLoopGroups).isEmpty(); assertThat(executor.isShuttingDown()).isTrue(); assertThat(clientResources.eventExecutorGroup().isShuttingDown()).isTrue(); }
private ChannelHandlerInvoker findInvoker(EventExecutorGroup group) { if (group == null) { return null; } // Pin one of the child executors once and remember it so that the same child executor // is used to fire events for the same channel. ChannelHandlerInvoker invoker = childInvokers.get(group); if (invoker == null) { EventExecutor executor = group.next(); if (executor instanceof EventLoop) { invoker = ((EventLoop) executor).asInvoker(); } else { invoker = new DefaultChannelHandlerInvoker(executor); } childInvokers.put(group, invoker); } return invoker; }
public NettyPipeline(final EventExecutorGroup executor, final HttpHandler handler, final Config conf, final SslContext sslCtx) { this.executor = executor; this.handler = handler; this.config = conf; maxInitialLineLength = conf.getBytes("netty.http.MaxInitialLineLength").intValue(); maxHeaderSize = conf.getBytes("netty.http.MaxHeaderSize").intValue(); maxChunkSize = conf.getBytes("netty.http.MaxChunkSize").intValue(); maxContentLength = conf.getBytes("netty.http.MaxContentLength").intValue(); idleTimeOut = conf.getDuration("netty.http.IdleTimeout", TimeUnit.MILLISECONDS); supportH2 = conf.getBoolean("server.http2.enabled"); this.tmpdir = config.getString("application.tmpdir"); this.bufferSize = config.getBytes("server.http.ResponseBufferSize").intValue(); this.wsMaxMessageSize = Math .max( config.getBytes("server.ws.MaxTextMessageSize").intValue(), config.getBytes("server.ws.MaxBinaryMessageSize").intValue()); this.sslCtx = sslCtx; }
/**
 * With an SSL context present and protocol {@code "http/1.1"} selected, the pipeline is expected
 * to receive the HTTP/1.1 codec, idle handler, aggregator and application handler.
 */
@Test
public void https1_1() throws Exception {
    Config conf = conf(false, 123, 234, 345, 456, 567L);

    MockUnit mockUnit = new MockUnit(EventExecutorGroup.class, HttpHandler.class,
            SocketChannel.class, ChannelPipeline.class, ChannelHandlerContext.class);

    mockUnit
        .expect(sslContext)
        .expect(pipeline)
        .expect(ssl)
        .expect(http2OrHttp)
        .expect(ctxpipeline)
        .expect(http1Codec())
        .expect(idle(567))
        .expect(aggregator(456))
        .expect(jooby(conf))
        .run(unit -> {
          NettyPipeline initializer = new NettyPipeline(unit.get(EventExecutorGroup.class),
              unit.get(HttpHandler.class), conf, unit.get(SslContext.class));
          initializer.initChannel(unit.get(SocketChannel.class));
        }, unit -> {
          // Drive the captured protocol-negotiation handler with the selected protocol.
          Http2OrHttpHandler negotiated = unit.captured(Http2OrHttpHandler.class).iterator().next();
          negotiated.configurePipeline(unit.get(ChannelHandlerContext.class), "http/1.1");
        });
}
/**
 * Without an SSL context the HTTP/1.1 codec, idle handler, aggregator and application handler
 * are expected directly during {@code initChannel}.
 */
@Test
public void http1_1() throws Exception {
    Config conf = conf(false, 123, 234, 345, 456, 567L);

    MockUnit mockUnit = new MockUnit(EventExecutorGroup.class, HttpHandler.class,
            SocketChannel.class, ChannelPipeline.class, ChannelHandlerContext.class);

    mockUnit
        .expect(pipeline)
        .expect(http1Codec())
        .expect(idle(567))
        .expect(aggregator(456))
        .expect(jooby(conf))
        .run(unit -> {
          // No SslContext: pass null.
          NettyPipeline initializer = new NettyPipeline(unit.get(EventExecutorGroup.class),
              unit.get(HttpHandler.class), conf, null);
          initializer.initChannel(unit.get(SocketChannel.class));
        });
}
/**
 * With HTTP/2 enabled in the configuration and protocol {@code "h2"} selected, the pipeline is
 * expected to receive the h2 handler, idle handler and application handler.
 */
@Test
public void h2() throws Exception {
    Config conf = conf(true, 123, 234, 345, 456, 567L);

    MockUnit mockUnit = new MockUnit(EventExecutorGroup.class, HttpHandler.class,
            SocketChannel.class, ChannelPipeline.class, ChannelHandlerContext.class);

    mockUnit
        .expect(sslContext)
        .expect(pipeline)
        .expect(ssl)
        .expect(http2OrHttp)
        .expect(ctxpipeline)
        .expect(h2(456))
        .expect(idle(567))
        .expect(jooby(conf))
        .run(unit -> {
          NettyPipeline initializer = new NettyPipeline(unit.get(EventExecutorGroup.class),
              unit.get(HttpHandler.class), conf, unit.get(SslContext.class));
          initializer.initChannel(unit.get(SocketChannel.class));
        }, unit -> {
          // Drive the captured protocol-negotiation handler with the selected protocol.
          Http2OrHttpHandler negotiated = unit.captured(Http2OrHttpHandler.class).iterator().next();
          negotiated.configurePipeline(unit.get(ChannelHandlerContext.class), "h2");
        });
}
/**
 * Same as the HTTPS HTTP/1.1 case but with an idle timeout of {@code -1}: no idle handler is
 * expected in the pipeline.
 */
@Test
public void https1_1_noTimeout() throws Exception {
    Config conf = conf(false, 123, 234, 345, 456, -1);

    MockUnit mockUnit = new MockUnit(EventExecutorGroup.class, HttpHandler.class,
            SocketChannel.class, ChannelPipeline.class, ChannelHandlerContext.class);

    mockUnit
        .expect(sslContext)
        .expect(pipeline)
        .expect(ssl)
        .expect(http2OrHttp)
        .expect(ctxpipeline)
        .expect(http1Codec())
        .expect(aggregator(456))
        .expect(jooby(conf))
        .run(unit -> {
          NettyPipeline initializer = new NettyPipeline(unit.get(EventExecutorGroup.class),
              unit.get(HttpHandler.class), conf, unit.get(SslContext.class));
          initializer.initChannel(unit.get(SocketChannel.class));
        }, unit -> {
          // Drive the captured protocol-negotiation handler with the selected protocol.
          Http2OrHttpHandler negotiated = unit.captured(Http2OrHttpHandler.class).iterator().next();
          negotiated.configurePipeline(unit.get(ChannelHandlerContext.class), "http/1.1");
        });
}
/**
 * With HTTP/2 disabled in the configuration, selecting {@code "h2"} must fail with an
 * {@link IllegalStateException} whose message is {@code "Unknown protocol: h2"}.
 */
@Test
public void unknownProtocol() throws Exception {
    Config conf = conf(false, 123, 234, 345, 456, 567L);

    MockUnit mockUnit = new MockUnit(EventExecutorGroup.class, HttpHandler.class,
            SocketChannel.class, ChannelPipeline.class, ChannelHandlerContext.class);

    mockUnit
        .expect(sslContext)
        .expect(pipeline)
        .expect(ssl)
        .expect(http2OrHttp)
        .run(unit -> {
          NettyPipeline initializer = new NettyPipeline(unit.get(EventExecutorGroup.class),
              unit.get(HttpHandler.class), conf, unit.get(SslContext.class));
          initializer.initChannel(unit.get(SocketChannel.class));
        }, unit -> {
          Http2OrHttpHandler negotiated = unit.captured(Http2OrHttpHandler.class).iterator().next();
          try {
            negotiated.configurePipeline(unit.get(ChannelHandlerContext.class), "h2");
            // Reaching this line means no exception was thrown.
            fail();
          } catch (IllegalStateException expected) {
            assertEquals("Unknown protocol: h2", expected.getMessage());
          }
        });
}
/**
 * Creates an RPC server that will listen on {@code address}.
 *
 * <p>Sets up the channel group tracking all channels, the shared server handler and a
 * {@link ServerBootstrap} configured with the given channel class, child initializer, event loop
 * group and {@code TCP_NODELAY}. Nothing is bound here.
 *
 * @param eventLoopGroup event loop group for the bootstrap; one of its executors also backs the
 *                       channel group
 * @param eventExecutor  executor group passed to the {@code ServerInitializer}
 * @param channel        server socket channel implementation to use
 * @param address        address stored for this server
 */
<T extends ServerSocketChannel> RpcServer(
        EventLoopGroup eventLoopGroup,
        EventExecutorGroup eventExecutor,
        Class<T> channel,
        SocketAddress address) {
    this.address = address;
    this.allChannels = new DefaultChannelGroup(eventLoopGroup.next());
    this.handler = new ServerHandler(this.allChannels);

    this.bootstrap = new ServerBootstrap();
    this.bootstrap
            .channel(channel)
            .childHandler(new ServerInitializer(eventExecutor, this.handler))
            .group(eventLoopGroup)
            .option(ChannelOption.TCP_NODELAY, true);
}
/**
 * {@link EventExecutorGroup#shutdownGracefully(long, long, TimeUnit) Shuts down} the specified executor, using
 * {@code GRACEFUL_SHUTDOWN_PERIOD} as the quiet period and {@link Long#MAX_VALUE} milliseconds as the timeout.
 *
 * @param executor Executor to shutdown (can be {@code null}).
 *
 * @return A {@link Waiting} that awaits the shutdown future, or {@link Waiting#NO_WAIT} if {@code executor} is
 * {@code null}.
 */
public static Waiting shutdown(EventExecutorGroup executor) {
    // Nothing to shut down, hence nothing to wait for.
    if (executor == null) {
        return Waiting.NO_WAIT;
    }

    Future<?> termination = executor.shutdownGracefully(GRACEFUL_SHUTDOWN_PERIOD, Long.MAX_VALUE, TimeUnit.MILLISECONDS);

    return termination::await;
}
public ResponseDecoder(ImapClientConfiguration configuration, ImapClientState clientState, EventExecutorGroup executorGroup) { super(State.SKIP_CONTROL_CHARS); this.logger = LogUtils.loggerWithName(ResponseDecoder.class, clientState.getClientName()); this.clientState = clientState; this.executorGroup = executorGroup; this.charSeq = new SoftReferencedAppendableCharSequence(configuration.defaultResponseBufferSize()); this.lineParser = new LineParser(charSeq, configuration.maxLineLength()); this.wordParser = new WordParser(charSeq, configuration.maxLineLength()); this.fetchResponseTypeParser = new FetchResponseTypeParser(charSeq, configuration.maxLineLength()); this.atomOrStringParser = new AtomOrStringParser(charSeq, configuration.maxLineLength()); this.literalStringParser = new LiteralStringParser(charSeq, configuration.maxLineLength()); this.bufferedBodyParser = new BufferedBodyParser(charSeq); this.numberParser = new NumberParser(charSeq, 19); this.envelopeParser = new EnvelopeParser(); this.nestedArrayParserRecycler = new NestedArrayParser.Recycler<>(literalStringParser); this.messageBuilder = ((DefaultMessageBuilder) MESSAGE_SERVICE_FACTORY.newMessageBuilder()); MimeConfig mimeConfig = MimeConfig.custom() .setMaxLineLen(configuration.maxLineLength()) .setMaxHeaderLen(configuration.maxLineLength()) .setMaxHeaderCount(configuration.maxHeaderCount()) .build(); messageBuilder.setMimeEntityConfig(mimeConfig); this.untaggedResponses = new ArrayList<>(); this.responseBuilder = new TaggedResponse.Builder(); this.allBytesParser = configuration.tracingEnabled() ? new AllBytesParser(charSeq) : null; }
/**
 * Default executor group: a {@link DefaultEventExecutorGroup} with twice as many threads as
 * available processors.
 *
 * <p>Threads are daemon threads named {@code imap-executor-N}; uncaught exceptions are logged to
 * the {@code imap-executor} logger.
 *
 * @return a newly created executor group
 */
@Default
default EventExecutorGroup executor() {
    final Logger logger = LoggerFactory.getLogger("imap-executor");
    final int threadCount = Runtime.getRuntime().availableProcessors() * 2;

    final ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
    factoryBuilder.setDaemon(true);
    factoryBuilder.setUncaughtExceptionHandler(
        (thread, error) -> logger.error("Uncaught exception on thread {}", thread.getName(), error));
    factoryBuilder.setNameFormat("imap-executor-%d");
    final ThreadFactory threadFactory = factoryBuilder.build();

    return new DefaultEventExecutorGroup(threadCount, threadFactory);
}
/**
 * Creates the state holder for one client.
 *
 * <p>The current command starts unset, both counters start at zero and all listener/handler
 * lists start empty.
 *
 * @param clientName    name of the client this state belongs to
 * @param executorGroup executor group stored for later use
 */
public ImapClientState(String clientName, EventExecutorGroup executorGroup) {
    this.clientName = clientName;
    this.executorGroup = executorGroup;

    // Command tracking.
    this.currentCommand = new AtomicReference<>();
    this.commandCount = new AtomicLong(0);
    this.messageNumber = new AtomicLong(0);

    // Listener and handler registries.
    this.handlers = new CopyOnWriteArrayList<>();
    this.connectionListeners = new CopyOnWriteArrayList<>();
    this.openEventListeners = new CopyOnWriteArrayList<>();
    this.messageAddListeners = new CopyOnWriteArrayList<>();
}
/**
 * Executor group that runs {@code CommonServiceHandler} off the I/O threads. It is created once
 * per initializer and shared by every channel.
 *
 * <p>Previously a new group was created inside {@link #initChannel} for every accepted
 * connection and never shut down, so each connection leaked the executor thread it pinned.
 *
 * <p>NOTE(review): nothing shuts this group down either; whoever owns the server lifecycle
 * should call {@code shutdownGracefully()} on it when the server stops.
 */
private final EventExecutorGroup serviceExecutorGroup =
        new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors() + 1);

/**
 * Builds the pipeline of a newly accepted channel: encoder and decoder on the channel's event
 * loop, followed by the service handler on the shared {@link #serviceExecutorGroup}.
 *
 * @param socketChannel channel being initialized
 */
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
    socketChannel.pipeline()
            .addLast(new CommonEncoder())
            .addLast(new CommonDecoder())
            .addLast(serviceExecutorGroup, new CommonServiceHandler(accessService));
}
/**
 * Creates an entry pairing a handler name (the key) with the handler (the value) and the
 * executor group it was registered with.
 *
 * @param name    handler name, stored as the key
 * @param handler handler, stored as the value
 * @param group   executor group associated with the handler
 */
public HandlerEntry(String name, ChannelHandler handler, EventExecutorGroup group) {
    this._key = name;
    this._value = handler;
    this._group = group;
}
protected MultiConnectionsExchangeChannelGroup(Tracer tracer, MsgHandler<Protocol> msgHandler, Address address, short connections, int connectTimeout, int reconnectInterval, byte idleTimeout, byte maxIdleTimeOut, boolean lazy, boolean reverseIndex, NegotiateConfig config, ExchangeChannelGroup parentGroup, EventLoopGroup loopGroup, EventExecutorGroup executorGroup) throws SailfishException { this.tracer = tracer; this.msgHandler = msgHandler; children = new ExchangeChannel[connections]; deadChildren = new ExchangeChannel[connections]; if (null == config) { config = new NegotiateConfig(idleTimeout, maxIdleTimeOut, id(), ChannelType.readwrite.code(), (short) connections, (short) connections, (short) 0, reverseIndex); } Bootstrap bootstrap = null; for (short i = 0; i < connections; i++) { boolean success = false; final NegotiateConfig deepCopy = config.deepCopy().index(i); parentGroup = (null == parentGroup ? this : parentGroup); bootstrap = configureBoostrap(address, connectTimeout, deepCopy, parentGroup, loopGroup, executorGroup); try { children[i] = newChild(parentGroup, bootstrap, reconnectInterval, lazy, deepCopy.isRead()); success = true; } catch (SailfishException cause) { throw cause; } finally { if (!success) { close(Integer.MAX_VALUE); } } } chooser = DefaultExchangeChannelChooserFactory.INSTANCE.newChooser(children, deadChildren); }
/**
 * Creates the group by passing every argument unchanged to the superclass constructor.
 *
 * @throws SailfishException if the superclass constructor fails
 */
public DefaultExchangeChannelGroup(
        Tracer tracer,
        MsgHandler<Protocol> msgHandler,
        Address address,
        short connections,
        int connectTimeout,
        int reconnectInterval,
        byte idleTimeout,
        byte maxIdleTimeOut,
        boolean lazy,
        boolean reverseIndex,
        NegotiateConfig config,
        ExchangeChannelGroup parentGroup,
        EventLoopGroup loopGroup,
        EventExecutorGroup executorGroup) throws SailfishException {
    super(
            tracer, msgHandler, address, connections,
            connectTimeout, reconnectInterval, idleTimeout, maxIdleTimeOut,
            lazy, reverseIndex, config, parentGroup,
            loopGroup, executorGroup);
}
/**
 * Creates and configures the client bootstrap used to connect to {@code remoteAddress}.
 *
 * @param remoteAddress  host and port to connect to
 * @param connectTimeout value set for {@code CONNECT_TIMEOUT_MILLIS}
 * @param config         negotiation config handed to the channel initializer
 * @param channelGroup   channel group handed to the channel initializer
 * @param loopGroup      event loop group, or {@code null} to use the one from {@code ClientEventGroup}
 * @param executorGroup  executor group, or {@code null} to use the one from {@code ClientEventGroup}
 * @return the configured bootstrap
 */
protected Bootstrap configureBoostrap(Address remoteAddress, int connectTimeout, NegotiateConfig config,
        ExchangeChannelGroup channelGroup, EventLoopGroup loopGroup, EventExecutorGroup executorGroup) {
    final Bootstrap boot = newBootstrap();

    // Substitute the shared client groups for any group the caller left out.
    final EventLoopGroup ioGroup =
            (null != loopGroup) ? loopGroup : ClientEventGroup.INSTANCE.getLoopGroup();
    final EventExecutorGroup handlerGroup =
            (null != executorGroup) ? executorGroup : ClientEventGroup.INSTANCE.getExecutorGroup();

    boot.group(ioGroup);
    boot.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
    boot.remoteAddress(remoteAddress.host(), remoteAddress.port());
    boot.handler(newChannelInitializer(config, channelGroup, handlerGroup));
    return boot;
}
/**
 * Binds the server to the configured address and stores the resulting channel.
 *
 * <p>A dedicated single-threaded acceptor group is created here. The worker group and the handler
 * executor group come from the configuration when set, otherwise from {@code ServerEventGroup}.
 *
 * <p>If binding fails, the acceptor group created by this call is shut down before the exception
 * is rethrown, so a failed start does not leave it running.
 *
 * @throws SailfishException wrapping whatever caused the bind to fail
 */
public void start() throws SailfishException {
    ServerBootstrap boot = newServerBootstrap();

    final EventLoopGroup accept = NettyPlatformIndependent.newEventLoopGroup(1,
            new DefaultThreadFactory(RemotingConstants.SERVER_ACCEPT_THREADNAME));
    if (null != config.getEventLoopGroup()) {
        boot.group(accept, config.getEventLoopGroup());
    } else {
        boot.group(accept, ServerEventGroup.INSTANCE.getLoopGroup());
    }

    final EventExecutorGroup executor = (null != config.getEventExecutorGroup()
            ? config.getEventExecutorGroup()
            : ServerEventGroup.INSTANCE.getExecutorGroup());

    boot.localAddress(config.address().host(), config.address().port());
    boot.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            // Attributes stored on each accepted channel.
            ch.attr(ChannelAttrKeys.OneTime.idleTimeout).set(config.idleTimeout());
            ch.attr(ChannelAttrKeys.maxIdleTimeout).set(config.maxIdleTimeout());
            ch.attr(ChannelAttrKeys.exchangeServer).set(DefaultServer.this);
            // Every handler is registered with the handler executor group.
            pipeline.addLast(executor,
                    RemotingEncoder.INSTANCE,
                    new RemotingDecoder(),
                    new IdleStateHandler(config.idleTimeout(), 0, 0),
                    HeartbeatChannelHandler.INSTANCE,
                    NegotiateChannelHandler.INSTANCE,
                    ConcreteRequestHandler.INSTANCE);
        }
    });

    try {
        channel = boot.bind().syncUninterruptibly().channel();
    } catch (Throwable cause) {
        // Only the acceptor group is owned by this call; the worker and executor groups may be
        // shared, so they are left untouched.
        accept.shutdownGracefully();
        throw new SailfishException(cause);
    }
}
@Override public void run() { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); final EventExecutorGroup group = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( (ChannelHandler) decoderClass.newInstance(), // 解码器 (ChannelHandler) encoderClass.newInstance() // 编码器 ); ch.pipeline().addLast(group,"",(ChannelInboundHandlerAdapter) handlerClass.newInstance()); //处理器 } }) .option(ChannelOption.SO_BACKLOG, 128) // (5)backlog 指定了内核为此套接口排队的最大连接个数 .option(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port); // (7) f.sync(); channel = f.channel(); latch.countDown(); // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); log.info(entranceName+"netty stop "); }catch (Exception e){ e.printStackTrace(); }finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }
/**
 * Convenience constructor: builds a {@link Bootstrap} from the event loop group and channel class
 * and delegates to the main constructor with a new {@code ChannelHandlerProvider},
 * {@code DefaultTranscoder} and {@code HashedWheelTimer}.
 *
 * @param eventLoopGroup event loop group for the bootstrap
 * @param channelClass   channel implementation for the bootstrap
 * @param eventExecutors executor group forwarded to the main constructor
 */
public JannelClient(final EventLoopGroup eventLoopGroup,
                    final Class<? extends Channel> channelClass,
                    final EventExecutorGroup eventExecutors) {
    this(
            new Bootstrap().group(eventLoopGroup).channel(channelClass),
            eventExecutors,
            new ChannelHandlerProvider(),
            new DefaultTranscoder(new TranscoderHelper()),
            new HashedWheelTimer());
}
/**
 * Main constructor.
 *
 * <p>Stores the collaborators, takes the event loop group from the bootstrap and installs a
 * {@code DummyChannelHandler} as the bootstrap's handler.
 *
 * @param clientBootstrap        bootstrap used by this client
 * @param eventExecutors         executor group stored as the session executor
 * @param channelHandlerProvider channel handler provider
 * @param transcoder             transcoder
 * @param timer                  timer
 */
public JannelClient(final Bootstrap clientBootstrap,
                    final EventExecutorGroup eventExecutors,
                    final ChannelHandlerProvider channelHandlerProvider,
                    final Transcoder transcoder,
                    final Timer timer) {
    // Bootstrap-derived state.
    this.eventLoopGroup = clientBootstrap.group();
    this.clientBootstrap = clientBootstrap.handler(new DummyChannelHandler());

    // Plain collaborators.
    this.sessionExecutor = eventExecutors;
    this.channelHandlerProvider = channelHandlerProvider;
    this.transcoder = transcoder;
    this.timer = timer;
}
/**
 * {@code identify} must connect to the configured host and port and set the
 * {@code connectTimeoutMillis} option to the configured timeout.
 */
@Test
public void testIdentifyConnectsToCorrectRemoteServerWithConnectionTimeout() throws Exception {
    // given: a mocked channel whose operations all succeed
    Channel channel = mock(Channel.class, Answers.RETURNS_SMART_NULLS.get());
    mockWriteHandler = mock(ChannelHandler.class);

    DefaultChannelPromise succeeded = new DefaultChannelPromise(channel);
    succeeded.setSuccess();

    // the pipeline mock returns itself from both addLast variants
    ChannelPipeline pipelineMock = mock(ChannelPipeline.class);
    when(pipelineMock.addLast(anyString(), any(ChannelHandler.class)))
            .thenReturn(pipelineMock);
    when(pipelineMock.addLast(any(EventExecutorGroup.class), anyString(), any(ChannelHandler.class)))
            .thenReturn(pipelineMock);

    when(channel.pipeline()).thenReturn(pipelineMock);
    when(channel.isActive()).thenReturn(true);
    when(channel.writeAndFlush(any())).thenReturn(succeeded);
    when(bootstrap.connect(anyString(), anyInt())).thenReturn(succeeded);

    ClientSessionConfiguration sessionConfiguration = new ClientSessionConfiguration();
    sessionConfiguration.setHost("testHost");
    sessionConfiguration.setPort(1111);
    sessionConfiguration.setConnectTimeout(10000);

    // when
    jannelClient.identify(sessionConfiguration, null);

    // then
    verify(bootstrap).connect(sessionConfiguration.getHost(), sessionConfiguration.getPort());
    verify(bootstrap).option(ChannelOption.valueOf("connectTimeoutMillis"), sessionConfiguration.getConnectTimeout());
}
/**
 * {@code identify} must write an {@code Admin} command of type {@code IDENTIFY} whose box id
 * equals the configured client id.
 */
@Test
public void testIdentifySendsCorrectIdentifyCommand() throws Exception {
    // given: a mocked channel whose operations all succeed
    Channel channel = mock(Channel.class, Answers.RETURNS_SMART_NULLS.get());
    mockWriteHandler = mock(ChannelHandler.class);

    DefaultChannelPromise succeeded = new DefaultChannelPromise(channel);
    succeeded.setSuccess();

    // the pipeline mock returns itself from both addLast variants
    ChannelPipeline pipelineMock = mock(ChannelPipeline.class);
    when(pipelineMock.addLast(anyString(), any(ChannelHandler.class)))
            .thenReturn(pipelineMock);
    when(pipelineMock.addLast(any(EventExecutorGroup.class), anyString(), any(ChannelHandler.class)))
            .thenReturn(pipelineMock);

    when(channel.pipeline()).thenReturn(pipelineMock);
    when(channel.isActive()).thenReturn(true);
    when(channel.writeAndFlush(any())).thenReturn(succeeded);
    when(bootstrap.connect(anyString(), anyInt())).thenReturn(succeeded);

    ClientSessionConfiguration sessionConfiguration = new ClientSessionConfiguration();
    sessionConfiguration.setClientId("testId");

    // when
    jannelClient.identify(sessionConfiguration, null);

    // then: capture what was written to the channel
    ArgumentCaptor<Admin> writtenCommand = ArgumentCaptor.forClass(Admin.class);
    verify(channel).writeAndFlush(writtenCommand.capture());

    Admin identify = writtenCommand.getValue();
    assertEquals("Wrong command type", AdminCommand.IDENTIFY, identify.getAdminCommand());
    assertEquals("Wrong client id", sessionConfiguration.getClientId(), identify.getBoxId());
}
protected EventExecutorGroup createExecutorService() { // Provide the executor service for the application // and use a Camel thread factory so we have consistent thread namings // we should use a shared thread pool as recommended by Netty String pattern = getCamelContext().getExecutorServiceManager().getThreadNamePattern(); ThreadFactory factory = new CamelThreadFactory(pattern, "NettyEventExecutorGroup", true); return new DefaultEventExecutorGroup(getMaximumPoolSize(), factory); }
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, boolean inbound, boolean outbound) { if (name == null) { throw new NullPointerException("name"); } channel = pipeline.channel; this.pipeline = pipeline; this.name = name; if (group != null) { // Pin one of the child executors once and remember it so that the same child executor // is used to fire events for the same channel. EventExecutor childExecutor = pipeline.childExecutors.get(group); if (childExecutor == null) { childExecutor = group.next(); pipeline.childExecutors.put(group, childExecutor); } executor = childExecutor; } else { executor = null; } this.inbound = inbound; this.outbound = outbound; }
/**
 * Inserts {@code handler} at the head of the pipeline under {@code name}.
 *
 * <p>The duplicate-name check and the insertion happen while holding this pipeline's monitor.
 *
 * @param group   executor group for the handler's context
 * @param name    name of the new handler; checked by {@code checkDuplicateName}
 * @param handler handler to insert
 * @return this pipeline
 */
@Override
public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name);
        addFirst0(name, new DefaultChannelHandlerContext(this, group, name, handler));
    }

    return this;
}
/**
 * Appends {@code handler} to the end of the pipeline under {@code name}.
 *
 * <p>The duplicate-name check and the insertion happen while holding this pipeline's monitor.
 *
 * @param group   executor group for the handler's context
 * @param name    name of the new handler; checked by {@code checkDuplicateName}
 * @param handler handler to append
 * @return this pipeline
 */
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name);
        addLast0(name, new DefaultChannelHandlerContext(this, group, name, handler));
    }

    return this;
}
/**
 * Inserts {@code handler} directly before the handler named {@code baseName}.
 *
 * <p>The base handler is looked up first, then the new name is checked for duplicates; both
 * steps and the insertion run while holding this pipeline's monitor.
 *
 * @param group    executor group for the handler's context
 * @param baseName name of the existing handler to insert before; looked up via
 *                 {@code getContextOrDie}
 * @param name     name of the new handler; checked by {@code checkDuplicateName}
 * @param handler  handler to insert
 * @return this pipeline
 */
@Override
public ChannelPipeline addBefore(
        EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
    synchronized (this) {
        // Resolve the anchor before validating the new name (same order as the lookups' failures).
        AbstractChannelHandlerContext anchor = getContextOrDie(baseName);
        checkDuplicateName(name);
        addBefore0(name, anchor, new DefaultChannelHandlerContext(this, group, name, handler));
    }

    return this;
}