/**
 * Creates a client for the local crawler service that uses a long-lived connection.
 *
 * <p>The constructor only records the action and prepares the channel initializer;
 * the pipeline itself is built each time a channel is initialized.
 *
 * @param action the action handed to the {@code LocalCrawlerHandler} of every channel
 */
public Client(@Nonnull final Action action) {
    this.action = action;
    isLongConnection = true;
    // The anonymous initializer cannot use "this" to reach the client, so capture it.
    final Client owner = this;
    channelInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                    // Protobuf framing and (de)serialization of ProcessData messages.
                    .addLast(new ProtobufVarint32FrameDecoder())
                    .addLast(new ProtobufDecoder(ProcessData.getDefaultInstance()))
                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                    .addLast(new ProtobufEncoder())
                    // Read timeout configured with the value 60.
                    .addLast(new ReadTimeoutHandler(60))
                    // Application handlers: login, crawler work, heartbeat.
                    .addLast(new LoginAuthReqHandler(channel))
                    .addLast(new LocalCrawlerHandler(action))
                    .addLast(new HeartBeatReqHandler(owner, closeLongConnection));
        }
    };
}
/**
 * Builds a client {@code Bootstrap} configured for protobuf messaging.
 *
 * <p>Every call creates a new {@code NioEventLoopGroup} and a new bootstrap whose
 * channels decode {@code MessageBuf.JMTransfer} messages and end in a
 * {@code TcpClientHandler}. {@code SO_KEEPALIVE} is enabled.
 *
 * @return the configured bootstrap
 */
public static final Bootstrap getBootstrap() {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(new NioEventLoopGroup())
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline()
                            .addLast("frameDecoder", new ProtobufVarint32FrameDecoder())
                            .addLast("decoder", new ProtobufDecoder(MessageBuf.JMTransfer.getDefaultInstance()))
                            .addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender())
                            .addLast("encoder", new ProtobufEncoder())
                            .addLast("handler", new TcpClientHandler());
                }
            })
            .option(ChannelOption.SO_KEEPALIVE, true);
    return bootstrap;
}
/**
 * Prepares the event loop group and the channel initializer used by this process.
 *
 * <p>The event loop group is created with the arguments {@code 1} and the shared
 * thread pool. Each initialized channel gets varint32 protobuf framing, a decoder for
 * {@code JavacRemoteProto.Message} and a fresh {@code CompilationRequestsHandler}.
 */
public ExternalJavacProcess() {
    final JavacRemoteProto.Message prototype = JavacRemoteProto.Message.getDefaultInstance();
    myEventLoopGroup = new NioEventLoopGroup(1, SharedThreadPool.getInstance());
    myChannelInitializer = new ChannelInitializer() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                    .addLast(new ProtobufVarint32FrameDecoder())
                    .addLast(new ProtobufDecoder(prototype))
                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                    .addLast(new ProtobufEncoder())
                    .addLast(new CompilationRequestsHandler());
        }
    };
}
/**
 * Starts a protobuf server on the loopback address using an ephemeral port.
 *
 * <p>The server channel is registered with {@code myChannelRegistrar}; accepted
 * channels pass through the registrar, the protobuf codecs for
 * {@code CmdlineRemoteProto.Message} and finally {@code myMessageDispatcher}.
 *
 * @return the port the server channel was actually bound to
 * @throws Exception declared by the original signature
 */
