/**
 * Builds a server through {@code BootStrap.builder()} with an explicit set of named channel
 * handlers plus a message consumer, and checks that the builder returns a non-null instance.
 */
@Test
public void serverBootStrapWithOptionsTest()
        throws InstantiationException, IllegalAccessException, ClassNotFoundException {
    // Handlers are registered under explicit names; the map keeps them in insertion order.
    LinkedHashMap<String, Object> handlersByName = new LinkedHashMap<>();
    handlersByName.put("lineFrame", new LineBasedFrameDecoder(2000));
    handlersByName.put("decoder", new StringDecoder());
    handlersByName.put("encoder", new StringEncoder());
    handlersByName.put("handler", new ChannelInboundHandlerAdapter() {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Only logs the inbound message.
            log.info("Message Received and forward to ConsumerProcessor. Msg -> {}", msg);
        }
    });

    Server server = BootStrap.builder()
            .port(5252)
            .options(handlersByName)
            .messageConsumer(received -> log.info(received))
            .build();

    assertNotNull(server);
}
public void connect(int port, String host) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new TimeClientHandler()); } }); //发送异步链接操作 ChannelFuture f = b.connect(host, port).sync(); //等待客户端链路关闭 f.channel().closeFuture().sync(); }finally{ group.shutdownGracefully(); } }
/**
 * With only the reactive bridge installed, the decoder and its extractor must both end up
 * in front of the bridge, extractor first.
 */
