@Override protected void channelRead0(ChannelHandlerContext context, Object message) throws Exception { final Channel channel = context.channel(); if (!handshaker.isHandshakeComplete()) { handshaker.finishHandshake(channel, (FullHttpResponse) message); channel.pipeline().addBefore(HANDLER_NAME, "websocket-frame-aggregator", new WebSocketFrameAggregator(64 * 1024)); subscriber.onStart(); return; } if (message instanceof FullHttpResponse) { final FullHttpResponse response = (FullHttpResponse) message; throw new IllegalStateException( "Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); } final WebSocketFrame frame = (WebSocketFrame) message; if (frame instanceof PingWebSocketFrame) { context.writeAndFlush(new PongWebSocketFrame(((PingWebSocketFrame)frame).retain().content())); } else if (frame instanceof BinaryWebSocketFrame) { final ByteBufInputStream input = new ByteBufInputStream(((BinaryWebSocketFrame)message).content()); final Envelope envelope = Envelope.ADAPTER.decode(input); subscriber.onNext(envelope); } }
private void handshake(final ChannelHandlerContext ctx, final FullHttpRequest req, final String requestPath) { WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req), null, true, maxWebSocketFrameSize); WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req); if (handshaker != null) { handshaker.handshake(ctx.channel(), req).addListener( new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { final String sessionId = PipelineUtils.getSessionId(requestPath); if (future.isSuccess()) { ctx.channel().pipeline().addBefore( SocketIOChannelInitializer.SOCKETIO_WEBSOCKET_HANDLER, SocketIOChannelInitializer.WEBSOCKET_FRAME_AGGREGATOR, new WebSocketFrameAggregator(maxWebSocketFrameSize)); connect(ctx, req, sessionId); } else { log.error("Can't handshake: {}", sessionId, future.cause()); } } }); } else { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } }
private void handleWebSocketRequest(@NotNull final ChannelHandlerContext context, @NotNull FullHttpRequest request, @NotNull final QueryStringDecoder uriDecoder) { WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory("ws://" + request.headers().getAsString(HttpHeaderNames.HOST) + uriDecoder.path(), null, false, NettyUtil.MAX_CONTENT_LENGTH); WebSocketServerHandshaker handshaker = factory.newHandshaker(request); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(context.channel()); return; } if (!context.channel().isOpen()) { return; } final Client client = new WebSocketClient(context.channel(), handshaker); context.attr(ClientManager.CLIENT).set(client); handshaker.handshake(context.channel(), request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { ClientManager clientManager = WebSocketHandshakeHandler.this.clientManager.getValue(); clientManager.addClient(client); MessageChannelHandler messageChannelHandler = new MessageChannelHandler(clientManager, getMessageServer()); BuiltInServer.replaceDefaultHandler(context, messageChannelHandler); ChannelHandlerContext messageChannelHandlerContext = context.pipeline().context(messageChannelHandler); context.pipeline().addBefore(messageChannelHandlerContext.name(), "webSocketFrameAggregator", new WebSocketFrameAggregator(NettyUtil.MAX_CONTENT_LENGTH)); messageChannelHandlerContext.attr(ClientManager.CLIENT).set(client); connected(client, uriDecoder.parameters()); } } }); }
@Override protected void initChannel(SocketChannel ch) throws Exception { decoder = new WebsocketProtostuffDecoder(handShaker); final ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("http-client", new HttpClientCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(C5Constants.MAX_RESPONSE_SIZE)); pipeline.addLast("websec-codec", new WebsocketProtostuffEncoder(handShaker)); pipeline.addLast("websocket-aggregator", new WebSocketFrameAggregator(C5Constants.MAX_RESPONSE_SIZE)); pipeline.addLast("message-codec", decoder); pipeline.addLast("message-handler", new FutureBasedMessageHandler()); }
public void upgradePipelineForWebSockets(ChannelPipeline pipeline) { pipeline.addAfter(HTTP_CLIENT_CODEC, WS_ENCODER_HANDLER, new WebSocket08FrameEncoder(true)); pipeline.addBefore(AHC_WS_HANDLER, WS_DECODER_HANDLER, new WebSocket08FrameDecoder(false, false, config.getWebSocketMaxFrameSize())); pipeline.addAfter(WS_DECODER_HANDLER, WS_FRAME_AGGREGATOR, new WebSocketFrameAggregator(config.getWebSocketMaxBufferSize())); pipeline.remove(HTTP_CLIENT_CODEC); }
@Override protected void doStart() { fiber.start(); fiber.execute(() -> { // we need the tablet module: ListenableFuture<C5Module> f = server.getModule(ModuleType.Tablet); Futures.addCallback(f, new FutureCallback<C5Module>() { @Override public void onSuccess(final C5Module result) { tabletModule = (TabletModule) result; bootstrap.group(acceptGroup, workerGroup) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.TCP_NODELAY, true) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast("http-server-codec", new HttpServerCodec()); p.addLast("http-agg", new HttpObjectAggregator(C5ServerConstants.MAX_CALL_SIZE)); p.addLast("websocket-agg", new WebSocketFrameAggregator(C5ServerConstants.MAX_CALL_SIZE)); p.addLast("decoder", new WebsocketProtostuffDecoder("/websocket")); p.addLast("encoder", new WebsocketProtostuffEncoder()); p.addLast("handler", new RegionServerHandler(RegionServerService.this)); } } ); bootstrap.bind(port).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { listenChannel = future.channel(); notifyStarted(); } else { LOG.error("Unable to find Region Server to {} {}", port, future.cause()); notifyFailed(future.cause()); } } }); } @Override public void onFailure(Throwable t) { notifyFailed(t); } }, fiber); }); }
/** * Turn this {@link WebsocketInbound} into aggregating mode which will only produce * fully formed frame that have been received fragmented. * * @param maxContentLength the maximum frame length * * @return this inbound */ default WebsocketInbound aggregateFrames(int maxContentLength) { context().addHandlerLast(new WebSocketFrameAggregator(maxContentLength)); return this; }