private int startListening() throws Exception {
    final NioEventLoopGroup eventLoop = new NioEventLoopGroup(1, PooledThreadExecutor.INSTANCE);
    final ServerBootstrap bootstrap = NettyUtil.nioServerBootstrap(eventLoop);
    bootstrap.childHandler(new ChannelInitializer() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                    .addLast(myChannelRegistrar)
                    .addLast(new ProtobufVarint32FrameDecoder())
                    .addLast(new ProtobufDecoder(CmdlineRemoteProto.Message.getDefaultInstance()))
                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                    .addLast(new ProtobufEncoder())
                    .addLast(myMessageDispatcher);
        }
    });

    // Port 0 is passed to bind(); the port actually assigned is read back below.
    Channel serverChannel = bootstrap.bind(NetUtils.getLoopbackAddress(), 0).syncUninterruptibly().channel();
    myChannelRegistrar.add(serverChannel);

    InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.localAddress();
    return boundAddress.getPort();
}
/**
 * Connects to {@code host:port} and stores the resulting channel in {@code channel}.
 *
 * <p>A new worker group is created on every call. If the connection attempt fails
 * (or the wait is interrupted) that group is shut down before the exception
 * propagates; otherwise each failed attempt would leave its event-loop threads
 * running with no reference left to stop them once {@code workerGroup} is reassigned.
 *
 * <p>Inbound frames use a 4-byte length prefix (maximum frame length 1048576 bytes) and
 * are decoded as {@code Protocol.BaseMessage}; outbound messages are protobuf-encoded
 * and prefixed with a 4-byte length.
 *
 * @throws Exception if connecting fails or the calling thread is interrupted while waiting
 */
public void connect() throws Exception {
    workerGroup = new NioEventLoopGroup();
    boolean connected = false;
    try {
        Bootstrap bootstrap = new Bootstrap()
                .group(workerGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
                        pipeline.addLast(new ProtobufDecoder(Protocol.BaseMessage.getDefaultInstance()));
                        // The handler instance is kept in a field for use outside the pipeline.
                        clientHandler = new ClientHandler();
                        pipeline.addLast(clientHandler);
                        pipeline.addLast(new LengthFieldPrepender(4));
                        pipeline.addLast(new ProtobufEncoder());
                    }
                });

        ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
        channel = channelFuture.channel();
        connected = true;
    } finally {
        if (!connected) {
            // Release the threads of the group created for this failed attempt.
            workerGroup.shutdownGracefully();
        }
    }
}
/**
 * Builds the client pipeline: optional SSL, debug logging, a read timeout taken from
 * configuration (default 10000 ms), protobuf codecs for
 * {@code NettyPistachioProtocol.Response} and the client handler.
 *
 * @param ch the channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    ChannelPipeline pipeline = ch.pipeline();

    // SSL is only installed when an SSL context has been configured.
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc(), NettyPistachioClient.HOST, NettyPistachioClient.PORT));
    }

    pipeline.addLast(new LoggingHandler(LogLevel.DEBUG))
            .addLast(new ReadTimeoutHandler(
                    ConfigurationManager.getConfiguration().getInt("Network.Netty.ClientReadTimeoutMillis", 10000),
                    TimeUnit.MILLISECONDS))
            .addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(NettyPistachioProtocol.Response.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            .addLast(new NettyPistachioClientHandler());
}
/**
 * Starts the RPC server on {@code port} and blocks until the server channel closes.
 *
 * <p>Creates the boss group (constructed with the argument 1) and the worker group,
 * installs the protobuf codecs for {@code WirePayload} plus an
 * {@code RPCServerHandler} on every accepted channel, then binds.
 *
 * @throws Exception if binding fails or the calling thread is interrupted while waiting
 */
public void start() throws Exception {
    logger.info("start start()");

    bossGroup = new NioEventLoopGroup(1);
    workerGroup = new NioEventLoopGroup();

    ServerBootstrap server = new ServerBootstrap();
    server.group(bossGroup, workerGroup);
    server.channel(NioServerSocketChannel.class);
    server.handler(new LoggingHandler(LogLevel.INFO));
    server.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                    .addLast(new ProtobufVarint32FrameDecoder())
                    .addLast(new ProtobufDecoder(WirePayload.getDefaultInstance()))
                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                    .addLast(new ProtobufEncoder())
                    .addLast(new RPCServerHandler(serviceRegistry));
        }
    });

    logger.info("finish start()");
    logger.info("bind and waiting for request");

    // Blocks here: first until the bind completes, then until the server channel closes.
    server.bind(port).sync().channel().closeFuture().sync();
}
/**
 * Creates a channel initializer that encodes and decodes one protocol buffer
 * message type and hands decoded messages to the given handler.
 *
 * <p>Messages are framed with a 4-byte length prefix; message sizes over 10 MB are
 * not supported.</p>
 *
 * <p>The handler will be executed on the I/O thread. Blocking operations should be
 * executed in their own thread.</p>
 *
 * @param defaultInstance an instance of the message to handle
 * @param handler the handler implementing the application logic
 * @param <M> the type of the supported protocol buffer message
 * @return a new initializer installing the codecs and {@code handler}
 */
public static final <M extends Message> ChannelInitializer<Channel> protoBuf(
        final M defaultInstance, final SimpleChannelInboundHandler<M> handler) {
    return new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline()
                    .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 4, 0, 4))
                    .addLast("protobufDecoder", new ProtobufDecoder(defaultInstance))
                    .addLast("frameEncoder", new LengthFieldPrepender(4))
                    .addLast("protobufEncoder", new ProtobufEncoder())
                    .addLast("applicationHandler", handler);
        }
    };
}
/**
 * Builds the RPC pipeline for a channel.
 *
 * <p>Installs 4-byte length-prefixed framing and the protobuf codecs for
 * {@code NettyRpcProto.RpcContainer}, followed by an inbound and an outbound handler
 * that share one per-channel call map. The inbound handler is registered with
 * {@code eventExecutor}.
 *
 * @param ch the channel being initialized
 */
@Override
protected void initChannel(T ch) throws Exception {
    // One call map per channel, shared by the inbound and outbound handlers.
    ConcurrentHashMap<Integer, RpcCall> pendingCalls = new ConcurrentHashMap<Integer, RpcCall>();

    ch.pipeline()
            .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
            .addLast("protobufDecoder", new ProtobufDecoder(NettyRpcProto.RpcContainer.getDefaultInstance()))
            .addLast("frameEncoder", new LengthFieldPrepender(4))
            .addLast("protobufEncoder", new ProtobufEncoder())
            .addLast(eventExecutor, "inboundHandler", new InboundHandler(pendingCalls))
            .addLast("outboundHandler", new OutboundHandler(pendingCalls));
}
/**
 * Builds the server-side RPC pipeline: 4-byte length-prefixed framing, protobuf
 * codecs for {@code NettyRpcProto.RpcContainer}, and the server handler registered
 * with {@code eventExecutor}.
 *
 * @param ch the accepted channel being initialized
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
            .addLast("protobufDecoder", new ProtobufDecoder(NettyRpcProto.RpcContainer.getDefaultInstance()))
            .addLast("frameEncoder", new LengthFieldPrepender(4))
            .addLast("protobufEncoder", new ProtobufEncoder())
            .addLast(eventExecutor, "serverHandler", handler);
}
public static void start(MemberEventLoop loop) throws InterruptedException { String host = "127.0.0.1"; int port = 9005; EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(SocketMessage.getDefaultInstance())); ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new IdleStateHandler(0, 5, 10, TimeUnit.SECONDS)); ch.pipeline().addLast(new BusinessRouterHandler(loop)); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } }
/**
 * Binds the server to {@code port} and blocks until the server channel closes.
 *
 * <p>Accepted channels decode {@code ProcessData} protobufs, have a read timeout
 * configured with the value 60, and run the login, shell, worker-proxy and heartbeat
 * response handlers in that order.
 *
 * <p>If the calling thread is interrupted while waiting, the interruption is logged
 * and the thread's interrupt status is restored so that callers can still observe it.
 */
public void bind() {
    bossGroup = new NioEventLoopGroup();
    workerGroup = new NioEventLoopGroup();
    ServerBootstrap b = new ServerBootstrap();
    try {
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        ch.pipeline().addLast(new ProtobufDecoder(ProcessData.getDefaultInstance()));
                        ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        ch.pipeline().addLast(new ProtobufEncoder());
                        ch.pipeline().addLast(new ReadTimeoutHandler(60));
                        ch.pipeline().addLast(new LoginAuthRespHandler(channels));
                        ch.pipeline().addLast(new ShellRespHandler(client, dfsManager));
                        ch.pipeline().addLast(new WorkerProxyHandler(worker));
                        ch.pipeline().addLast(new HeartBeatRespHandler());
                    }
                });
        ChannelFuture f = b.bind(port).sync();
        f.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        // Catching InterruptedException clears the flag; set it again for the caller.
        Thread.currentThread().interrupt();
        logger.error(e.getMessage());
    }
}
/**
 * Creates a client for the shell query service that uses a short-lived connection.
 *
 * <p>No action is associated with this client. Each initialized channel decodes
 * {@code ProcessData} protobufs and runs a login handler built from {@code command}
 * followed by a {@code ShellReqHandler}.
 *
 * @param command the command handed to the {@code LoginAuthReqHandler} of every channel
 */
public Client(@Nonnull final ProcessData command) {
    action = null;
    channelInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                    .addLast(new ProtobufVarint32FrameDecoder())
                    .addLast(new ProtobufDecoder(ProcessData.getDefaultInstance()))
                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                    .addLast(new ProtobufEncoder())
                    .addLast(new LoginAuthReqHandler(command))
                    .addLast(new ShellReqHandler());
        }
    };
}
/**
 * Builds a protobuf pipeline that decodes messages using {@code prototype} and
 * {@code extensionRegistry} and ends in a {@code DefaultIdleListenerHandler}
 * wrapping {@code listener}.
 *
 * @param ch the channel being initialized
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            .addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(prototype, extensionRegistry))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            .addLast(new DefaultIdleListenerHandler<T>(listener));
}
public void connect(int port, String host) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); /** * ProtobufDecoder仅仅负责解码, 它不支持读半包, 因此在ProtobufDecoder之前使用ProtobufVarint32FrameDecoder * 来处理半包 */ ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance())); ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqClientHandler()); } }); //发送异步链接操作 ChannelFuture f = b.connect(host, port).sync(); //等待客户端链路关闭 f.channel().closeFuture().sync(); }finally{ group.shutdownGracefully(); } }
/**
 * Builds the subscription server pipeline for {@code SubscribeReqProto.SubscribeReq}
 * messages, ending in a {@code SubReqServerHandler}.
 *
 * @param ch the accepted channel being initialized
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            // ProtobufDecoder only decodes and does not handle partial packets,
            // so ProtobufVarint32FrameDecoder is placed in front of it.
            .addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            .addLast(new SubReqServerHandler());
}
/**
 * Builds the pipeline for {@code Protocol.AuthenticatedMessage} traffic: named
 * varint32 framing and protobuf codec handlers followed by an unnamed
 * {@code AuthenticatedMessageHandler}.
 *
 * @param channel the channel being initialized
 */
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
    channel.pipeline()
            .addLast("lengthDecoder", new ProtobufVarint32FrameDecoder())
            .addLast("protobufDecoder", new ProtobufDecoder(Protocol.AuthenticatedMessage.getDefaultInstance()))
            .addLast("lengthPrepender", new ProtobufVarint32LengthFieldPrepender())
            .addLast("protobufEncoder", new ProtobufEncoder())
            .addLast(new AuthenticatedMessageHandler());
}
private void initialize() { _logger.info("Start, port=" + servicePort); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // inbound .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder(LSMessage.Message.getDefaultInstance())) .addLast("business", new MyHandler()) // out bound .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()); } }); _clieChannelFuture = bootstrap.bind(servicePort).sync(); } catch (InterruptedException e) { _logger.error("Encounter exception when initializing netty, quit", e); System.exit(1); } _logger.info("Done"); }
/**
 * Builds the world clock server pipeline: optional SSL, protobuf codecs for
 * {@code WorldClockProtocol.Locations}, and a {@code WorldClockServerHandler}.
 *
 * @param ch the accepted channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();

    // SSL is only installed when an SSL context has been configured.
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc()));
    }

    pipeline.addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(WorldClockProtocol.Locations.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            .addLast(new WorldClockServerHandler());
}
/**
 * Builds the world clock client pipeline: optional SSL towards
 * {@code WorldClockClient.HOST}/{@code PORT}, protobuf codecs for
 * {@code WorldClockProtocol.LocalTimes}, and a {@code WorldClockClientHandler}.
 *
 * @param ch the channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    ChannelPipeline pipeline = ch.pipeline();

    // SSL is only installed when an SSL context has been configured.
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc(), WorldClockClient.HOST, WorldClockClient.PORT));
    }

    pipeline.addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(WorldClockProtocol.LocalTimes.getDefaultInstance()))
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder())
            .addLast(new AuthPlaceholder());
}
public void start(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup();// (1) EventLoopGroup workerGroup = new NioEventLoopGroup();// (2) try { ServerBootstrap b = new ServerBootstrap();// (3) b.group(bossGroup, workerGroup); // (4) b.channel(NioServerSocketChannel.class); b.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //decoded // ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(Auth.AuthRequest.getDefaultInstance())); // ch.pipeline().addLast(new ProtobufDecoder(BaseProtocol.RHBaseMessage.getDefaultInstance())); //encoded // ch.pipeline().addLast(new LengthFieldPrepender(4)); ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); // 注册handler ch.pipeline().addLast(new AuthServerInitHandler()); } }); b.option(ChannelOption.SO_BACKLOG, 128); b.childOption(ChannelOption.SO_KEEPALIVE, true); //绑定端口 同步等待成功 ChannelFuture f = b.bind(port).sync(); //等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }
public void connect(String host,int port) throws Exception{ EventLoopGroup workerGroup=new NioEventLoopGroup(); try{ Bootstrap b=new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //decoded // ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(Auth.AuthResponse.getDefaultInstance())); //encoded // ch.pipeline().addLast(new LengthFieldPrepender(4)); ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); // 注册handler ch.pipeline().addLast(new AuthClientInitHandler()); } }); ChannelFuture f=b.connect(host, port).sync(); f.channel().closeFuture().sync(); }finally{ workerGroup.shutdownGracefully(); } }
@Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // Enable stream compression (you can remove these two if unnecessary) if (compress) { pipeline.addLast("deflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); pipeline.addLast("inflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP)); } /** * length (4 bytes). * * Note: max message size is 64 Mb = 67108864 bytes this defines a * framer with a max of 64 Mb message, 4 bytes are the length, and strip * 4 bytes */ pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(67108864, 0, 4, 0, 4)); // decoder must be first pipeline.addLast("protobufDecoder", new ProtobufDecoder(CommandMessage.getDefaultInstance())); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("protobufEncoder", new ProtobufEncoder()); // our server processor (new instance for each connection) pipeline.addLast("handler", new CommHandler()); }
@Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // Enable stream compression (you can remove these two if unnecessary) if (compress) { pipeline.addLast("deflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); pipeline.addLast("inflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP)); } /** * length (4 bytes). * * Note: max message size is 64 Mb = 67108864 bytes this defines a * framer with a max of 64 Mb message, 4 bytes are the length, and strip * 4 bytes */ pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(67108864, 0, 4, 0, 4)); // decoder must be first pipeline.addLast("protobufDecoder", new ProtobufDecoder(WorkMessage.getDefaultInstance())); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("protobufEncoder", new ProtobufEncoder()); // our server processor (new instance for each connection) pipeline.addLast("handler", new WorkChannelHandler (state)); }
@Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // Enable stream compression (you can remove these two if unnecessary) if (compress) { pipeline.addLast("deflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); pipeline.addLast("inflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP)); } /** * length (4 bytes). * * Note: max message size is 64 Mb = 67108864 bytes this defines a * framer with a max of 64 Mb message, 4 bytes are the length, and strip * 4 bytes */ pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(67108864, 0, 4, 0, 4)); // decoder must be first pipeline.addLast("protobufDecoder", new ProtobufDecoder(CommandMessage.getDefaultInstance())); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("protobufEncoder", new ProtobufEncoder()); // our server processor (new instance for each connection) pipeline.addLast("handler", new CommandChannelHandler (conf, cmdMessageHandler)); }
@Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // Enable stream compression (you can remove these two if unnecessary) if (compress) { pipeline.addLast("deflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); pipeline.addLast("inflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP)); } /** * length (4 bytes). * * Note: max message size is 64 Mb = 67108864 bytes this defines a * framer with a max of 64 Mb message, 4 bytes are the length, and strip * 4 bytes */ pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(67108864, 0, 4, 0, 4)); // decoder must be first pipeline.addLast("protobufDecoder", new ProtobufDecoder(ClusterMonitor.getDefaultInstance())); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("protobufEncoder", new ProtobufEncoder()); // our server processor (new instance for each connection) pipeline.addLast("handler", new MonitorHandler()); }
/**
 * Creates a protobuf client whose channels decode messages of the given type.
 *
 * <p>One message handler instance is created here and installed on every channel
 * this client's initializer sets up. The event loop group is created with the
 * arguments {@code 1} and {@code asyncExec}.
 *
 * @param msgDefaultInstance prototype instance passed to the protobuf decoder
 * @param asyncExec executor passed to both the message handler and the event loop group
 * @param uuidGetter passed to the message handler
 */
public SimpleProtobufClient(final MessageLite msgDefaultInstance, final Executor asyncExec, final UUIDGetter uuidGetter) {
    myMessageHandler = new ProtobufClientMessageHandler<T>(uuidGetter, this, asyncExec);
    myEventLoopGroup = new NioEventLoopGroup(1, asyncExec);
    myChannelInitializer = new ChannelInitializer() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                    .addLast(new ProtobufVarint32FrameDecoder())
                    .addLast(new ProtobufDecoder(msgDefaultInstance))
                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                    .addLast(new ProtobufEncoder())
                    .addLast(myMessageHandler);
        }
    };
}
public void connect(int port, String host) throws Exception { // 配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast( new ProtobufDecoder( SubscribeRespProto.SubscribeResp .getDefaultInstance())); ch.pipeline().addLast( new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqClientHandler()); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 当代客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } }
public void bind(int port) throws Exception { // 配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { // ch.pipeline().addLast( // new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast( new ProtobufDecoder( SubscribeReqProto.SubscribeReq .getDefaultInstance())); ch.pipeline().addLast( new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqServerHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
/**
 * Connects the order client to {@code host:port} and blocks until the channel closes.
 *
 * <p>The pipeline holds, in order: a UTF-8 string encoder, the varint32/protobuf
 * codecs for {@code OrderResponseProto.OrderResponse}, a UTF-8 string decoder and
 * an {@code OrderClientHandler}. The event loop group is shut down on exit.
 *
 * @param host the remote host
 * @param port the remote port
 * @throws InterruptedException if interrupted while connecting or waiting for close
 */
public void connect(String host, int port) throws InterruptedException {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        .addLast(new StringEncoder(CharsetUtil.UTF_8))
                        .addLast(new ProtobufVarint32FrameDecoder())
                        .addLast(new ProtobufDecoder(OrderResponseProto.OrderResponse.getDefaultInstance()))
                        .addLast(new ProtobufVarint32LengthFieldPrepender())
                        .addLast(new ProtobufEncoder())
                        .addLast(new StringDecoder(CharsetUtil.UTF_8))
                        .addLast(new OrderClientHandler());
            }
        });

        ChannelFuture connectFuture = bootstrap.connect(host, port).sync();
        connectFuture.channel().closeFuture().sync();
    } finally {
        group.shutdownGracefully();
    }
}
/**
 * Starts the order server on {@code port} and blocks until the listening channel
 * closes.
 *
 * <p>Accepted channels get, in order: a UTF-8 string encoder, the varint32/protobuf
 * codecs for {@code OrderRequestProto.OrderRequest}, a UTF-8 string decoder and an
 * {@code OrderServerHandler}. Both event loop groups are shut down on exit.
 *
 * @param port the port to listen on
 * @throws InterruptedException if interrupted while binding or waiting for close
 */