@Test
public void addByteDecoderWhenNoLeft() throws Exception {
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
    });

    ChannelHandler frameDecoder = new LineBasedFrameDecoder(12);
    testContext.addHandlerLast("decoder", frameDecoder)
               .addHandlerFirst("decoder$extract", NettyPipeline.inboundHandler(ADD_EXTRACTOR));

    assertEquals(
            channel.pipeline().names(),
            Arrays.asList(
                    "decoder$extract",
                    "decoder",
                    NettyPipeline.ReactiveBridge,
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * With only the HTTP codec installed, the extractor and decoder must both be placed after
 * the codec, extractor first.
 */
@Test
public void addByteDecoderWhenNoRight() throws Exception {
    channel.pipeline().addLast(NettyPipeline.HttpCodec, new ChannelHandlerAdapter() {
    });

    ChannelHandler frameDecoder = new LineBasedFrameDecoder(12);
    testContext.addHandlerLast("decoder", frameDecoder)
               .addHandlerFirst("decoder$extract", NettyPipeline.inboundHandler(ADD_EXTRACTOR));

    assertEquals(
            channel.pipeline().names(),
            Arrays.asList(
                    NettyPipeline.HttpCodec,
                    "decoder$extract",
                    "decoder",
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * With codec, server handler and reactive bridge installed, the extractor and decoder must
 * sit between the server handler and the bridge.
 */
@Test
public void addByteDecoderWhenFullReactorPipeline() throws Exception {
    channel.pipeline().addLast(NettyPipeline.HttpCodec, new HttpServerCodec());
    channel.pipeline().addLast(NettyPipeline.HttpServerHandler, new ChannelDuplexHandler());
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
    });

    ChannelHandler frameDecoder = new LineBasedFrameDecoder(12);
    testContext.addHandlerLast("decoder", frameDecoder)
               .addHandlerFirst("decoder$extract", NettyPipeline.inboundHandler(ADD_EXTRACTOR));

    assertEquals(
            channel.pipeline().names(),
            Arrays.asList(
                    NettyPipeline.HttpCodec,
                    NettyPipeline.HttpServerHandler,
                    "decoder$extract",
                    "decoder",
                    NettyPipeline.ReactiveBridge,
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * With only the reactive bridge installed, a handler added first must be placed in front
 * of the bridge.
 */
@Test
public void addByteEncoderWhenNoLeft() throws Exception {
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
    });

    ChannelHandler addedHandler = new LineBasedFrameDecoder(12);
    testContext.addHandlerFirst("encoder", addedHandler);

    assertEquals(
            channel.pipeline().names(),
            Arrays.asList(
                    "encoder",
                    NettyPipeline.ReactiveBridge,
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * With only the HTTP codec installed, a handler added first must be placed right after
 * the codec.
 */
@Test
public void addByteEncoderWhenNoRight() throws Exception {
    channel.pipeline().addLast(NettyPipeline.HttpCodec, new ChannelHandlerAdapter() {
    });

    ChannelHandler addedHandler = new LineBasedFrameDecoder(12);
    testContext.addHandlerFirst("encoder", addedHandler);

    assertEquals(
            channel.pipeline().names(),
            Arrays.asList(
                    NettyPipeline.HttpCodec,
                    "encoder",
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * With codec, server handler and reactive bridge installed, a handler added first must sit
 * between the server handler and the bridge.
 */
@Test
public void addByteEncoderWhenFullReactorPipeline() throws Exception {
    channel.pipeline().addLast(NettyPipeline.HttpCodec, new HttpServerCodec());
    channel.pipeline().addLast(NettyPipeline.HttpServerHandler, new ChannelDuplexHandler());
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
    });

    ChannelHandler addedHandler = new LineBasedFrameDecoder(12);
    testContext.addHandlerFirst("encoder", addedHandler);

    assertEquals(
            channel.pipeline().names(),
            Arrays.asList(
                    NettyPipeline.HttpCodec,
                    NettyPipeline.HttpServerHandler,
                    "encoder",
                    NettyPipeline.ReactiveBridge,
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * Two handlers added first, one after the other, on a full pipeline: the second one added
 * must end up in front of the first, both between the server handler and the bridge.
 */
@Test
public void addSeveralByteEncodersWhenCodec() throws Exception {
    ChannelHandler firstAdded = new LineBasedFrameDecoder(12);
    ChannelHandler secondAdded = new LineBasedFrameDecoder(13);

    channel.pipeline().addLast(NettyPipeline.HttpCodec, new HttpServerCodec());
    channel.pipeline().addLast(NettyPipeline.HttpServerHandler, new ChannelDuplexHandler());
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
    });

    testContext.addHandlerFirst("encoder1", firstAdded)
               .addHandlerFirst("encoder2", secondAdded);

    assertEquals(
            channel.pipeline().names(),
            Arrays.asList(
                    NettyPipeline.HttpCodec,
                    NettyPipeline.HttpServerHandler,
                    "encoder2",
                    "encoder1",
                    NettyPipeline.ReactiveBridge,
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * Serves 100 zero-padded, newline-terminated numbers over HTTP and checks that a client
 * framing the response with a {@code LineBasedFrameDecoder} receives exactly that sequence
 * followed by completion.
 */
@Test
public void flushOnComplete() {
    Flux<String> expectedLines = Flux.range(0, 100)
                                     .map(value -> String.format("%010d", value));

    NettyContext server =
            HttpServer.create(0)
                      .newHandler((req, resp) ->
                              resp.sendString(expectedLines.map(line -> line + "\n")))
                      .block(Duration.ofSeconds(30));

    Flux<String> receivedLines =
            HttpClient.create(server.address().getPort())
                      .get("/")
                      .block(Duration.ofSeconds(30))
                      .addHandler(new LineBasedFrameDecoder(10))
                      .receive()
                      .asString();

    StepVerifier.create(receivedLines)
                .expectNextSequence(expectedLines.toIterable())
                .expectComplete()
                .verify(Duration.ofSeconds(30));

    server.dispose();
}
/**
 * Opens the serial device identified by {@code PORT} through an RXTX channel with a
 * line-based String pipeline and blocks until that channel is closed.
 *
 * @param args unused
 * @throws Exception if connecting or waiting for the close fails
 */
public static void main(String[] args) throws Exception {
    EventLoopGroup ioGroup = new OioEventLoopGroup();
    try {
        ChannelInitializer<RxtxChannel> pipelineSetup = new ChannelInitializer<RxtxChannel>() {
            @Override
            public void initChannel(RxtxChannel ch) throws Exception {
                ch.pipeline()
                  .addLast(new LineBasedFrameDecoder(32768))
                  .addLast(new StringEncoder())
                  .addLast(new StringDecoder())
                  .addLast(new RxtxClientHandler());
            }
        };

        Bootstrap serial = new Bootstrap();
        serial.group(ioGroup);
        serial.channel(RxtxChannel.class);
        serial.handler(pipelineSetup);

        ChannelFuture opened = serial.connect(new RxtxDeviceAddress(PORT)).sync();
        opened.channel().closeFuture().sync();
    } finally {
        ioGroup.shutdownGracefully();
    }
}
@Override protected void initChannel(SocketChannel ch) throws Exception { LineBasedFrameDecoder lineDecoder = new LineBasedFrameDecoder(MAX_LINE_LENGTH); StringDecoder stringDecoder = new StringDecoder(CHARSET); //FIXME: Should only split on CRLF, not on LF alone MessageDecoder messageDecoder = new MessageDecoder(); MessageHandler messageHandler = new MessageHandler(handler); StringEncoder stringEncoder = new StringEncoder(CHARSET); MessageEncoder messageEncoder = new MessageEncoder(); IdleStateHandler idleHandler = new IdleStateHandler(IDLE_TIMEOUT, 0, 0); // Inbound goes from first to last, outbound goes from last to first. // i.e. the outside is on the left/top, the inside is on the right/bottom ch.pipeline().addLast(lineDecoder).addLast(stringDecoder).addLast(messageDecoder).addLast(idleHandler).addLast(messageHandler) .addLast(stringEncoder).addLast(messageEncoder); }
/**
 * Connects a line-framed String client to {@code host:port} and blocks until the channel
 * closes. The event loop group is shut down on every exit path.
 *
 * @param host remote host
 * @param port remote port
 * @throws Exception if connecting or waiting for the close fails
 */
public void run(String host, int port) throws Exception {
    EventLoopGroup workers = new NioEventLoopGroup();
    try {
        ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(
                        new LineBasedFrameDecoder(20),
                        new StringDecoder(),
                        new DelimitedClientHandler());
            }
        };

        Bootstrap client = new Bootstrap();
        client.group(workers);
        client.channel(NioSocketChannel.class);
        client.remoteAddress(host, port);
        client.handler(pipelineSetup);

        ChannelFuture connected = client.connect().sync();
        connected.channel().closeFuture().sync();
    } finally {
        workers.shutdownGracefully();
    }
}
/**
 * Opens the serial device {@code /dev/ttyUSB0} through an RXTX channel with a line-based
 * String pipeline and blocks until that channel is closed.
 *
 * @param args unused
 * @throws Exception if connecting or waiting for the close fails
 */
public static void main(String[] args) throws Exception {
    EventLoopGroup ioGroup = new OioEventLoopGroup();
    try {
        ChannelInitializer<RxtxChannel> pipelineSetup = new ChannelInitializer<RxtxChannel>() {
            @Override
            public void initChannel(RxtxChannel ch) throws Exception {
                ch.pipeline()
                  .addLast(new LineBasedFrameDecoder(32768))
                  .addLast(new StringEncoder())
                  .addLast(new StringDecoder())
                  .addLast(new RxtxClientHandler());
            }
        };

        Bootstrap serial = new Bootstrap();
        serial.group(ioGroup);
        serial.channel(RxtxChannel.class);
        serial.handler(pipelineSetup);

        ChannelFuture opened = serial.connect(new RxtxDeviceAddress("/dev/ttyUSB0")).sync();
        opened.channel().closeFuture().sync();
    } finally {
        ioGroup.shutdownGracefully();
    }
}
/**
 * Opens the serial device {@code port} through a JSSC channel and forwards every decoded
 * line to {@code stringHandler}.
 *
 * <p>The constructor blocks until the connect attempt has finished. If it fails, the event
 * loop group created here is shut down before the failure propagates; the caller never
 * receives an instance in that case and so could not release the group itself.
 *
 * @param port          name of the serial device to open
 * @param stringHandler receives each line read from the device
 */
public SimpleLineBasedSerialChannel(String port, final SimpleStringChannelHandler stringHandler) {
    group = new OioEventLoopGroup();
    boolean connected = false;
    try {
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(JsscChannel.class)
         .handler(new ChannelInitializer<JsscChannel>() {
             @Override
             public void initChannel(JsscChannel ch) throws Exception {
                 ch.pipeline().addLast(
                         // NOTE(review): Integer.MAX_VALUE places no practical limit on line length.
                         new LineBasedFrameDecoder(Integer.MAX_VALUE),
                         new StringDecoder(),
                         new SimpleChannelInboundHandler<String>() {
                             @Override
                             protected void channelRead0(io.netty.channel.ChannelHandlerContext ctx, String msg) throws Exception {
                                 stringHandler.channelRead(ctx, msg);
                             }
                         }
                 );
             }
         });

        f = b.connect(new JsscDeviceAddress(port)).syncUninterruptibly();
        connected = true;
    } finally {
        // A flag is used instead of a catch block so that any kind of failure is covered
        // and the original exception propagates untouched.
        if (!connected) {
            group.shutdownGracefully();
        }
    }
}
/**
 * Feeds the yEnc-encoded resource {@code lorem-ipsum.ync} through a line framer followed by
 * a {@code YencDecoder} and checks that the concatenated output matches {@code lorem-ipsum}.
 */
@Test
public void testDecode() throws IOException {
    EmbeddedChannel pipeline =
            new EmbeddedChannel(new LineBasedFrameDecoder(4096), new YencDecoder());

    ByteBuf yencInput = Unpooled.buffer();
    IOUtils.copy(
            Resources.getResource("lorem-ipsum.ync").openStream(),
            new ByteBufOutputStream(yencInput));

    ByteBuf expectedPlain = Unpooled.buffer();
    IOUtils.copy(
            Resources.getResource("lorem-ipsum").openStream(),
            new ByteBufOutputStream(expectedPlain));

    pipeline.writeInbound(yencInput);
    pipeline.finish();

    // Collect everything the decoder emitted and compare it as one buffer.
    Object[] emitted = pipeline.inboundMessages().toArray();
    ByteBuf[] decodedChunks = Arrays.copyOf(emitted, emitted.length, ByteBuf[].class);
    assertThat(Unpooled.copiedBuffer(decodedChunks), exactChannelBuffer(expectedPlain));
}
/**
 * Feeds a yEnc resource with a wrong checksum through the decoder pipeline and expects the
 * {@code DecoderException} raised by the pipeline to wrap a
 * {@code YencChecksumFailureException}.
 *
 * @throws Throwable the unwrapped cause of the decoder failure
 */
@Test(expected = YencChecksumFailureException.class)
public void testChecksumFailure() throws Throwable {
    EmbeddedChannel decoderEmbedder = new EmbeddedChannel(new LineBasedFrameDecoder(4096), new YencDecoder());

    ByteBuf encoded = Unpooled.buffer();
    java.io.InputStream corrupted =
            Resources.getResource("lorem-ipsum-invalid-checksum.ync").openStream();
    try {
        IOUtils.copy(corrupted, new ByteBufOutputStream(encoded));
    } finally {
        corrupted.close();
    }

    try {
        decoderEmbedder.writeInbound(encoded);
        decoderEmbedder.finish();
    } catch (DecoderException cee) {
        // Without a cause, rethrow the wrapper itself so the real failure stays visible
        // instead of turning into a NullPointerException from "throw null".
        Throwable cause = cee.getCause();
        throw cause != null ? cause : cee;
    }
}
@Override protected void initChannel(SocketChannel ch) throws Exception { //增加以\n 和 \r\n为数据换行符的Handler ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); //增加字符串解析器 ch.pipeline().addLast(new StringDecoder()); //对输入数据进行业务逻辑处理 ch.pipeline().addLast(new RightTimeServerHandler()); }
/** *@description 连接服务器 *@time 创建时间:2017年7月21日下午4:15:50 *@param host *@param port *@throws InterruptedException *@author dzn */ public void connect(String host, int port) throws InterruptedException{ EventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap boot = new Bootstrap(); boot.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //增加以\n 和 \r\n为数据换行符的Handler ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); //增加字符串解析器 ch.pipeline().addLast(new StringDecoder()); //对输入数据进行业务逻辑处理 ch.pipeline().addLast(new RightTimeClientHandler()); } }); //连接服务器 ChannelFuture future = boot.connect(host, port).sync(); //等待客户端Channel关闭 future.channel().closeFuture().sync(); }finally{ group.shutdownGracefully(); } }
/**
 * Installs idle detection, line framing, String decoding and the server handler, in that
 * order, on a new channel.
 *
 * @param ch the channel being initialised
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
      .addLast(new IdleStateHandler(5, 5, 10))
      .addLast(new LineBasedFrameDecoder(1024))
      .addLast(new StringDecoder())
      .addLast(new ServerHandler());
}
/**
 * Installs a UTF-8 String encoder, line framing, a UTF-8 String decoder and the file
 * server handler, in that order, on a new channel.
 *
 * @param ch the channel being initialised
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
      .addLast(new StringEncoder(CharsetUtil.UTF_8))
      .addLast(new LineBasedFrameDecoder(1024))
      .addLast(new StringDecoder(CharsetUtil.UTF_8))
      .addLast(new FileServerHandler());
}
public void bind(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LineBasedFrameDecoder(1024)); p.addLast(new StringDecoder()); p.addLast(new StringEncoder()); p.addLast(new LineServerHandler()); } }); // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) logger.info("server bind port:{}", port); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
/**
 * Connects a line-based String client to {@code host:port} and blocks until the channel is
 * closed. The event loop group is shut down on every exit path.
 *
 * <p>The connection goes to the address passed in by the caller; earlier the arguments were
 * ignored and {@code Constants.HOST}/{@code Constants.PORT} were always used.
 *
 * @param host remote host
 * @param port remote port
 * @throws InterruptedException if the thread is interrupted while waiting on connect or close
 */
public void connect(String host, int port) throws InterruptedException {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.TCP_NODELAY, true)
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                 p.addLast(new LineBasedFrameDecoder(1024));
                 p.addLast(new StringDecoder());
                 p.addLast(new StringEncoder());
                 p.addLast(new LineClientHandler());
             }
         });

        // Honour the caller-supplied address.
        ChannelFuture future = b.connect(host, port).sync();
        future.channel().closeFuture().sync();
    } finally {
        group.shutdownGracefully();
    }
}
/**
 * Configures the channel pipeline.
 * Also registers the handler that processes user communication, using the beans registered
 * in Netty.Server.Configuration.NettyServerConfiguration.
 * Actual user requests are handled in Netty.Server.Handler.JsonHandler.
 *
 * <p>{@code "websocket"} installs only the HTTP/WebSocket stack; {@code "tcp"} and any other
 * value install the line-based String stack.
 *
 * @param channel the channel being initialised
 * @throws Exception if the pipeline cannot be set up
 */
@Override
protected void initChannel(Channel channel) throws Exception {
    ChannelPipeline channelPipeline = channel.pipeline();

    switch (transferType) {
        case "websocket":
            channelPipeline
                    .addLast(new HttpServerCodec())
                    .addLast(new HttpObjectAggregator(65536))
                    .addLast(new WebSocketServerCompressionHandler())
                    .addLast(new WebSocketServerProtocolHandler(transferWebsocketPath, transferWebsocketSubProtocol, transferWebsocketAllowExtensions))
                    .addLast(new LoggingHandler(LogLevel.valueOf(logLevelPipeline)))
                    .addLast(websocketHandler);
            // Without this break, execution fell through and appended the TCP handlers too.
            break;

        case "tcp":
        default:
            channelPipeline
                    .addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE))
                    .addLast(STRING_DECODER)
                    .addLast(STRING_ENCODER)
                    .addLast(new LoggingHandler(LogLevel.valueOf(logLevelPipeline)))
                    .addLast(jsonHandler);
            break;
    }
}
@Override protected void initChannel(SocketChannel ch) throws Exception { // 以下两行代码为了解决半包读问题 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new TimeServerHandler()); }
public void connect(int port, String host) { // 配置客户端的NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 以下两行代码为了解决半包读问题 ch.pipeline().addLast( new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new TimeClientHandler()); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 等待链路关闭 f.channel().closeFuture().sync(); } catch (Exception e) { } finally { // 退出,释放NIO线程组 group.shutdownGracefully(); } }
/**
 * {@inheritDoc}
 *
 * <p>Installs, in order, the shared {@code bytesDecoder}, a line framer and the
 * {@code StringMetricHandler}, each under an explicit name.
 *
 * @see io.netty.channel.ChannelInitializer#initChannel(io.netty.channel.Channel)
 */
@Override
protected void initChannel(final AbstractChannel ch) throws Exception {
    ch.pipeline()
      .addLast("bytesDecoder", bytesDecoder)
      .addLast("framer", new LineBasedFrameDecoder(1024, true, true))
      .addLast("linehandler", new StringMetricHandler());
}
/**
 * On an empty pipeline the extractor must come first, followed by the decoder and the
 * pipeline's tail context.
 */
@Test
public void addByteDecoderWhenEmptyPipeline() throws Exception {
    ChannelHandler frameDecoder = new LineBasedFrameDecoder(12);

    testContext.addHandlerLast("decoder", frameDecoder)
               .addHandlerFirst("decoder$extract", NettyPipeline.inboundHandler(ADD_EXTRACTOR));

    assertEquals(
            channel.pipeline().names(),
            Arrays.asList(
                    "decoder$extract",
                    "decoder",
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * Two extractor/decoder pairs added last on a full pipeline must appear in insertion order
 * between the server handler and the reactive bridge.
 */
@Test
public void addSeveralByteDecodersWhenCodec() throws Exception {
    ChannelHandler firstDecoder = new LineBasedFrameDecoder(12);
    ChannelHandler secondDecoder = new LineBasedFrameDecoder(13);

    channel.pipeline().addLast(NettyPipeline.HttpCodec, new HttpServerCodec());
    channel.pipeline().addLast(NettyPipeline.HttpServerHandler, new ChannelDuplexHandler());
    channel.pipeline().addLast(NettyPipeline.ReactiveBridge, new ChannelHandlerAdapter() {
    });

    testContext.addHandlerLast("decoder1$extract", NettyPipeline.inboundHandler(ADD_EXTRACTOR))
               .addHandlerLast("decoder1", firstDecoder)
               .addHandlerLast("decoder2$extract", NettyPipeline.inboundHandler(ADD_EXTRACTOR))
               .addHandlerLast("decoder2", secondDecoder);

    assertEquals(
            channel.pipeline().names(),
            Arrays.asList(
                    NettyPipeline.HttpCodec,
                    NettyPipeline.HttpServerHandler,
                    "decoder1$extract",
                    "decoder1",
                    "decoder2$extract",
                    "decoder2",
                    NettyPipeline.ReactiveBridge,
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * On an empty pipeline a handler added first must be followed only by the tail context.
 */
@Test
public void addByteEncoderWhenEmptyPipeline() throws Exception {
    ChannelHandler addedHandler = new LineBasedFrameDecoder(12);

    testContext.addHandlerFirst("encoder", addedHandler);

    assertEquals(
            channel.pipeline().names(),
            Arrays.asList(
                    "encoder",
                    "DefaultChannelPipeline$TailContext#0"));
}
/**
 * Adding a handler under a name that is already taken must leave the pipeline untouched:
 * the name list is unchanged and "foo" is still the original {@code Utf8FrameValidator}.
 */
@Test
public void addDecoderSkipsIfExist() {
    channel.pipeline().addFirst("foo", new Utf8FrameValidator());

    testContext.addHandlerFirst("foo", new LineBasedFrameDecoder(10));

    assertEquals(
            channel.pipeline().names(),
            Arrays.asList("foo", "DefaultChannelPipeline$TailContext#0"));
    assertThat(
            channel.pipeline().get("foo"),
            is(instanceOf(Utf8FrameValidator.class)));
}
/**
 * Adding a handler under a name that is already taken must leave the pipeline untouched:
 * the name list is unchanged and "foo" is still the original {@code Utf8FrameValidator}.
 */
@Test
public void addEncoderSkipsIfExist() {
    channel.pipeline().addFirst("foo", new Utf8FrameValidator());

    testContext.addHandlerFirst("foo", new LineBasedFrameDecoder(10));

    assertEquals(
            channel.pipeline().names(),
            Arrays.asList("foo", "DefaultChannelPipeline$TailContext#0"));
    assertThat(
            channel.pipeline().get("foo"),
            is(instanceOf(Utf8FrameValidator.class)));
}
/**
 * Sends 100 newline-terminated messages to the echo server through a client whose pipeline
 * has a {@code LineBasedFrameDecoder} in front of the reactive bridge, and checks that
 * exactly 100 lines are received back.
 */
@Test
public void tcpClientHandlesLineFeedData() throws InterruptedException {
    final int expectedCount = 100;
    final CountDownLatch remaining = new CountDownLatch(expectedCount);
    final List<String> received = new ArrayList<>();

    TcpClient.create(options ->
                     options.host("localhost")
                            .port(echoServerPort)
                            .afterChannelInit(ch ->
                                    ch.pipeline().addBefore(
                                            NettyPipeline.ReactiveBridge,
                                            "codec",
                                            new LineBasedFrameDecoder(8 * 1024))))
             .newHandler((inbound, outbound) ->
                     outbound.sendString(
                                     Flux.range(1, expectedCount)
                                         .map(index -> "Hello World!" + index + "\n")
                                         .subscribeOn(Schedulers.parallel()))
                             .then(inbound.receive()
                                          .asString()
                                          .take(100)
                                          .flatMapIterable(chunk -> Arrays.asList(chunk.split("\\n")))
                                          .doOnNext(line -> {
                                              received.add(line);
                                              remaining.countDown();
                                          })
                                          .then()))
             .block(Duration.ofSeconds(15))
             .onClose()
             .block(Duration.ofSeconds(30));

    assertTrue(
            "Expected messages not received. Received " + received.size() + " messages: " + received,
            remaining.await(15, TimeUnit.SECONDS));
    assertEquals(expectedCount, received.size());
}
/**
 * Provides the WHOIS channel handler providers as an immutable list in this order:
 * read timeout, line framing, WHOIS service handler, logging, relay.
 */
@Provides
@WhoisProtocol
static ImmutableList<Provider<? extends ChannelHandler>> provideHandlerProviders(
        @WhoisProtocol Provider<ReadTimeoutHandler> readTimeoutHandlerProvider,
        Provider<LineBasedFrameDecoder> lineBasedFrameDecoderProvider,
        Provider<WhoisServiceHandler> whoisServiceHandlerProvider,
        Provider<LoggingHandler> loggingHandlerProvider,
        Provider<FullHttpRequestRelayHandler> relayHandlerProvider) {
    return ImmutableList.<Provider<? extends ChannelHandler>>builder()
            .add(readTimeoutHandlerProvider)
            .add(lineBasedFrameDecoderProvider)
            .add(whoisServiceHandlerProvider)
            .add(loggingHandlerProvider)
            .add(relayHandlerProvider)
            .build();
}
/**
 * Returns an initializer that installs line framing, a UTF-8 String decoder and then this
 * object's {@code decoder} and {@code handler} on each new channel.
 *
 * @return a new channel initializer
 */
@Override
public ChannelInitializer<Channel> initializer() {
    return new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(final Channel ch) throws Exception {
            ch.pipeline()
              .addLast(new LineBasedFrameDecoder(MAX_LINE))
              .addLast(new StringDecoder(CharsetUtil.UTF_8))
              .addLast(decoder, handler);
        }
    };
}
/**
 * Returns an initializer that installs line framing followed by this object's
 * {@code decoder} and {@code handler} on each new channel.
 *
 * @return a new channel initializer
 */
@Override
public final ChannelInitializer<Channel> initializer() {
    return new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
              .addLast(new LineBasedFrameDecoder(MAX_LINE))
              .addLast(decoder, handler);
        }
    };
}
public void run(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .childHandler(new ChannelInitializer<SocketChannel>() { /* * (non-Javadoc) * * @see * io.netty.channel.ChannelInitializer#initChannel(io * .netty.channel.Channel) */ public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new StringEncoder(CharsetUtil.UTF_8), new LineBasedFrameDecoder(1024), new StringDecoder(CharsetUtil.UTF_8), new FileServerHandler()); } }); ChannelFuture f = b.bind(port).sync(); System.out.println("Start file server at port : " + port); f.channel().closeFuture().sync(); } finally { // 优雅停机 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }