public void run() { // Configure the client. ChannelFactory factory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 1, 1); ClientBootstrap bootstrap = new ClientBootstrap(factory); // Set up the pipeline factory. bootstrap.setPipelineFactory(setPipelineFactory()); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true); // Start the connection attempt. ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); if (oneShot) { // Wait until the connection is closed or the connection attempt fails. future.getChannel().getCloseFuture().awaitUninterruptibly(); // Shut down thread pools to exit. bootstrap.releaseExternalResources(); } }
/**
 * Starts the BGP peer.
 *
 * <p>Creates a client bootstrap whose pipeline contains the peer frame decoder
 * followed by the peer channel handler, stores it in {@code peerBootstrap} and
 * initiates the connection. The connection attempt is not awaited.
 *
 * @param connectToSocket the socket to connect to
 * @throws InterruptedException declared for callers; nothing in this body blocks
 */
private void connect(InetSocketAddress connectToSocket)
        throws InterruptedException {
    ChannelFactory channelFactory = new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool());

    ChannelPipelineFactory pipelineFactory = () -> {
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("BgpPeerFrameDecoderTest", peerFrameDecoder);
        pipeline.addLast("BgpPeerChannelHandlerTest", peerChannelHandler);
        return pipeline;
    };

    peerBootstrap = new ClientBootstrap(channelFactory);
    // A client bootstrap applies its options directly to the channel it
    // creates. The "child." prefix is only meaningful on a server bootstrap
    // (options for accepted channels); with it, these settings were ignored.
    peerBootstrap.setOption("keepAlive", true);
    peerBootstrap.setOption("tcpNoDelay", true);
    peerBootstrap.setPipelineFactory(pipelineFactory);
    peerBootstrap.connect(connectToSocket);
}
/**
 * Starts the BGP peer, connecting from an explicit local address.
 *
 * <p>Creates a client bootstrap whose pipeline contains the peer frame decoder
 * followed by the peer channel handler, stores it in {@code peerBootstrap} and
 * initiates the connection. The connection attempt is not awaited, so the
 * returned channel may not be connected yet.
 *
 * @param connectToSocket the remote socket to connect to
 * @param localAddress the local address to connect from
 * @return the channel associated with the connection attempt
 * @throws InterruptedException declared for callers; nothing in this body blocks
 */
private Channel connectFrom(InetSocketAddress connectToSocket,
                            SocketAddress localAddress)
        throws InterruptedException {
    ChannelFactory channelFactory = new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool());

    ChannelPipelineFactory pipelineFactory = () -> {
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("BgpPeerFrameDecoderTest", peerFrameDecoder);
        pipeline.addLast("BgpPeerChannelHandlerTest", peerChannelHandler);
        return pipeline;
    };

    peerBootstrap = new ClientBootstrap(channelFactory);
    // A client bootstrap applies its options directly to the channel it
    // creates. The "child." prefix is only meaningful on a server bootstrap
    // (options for accepted channels); with it, these settings were ignored.
    peerBootstrap.setOption("keepAlive", true);
    peerBootstrap.setOption("tcpNoDelay", true);
    peerBootstrap.setPipelineFactory(pipelineFactory);

    return peerBootstrap.connect(connectToSocket, localAddress).getChannel();
}
/**
 * Starts a listener that decodes incoming Beats frames and logs them.
 *
 * <p>Command line:
 * <ul>
 *   <li>no arguments: listen on {@code 127.0.0.1:5044};</li>
 *   <li>one argument: the port (host stays {@code 127.0.0.1});</li>
 *   <li>two or more arguments: host followed by port.</li>
 * </ul>
 * A port that cannot be parsed as an integer falls back to {@code 5044}.
 *
 * @param args optional port, or host and port
 */
public static void main(String[] args) {
    String hostname = "127.0.0.1";
    int port = 5044;
    if (args.length >= 2) {
        hostname = args[0];
        port = firstNonNull(Ints.tryParse(args[1]), 5044);
    } else if (args.length == 1) {
        // A lone argument is the port. The previous code read args[1] here,
        // which threw ArrayIndexOutOfBoundsException for a single argument.
        port = firstNonNull(Ints.tryParse(args[0]), 5044);
    }

    final ChannelFactory factory = new NioServerSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool());
    final ServerBootstrap b = new ServerBootstrap(factory);
    b.getPipeline().addLast("beats-frame-decoder", new BeatsFrameDecoder());
    b.getPipeline().addLast("beats-codec", new BeatsCodecHandler());
    b.getPipeline().addLast("logging", new LoggingHandler());

    System.out.println("Starting listener on " + hostname + ":" + port);
    b.bind(new InetSocketAddress(hostname, port));
}
@Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); //设置线程池(但是线程池中的线程都是守护线程,为的就是当JVM退出时候不用考虑守护线程是否已经结束) ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); //Netty启动类 //定义NettyHandler(这个应该是通用的Handler,只有在服务启动的时候生效一次) final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); //增加解码处理器 pipeline.addLast("encoder", adapter.getEncoder()); //增加编码处理器 pipeline.addLast("handler", nettyHandler); //增加具体操作的处理器 return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }
@Override public void doOpen() throws Throwable { ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", false)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); int ioThread = conf.getInt(Constants.IO_THREADS,Constants.DEFAULT_IO_THREADS); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, ioThread); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getConf(), this); channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(conf,getCodec(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }
/**
 * Creates an asynchronous Netty client, schedules its periodic flush task and
 * starts it.
 *
 * @param storm_conf  configuration map the send/flush settings are read from
 * @param factory     channel factory, also stored in {@code clientChannelFactory}
 * @param scheduler   executor the periodic flush task is scheduled on
 * @param host        remote host, forwarded to the superclass
 * @param port        remote port, forwarded to the superclass
 * @param reconnector reconnect helper, forwarded to the superclass
 */
@SuppressWarnings("rawtypes")
NettyClientAsync(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler,
                 String host, int port, ReconnectRunnable reconnector) {
    super(storm_conf, factory, scheduler, host, port, reconnector);

    // Settings read from the configuration.
    BATCH_THREASHOLD_WARN = ConfigExtension.getNettyBufferThresholdSize(storm_conf);
    blockSend = isBlockSend(storm_conf);
    directlySend = isDirectSend(storm_conf);

    flush_later = new AtomicBoolean(false);
    flushCheckInterval =
            Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10);

    // Periodic task that simply calls flush().
    Runnable flushTask = new Runnable() {
        @Override
        public void run() {
            flush();
        }
    };

    // First run is delayed by max_sleep_ms * max_retries, capped at 1000 ms.
    long firstRunDelay = Math.min(1000, max_sleep_ms * max_retries);
    scheduler.scheduleAtFixedRate(
            flushTask, firstRunDelay, flushCheckInterval, TimeUnit.MILLISECONDS);

    clientChannelFactory = factory;

    start();

    LOG.info(this.toString());
}
public void init(ChannelPipelineFactory pipeline, int workerNum) { ChannelFactory factory = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum); DefaultChannelFuture.setUseDeadLockChecker(false); pipelineFactory = pipeline; bootstrap = new ServerBootstrap(factory); bootstrap.setPipelineFactory(pipelineFactory); // TODO - should be configurable bootstrap.setOption("reuseAddress", true); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); bootstrap.setOption("child.connectTimeoutMillis", 10000); bootstrap.setOption("child.connectResponseTimeoutMillis", 10000); bootstrap.setOption("child.receiveBufferSize", 1048576 * 10); }
/**
 * Connects this component to {@code _host}:{@code _port} and keeps the
 * resulting channel in {@code _channel}.
 *
 * <p>The connect wait is bounded: it defaults to
 * {@code DEFAULT_CONNECT_TIMEOUT} and, when
 * {@code Config.NIMBUS_TASK_LAUNCH_SECS} is present in the configuration, is
 * half of that value converted to milliseconds.
 *
 * @param stormConf configuration map
 * @param context   topology context (not used here)
 * @param collector output collector, stored in {@code _collector}
 * @throws RuntimeException if the connection has not succeeded when the wait
 *         ends; the bootstrap's external resources are released first
 */