public void bind(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap server = new ServerBootstrap();
        server.group(bossGroup, workerGroup);
        server.channel(NioServerSocketChannel.class);
        server.option(ChannelOption.SO_BACKLOG, 100);
        server.handler(new LoggingHandler(LogLevel.INFO));
        server.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        .addLast(new StringEncoder(CharsetUtil.UTF_8))
                        .addLast(new ProtobufVarint32FrameDecoder())
                        .addLast(new ProtobufDecoder(OrderRequestProto.OrderRequest.getDefaultInstance()))
                        .addLast(new ProtobufVarint32LengthFieldPrepender())
                        .addLast(new ProtobufEncoder())
                        .addLast(new StringDecoder(CharsetUtil.UTF_8))
                        .addLast(new OrderServerHandler());
            }
        });

        ChannelFuture bindFuture = server.bind(port).sync();
        bindFuture.channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
private void _initNetty(final int port) throws Exception { _bossGroup = new NioEventLoopGroup(1); _workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(_bossGroup, _workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline() // inbound .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder(MW.DcftMessage.getDefaultInstance())) .addLast("business", new MyHandler()) // outbound .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()); } }); _listeningFuture = b.bind(port).sync(); // add a scheduled task to clean stalled outstanding RPCs _bossGroup.scheduleWithFixedDelay(new OutstandingRpcCleaner(_outstandingRpcs), 5, 1, TimeUnit.SECONDS); }
@Override public void run() { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // outbound (write) ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()); // inbound (read) ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder(MW.DcftMembershipList.getDefaultInstance())) .addLast(new DcftClientHandler(_cliId, _rpcDelayInSec)); } }); while (true) { _oneRun(b); Thread.sleep(5000); } } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } }
/**
 * Builds the server pipeline for {@code Protocol.BaseMessage} traffic.
 *
 * <p>Inbound frames use a 4-byte length prefix (maximum frame length 1048576 bytes);
 * the {@code ServerHandler} is built from {@code commandRegistry} and
 * {@code executorService}. The length prepender and protobuf encoder are registered
 * after the handler.
 *
 * @param ch the channel being initialized
 */
@Override
protected void initChannel(Channel ch) throws Exception {
    ch.pipeline()
            .addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4))
            .addLast(new ProtobufDecoder(Protocol.BaseMessage.getDefaultInstance()))
            .addLast(new ServerHandler(commandRegistry, executorService))
            .addLast(new LengthFieldPrepender(4))
            .addLast(new ProtobufEncoder());
}
@Override protected void initChannel(SocketChannel ch) throws Exception { LOG.debug("initChannel called ch=" + ch); ChannelPipeline pipeline = ch.pipeline(); //pipeline.addLast("logger", new LoggingHandler(LogLevel.DEBUG)); pipeline.addLast(new BackendFrameDecoder()); pipeline.addLast(new ProtobufDecoder(SimbaMessage.getDefaultInstance())); pipeline.addLast(new BackendHandler(stats, subscriptionManager, cam, BackendConnector.this)); pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast(new ProtobufEncoder()); }