public void prepare(Map stormConf, TopologyContext context, final OutputCollector collector) {
    _collector = collector;

    ChannelFactory factory = new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool());
    _bootstrap = new ClientBootstrap(factory);
    _bootstrap.setPipelineFactory(getPipelineFactory());
    _bootstrap.setOptions(_options);

    ChannelFuture future = _bootstrap.connect(new InetSocketAddress(_host, _port));

    int connectTimeout = DEFAULT_CONNECT_TIMEOUT;
    Object connectTimeoutConfig = stormConf.get(Config.NIMBUS_TASK_LAUNCH_SECS);
    if (connectTimeoutConfig != null) {
        // seconds -> milliseconds, then use half of the launch window
        connectTimeout = ((Number) connectTimeoutConfig).intValue() * 1000 / 2;
    }

    future.awaitUninterruptibly(connectTimeout);
    if (!future.isSuccess()) {
        _bootstrap.releaseExternalResources();
        // The closing quote was missing from this message.
        throw new RuntimeException(
                "Could not connect to '" + _host + ":" + _port + "'", future.getCause());
    }
    _channel = future.getChannel();
}
/**
 * Runs the proxy server until the stop monitor thread terminates, then closes
 * all channels and releases the channel factories.
 *
 * <p>One cached thread pool is shared by the server-side factory and by the
 * client-side factory that is handed to the proxy pipeline.
 *
 * @param args ignored
 * @throws Exception if waiting for the stop monitor is interrupted or startup fails
 */
public static void main(String[] args) throws Exception {
    Executor executor = Executors.newCachedThreadPool();
    ChannelFactory factory = new NioServerSocketChannelFactory(executor, executor);
    ServerBootstrap sb = new ServerBootstrap(factory);
    ClientSocketChannelFactory cf = new NioClientSocketChannelFactory(executor, executor);
    sb.setPipelineFactory(new ProxyPipelineFactory(
            cf, RtmpConfig.PROXY_REMOTE_HOST, RtmpConfig.PROXY_REMOTE_PORT));

    InetSocketAddress socketAddress = new InetSocketAddress(RtmpConfig.PROXY_PORT);
    sb.bind(socketAddress);
    logger.info("proxy server started, listening on {}", socketAddress);

    // Block here until the stop monitor thread ends.
    Thread monitor = new StopMonitor(RtmpConfig.PROXY_STOP_PORT);
    monitor.start();
    monitor.join();

    ChannelGroupFuture future = ALL_CHANNELS.close();
    logger.info("closing channels");
    future.awaitUninterruptibly();

    logger.info("releasing resources");
    // The client-side factory was never released before; release it too so
    // that any resources it owns besides the shared executor are shut down.
    cf.releaseExternalResources();
    factory.releaseExternalResources();
    logger.info("server stopped");
}
private void addNewSwitch(DummySwitch dummySwitch) { final SwitchChannelHandler switchHandler = new SwitchChannelHandler(coreConnector, aggreedVersion, moduleName); switchHandler.setDummySwitch(dummySwitch); // CONTAINS ALL THE INFO // ABOUT THIS SWITCH ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ClientBootstrap bootstrap = new ClientBootstrap(factory); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { return Channels.pipeline(switchHandler); } }); // CONNECT AND ADD TO HASHMAP OF MANAGED SWITCHES ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 7753)); managedSwitchesChannel.put(dummySwitch.getDatapathId(), future); managedBootstraps.put(dummySwitch.getDatapathId(), bootstrap); managedSwitches.put(dummySwitch.getDatapathId(), dummySwitch); switchHandler.registerSwitchConnection(future); switchHandler.setModuleHandler(moduleHandler); }
@Override public void startUp(FloodlightModuleContext context) { //ADD SWITCH LISTENERS floodlightProvider.addOFSwitchListener(this); //REGISTER FOR MESSAGES FROM THE SWITCHES floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this); floodlightProvider.addOFMessageListener(OFType.PACKET_OUT, this); floodlightProvider.addOFMessageListener(OFType.FLOW_MOD, this); floodlightProvider.addOFMessageListener(OFType.ERROR, this); //START UP THE SERVER FOR THE ODL-SHIM ChannelFactory serverFactory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ServerBootstrap serverBootstrap = new ServerBootstrap(serverFactory); serverBootstrap.setOption("child.tcpNoDelay", true); serverBootstrap.setOption("child.keepAlive", true); serverBootstrap.setPipelineFactory(new NetIdePipelineFactory()); logger.info("NetIDE Module binding to 41414..." ); serverBootstrap.bind(new InetSocketAddress(41414)); //TODO: REMOVE HARD CODING }
/** * Creates the comms channel to the SDN Controller and then adds a * fake switch for the controller to manage * @param dummySwitch the switch to be managed */ private void addNewSwitch(DummySwitch dummySwitch) { final SwitchChannelHandler switchHandler = new SwitchChannelHandler(); switchHandler.setDummySwitch(dummySwitch); //CONTAINS ALL THE INFO ABOUT THIS SWITCH switchHandler.setShimChannel(this.channel); ChannelFactory factory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ClientBootstrap bootstrap = new ClientBootstrap(factory); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { return Channels.pipeline(switchHandler); } }); //CONNECT AND ADD TO HASHMAP OF MANAGED SWITCHES ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 6634)); managedSwitches.put(dummySwitch.getId(), future); managedBootstraps.put(dummySwitch.getId(), bootstrap); switchHandler.setControllerChannel(future); }
/**
 * Initialises the server.
 *
 * <p>Instantiates the release class reflectively, gives all three bootstraps
 * (service, HTTP, JAGGRAB) the same channel factory, creates the server
 * context and installs one pipeline factory per bootstrap. Nothing is bound
 * here.
 *
 * @param releaseClassName The class name of the current active {@link Release}.
 * @throws ClassNotFoundException If the release class could not be found.
 * @throws IllegalAccessException If the release class could not be accessed.
 * @throws InstantiationException If the release class could not be instantiated.
 */
public void init(String releaseClassName)
        throws ClassNotFoundException, InstantiationException, IllegalAccessException {
    Class<?> releaseClass = Class.forName(releaseClassName);
    Release release = (Release) releaseClass.newInstance();
    logger.info("Initialized release #" + release.getReleaseNumber() + ".");

    // A single factory, using the network executor for both of its executor
    // arguments, is shared by the three bootstraps.
    ChannelFactory sharedFactory =
            new NioServerSocketChannelFactory(networkExecutor, networkExecutor);
    serviceBootstrap.setFactory(sharedFactory);
    httpBootstrap.setFactory(sharedFactory);
    jagGrabBootstrap.setFactory(sharedFactory);

    context = new ServerContext(release, serviceManager);
    ApolloHandler apolloHandler = new ApolloHandler(context);

    // Every pipeline factory receives the same handler and timer.
    serviceBootstrap.setPipelineFactory(new ServicePipelineFactory(apolloHandler, timer));
    httpBootstrap.setPipelineFactory(new HttpPipelineFactory(apolloHandler, timer));
    jagGrabBootstrap.setPipelineFactory(new JagGrabPipelineFactory(apolloHandler, timer));
}
/** * Creates a new channel to given host and port.<br> * * @param host * @param port * @return * @throws Exception */ private Channel createChannel(String host, int port) throws Exception { // Important notice; use NioClientSocketChannelFactory instead // of NioServerSocketChannelFactory ChannelFactory channelFactory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); //bootstrap.setPipelineFactory(new SipClientPipelineFactory(false,false)); bootstrap.setPipelineFactory(new SipPipelineFactory(sipServerHandler)); ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); // open / connect to channel Channel c = future.await().getChannel(); if (!future.isSuccess()) { log.warn(String.format("createChannel. Establishing connection failed[%s]", future.getCause().getMessage())); bootstrap.releaseExternalResources(); } return c; }
protected HttpTunnelServerChannel(ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, ServerSocketChannelFactory inboundFactory, ChannelGroup realConnections) { super(factory, pipeline, sink); tunnelIdPrefix = Long.toHexString(random.nextLong()); tunnels = new ConcurrentHashMap<String, HttpTunnelAcceptedChannel>(); config = new HttpTunnelServerChannelConfig(); realChannel = inboundFactory.newChannel(this.createRealPipeline(realConnections)); config.setRealChannel(realChannel); opened = new AtomicBoolean(true); bindState = new AtomicReference<BindState>(BindState.UNBOUND); realConnections.add(realChannel); Channels.fireChannelOpen(this); }
@Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }
/**
 * Starts the syslog TCP source: binds a server whose pipeline holds a freshly
 * configured syslog handler, then starts the source counter and the
 * superclass.
 *
 * <p>When {@code host} is {@code null} the server binds to {@code port} only;
 * otherwise it binds to {@code host}:{@code port}.
 */
@Override
public void start() {
    ChannelFactory serverFactory = new NioServerSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool());

    ServerBootstrap serverBootstrap = new ServerBootstrap(serverFactory);
    serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            // A new handler, configured from this source's settings, is
            // created every time a pipeline is requested.
            syslogTcpHandler tcpHandler = new syslogTcpHandler();
            tcpHandler.setEventSize(eventSize);
            tcpHandler.setFormater(formaterProp);
            tcpHandler.setKeepFields(keepFields);
            return Channels.pipeline(tcpHandler);
        }
    });

    logger.info("Syslog TCP Source starting...");

    InetSocketAddress bindAddress = (host == null)
            ? new InetSocketAddress(port)
            : new InetSocketAddress(host, port);
    nettyChannel = serverBootstrap.bind(bindAddress);

    sourceCounter.start();
    super.start();
}
private void startServer() { ChannelFactory channelFactory = new NioServerSocketChannelFactory( newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d")), newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d"))); ChannelPipelineFactory pipelineFactory = () -> { // Allocate a new session per connection FpmSessionHandler fpmSessionHandler = new FpmSessionHandler(new InternalFpmListener()); FpmFrameDecoder fpmFrameDecoder = new FpmFrameDecoder(); // Setup the processing pipeline ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder); pipeline.addLast("FpmSession", fpmSessionHandler); return pipeline; }; InetSocketAddress listenAddress = new InetSocketAddress(FPM_PORT); serverBootstrap = new ServerBootstrap(channelFactory); serverBootstrap.setOption("child.reuseAddr", true); serverBootstrap.setOption("child.keepAlive", true); serverBootstrap.setOption("child.tcpNoDelay", true); serverBootstrap.setPipelineFactory(pipelineFactory); try { serverChannel = serverBootstrap.bind(listenAddress); allChannels.add(serverChannel); } catch (ChannelException e) { log.debug("Exception binding to FPM port {}: ", listenAddress.getPort(), e); stopServer(); } }
public void start() { log.debug("BGP Session Manager start."); isShutdown = false; ChannelFactory channelFactory = new NioServerSocketChannelFactory( newCachedThreadPool(groupedThreads("onos/bgp", "sm-boss-%d")), newCachedThreadPool(groupedThreads("onos/bgp", "sm-worker-%d"))); ChannelPipelineFactory pipelineFactory = () -> { // Allocate a new session per connection BgpSession bgpSessionHandler = new BgpSession(BgpSessionManager.this); BgpFrameDecoder bgpFrameDecoder = new BgpFrameDecoder(bgpSessionHandler); // Setup the processing pipeline ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("BgpFrameDecoder", bgpFrameDecoder); pipeline.addLast("BgpSession", bgpSessionHandler); return pipeline; }; InetSocketAddress listenAddress = new InetSocketAddress(bgpPort); serverBootstrap = new ServerBootstrap(channelFactory); // serverBootstrap.setOptions("reuseAddr", true); serverBootstrap.setOption("child.keepAlive", true); serverBootstrap.setOption("child.tcpNoDelay", true); serverBootstrap.setPipelineFactory(pipelineFactory); try { serverChannel = serverBootstrap.bind(listenAddress); allChannels.add(serverChannel); } catch (ChannelException e) { log.debug("Exception binding to BGP port {}: ", listenAddress.getPort(), e); } }
/** * 打开netty通道 并设置参数 * @see com.alibaba.dubbo.remoting.transport.AbstractServer#doOpen() */ @Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); // workerCount默认Cpu核数+1 ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); //所有handler的调用链 //NettyHandler->NettyServer->MultiMessageHandler->HeartbeatHandler->(SPI,default is ALL)Dispatcher.dispatch() //->DecodeHandler->HeaderExchangeHandler->DubboProtocol.requestHandler final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }
public TSOClientRaw(String host, int port) throws InterruptedException, ExecutionException { // Start client with Nb of active threads = 3 as maximum. ChannelFactory factory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()), Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), 3); // Create the bootstrap ClientBootstrap bootstrap = new ClientBootstrap(factory); InetSocketAddress addr = new InetSocketAddress(host, port); ChannelPipeline pipeline = bootstrap.getPipeline(); pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4)); pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance())); pipeline.addLast("protobufencoder", new ProtobufEncoder()); Handler handler = new Handler(); pipeline.addLast("handler", handler); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true); bootstrap.setOption("reuseAddress", true); bootstrap.setOption("connectTimeoutMillis", 100); ChannelFuture channelFuture = bootstrap.connect(addr).await(); channel = channelFuture.getChannel(); }
@Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(() -> { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; }); // bind channel = bootstrap.bind(getBindAddress()); }