/** * 适配 */ @Override protected ChannelHandler fixHandlerBeforeConnect(final ChannelHandler handler) { ChannelHandler result=new ShareableChannelInboundHandler() { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { Channel ch=ctx.channel(); ch.pipeline().addLast(new HttpClientCodec()); ch.pipeline().addLast(new HttpObjectAggregator(64*1024)); ch.pipeline().addLast(new WebSocketClientProtocolHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()))); ch.pipeline().addLast(new WebSocketConnectedClientHandler(handler)); ctx.pipeline().remove(this);//移除当前handler ctx.pipeline().fireChannelRegistered();//重新从第一个handler抛出事件 } }; // ChannelInitializer<SocketChannel> result=new ChannelInitializer<SocketChannel>() { // @Override // protected void initChannel(SocketChannel ch) { // ch.pipeline().addLast(new HttpClientCodec()); // ch.pipeline().addLast(new HttpObjectAggregator(64*1024)); // ch.pipeline().addLast(new WebSocketClientProtocolHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()))); // ch.pipeline().addLast(new WebSocketConnectedClientHandler(handler)); // } // }; return result; }
/** * 适配 */ @Override protected ChannelHandler fixHandlerBeforeConnect(final ChannelHandler handler) { ChannelHandler result=new ShareableChannelInboundHandler() { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { Channel ch=ctx.channel(); ch.pipeline().addLast(new HttpClientCodec()); ch.pipeline().addLast(new HttpObjectAggregator(64*1024)); ch.pipeline().addLast(new WebSocketClientProtocolHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()))); ch.pipeline().addLast(new WebSocketConnectedClientHandler(handler)); ctx.pipeline().remove(this);//移除当前handler ctx.fireChannelRegistered();//重新从第一个handler抛出事件 } }; // ChannelInitializer<SocketChannel> result=new ChannelInitializer<SocketChannel>() { // @Override // protected void initChannel(SocketChannel ch) { // ch.pipeline().addLast(new HttpClientCodec()); // ch.pipeline().addLast(new HttpObjectAggregator(64*1024)); // ch.pipeline().addLast(new WebSocketClientProtocolHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()))); // ch.pipeline().addLast(new WebSocketConnectedClientHandler(handler)); // } // }; return result; }
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) { // Handshake is completed String actualProtocol = handshaker.actualSubprotocol(); serialization = WampSerialization.fromString(actualProtocol); if (serialization == WampSerialization.Invalid) { throw new WampError("Invalid Websocket Protocol"); } // Install the serializer and deserializer ctx.pipeline() .addAfter(ctx.name(), "wamp-deserializer", new WampDeserializationHandler(serialization)); ctx.pipeline() .addAfter(ctx.name(), "wamp-serializer", new WampSerializationHandler(serialization)); // Fire the connection established event ctx.fireUserEventTriggered(new ConnectionEstablishedEvent(serialization)); } else { ctx.fireUserEventTriggered(evt); } }
/** * 通道注册的时候配置websocket解码handler */ @Override protected final void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline=ch.pipeline(); pipeline.addLast(new HttpClientCodec()); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpObjectAggregator(64*1024)); pipeline.addLast(new WebSocketClientProtocolHandler(WebSocketClientHandshakerFactory.newHandshaker(new URI(url), WebSocketVersion.V13, null, false, new DefaultHttpHeaders()))); pipeline.addLast(new WebSocketConnectedClientHandler());//连接成功监听handler }
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) { log.debug("excute webSocketHandComplete……"); webSocketHandComplete(ctx); ctx.pipeline().remove(this); log.debug("excuted webSocketHandComplete:"+ctx.pipeline().toMap().toString()); }else { super.userEventTriggered(ctx, evt); } }
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) { log.log(logLevel,"WebSocket:HANDSHAKE_COMPLETE,pipeline:"+ctx.channel().pipeline().toMap().toString()); ctx.pipeline().addLast(new WebSocketTextFrameByteBufAdapter());//适配器 ctx.pipeline().addLast(this.handler);//业务层handler //为新加的handler手动触发必要事件 ctx.fireChannelRegistered(); ctx.fireChannelActive(); log.log(logLevel,"HANDSHAKE_COMPLETED HANDLERS:"+ctx.channel().pipeline().toMap().toString()); } super.userEventTriggered(ctx, evt); }
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) { log.log(logLevel,"WebSocket:HANDSHAKE_COMPLETE,pipeline:"+ctx.channel().pipeline().toMap().toString()); ctx.pipeline().addLast(new WebSocketBinaryFrameByteBufAdapter());//适配器 ctx.pipeline().addLast(this.handler);//业务层handler //为新加的handler手动触发必要事件 ctx.fireChannelRegistered(); ctx.fireChannelActive(); log.log(logLevel,"HANDSHAKE_COMPLETED HANDLERS:"+ctx.channel().pipeline().toMap().toString()); } super.userEventTriggered(ctx, evt); }
@Override protected void initChannel(final NioSocketChannel ch) throws Exception { URI uri = new URI("ws", null, host, port, "/game/" + (username == null ? "" : uriEncode(username)), null, null); System.out.println(uri); notificationChannel = ch; ch.pipeline().addLast( new HttpClientCodec(), new HttpObjectAggregator(65536), new WebSocketClientProtocolHandler(uri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders(), 65536, true), new MessageCodec(), new NotificationHandler()); }
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { super.userEventTriggered(ctx, evt); Logger.debug("ue: " + evt); if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) { Logger.debug("handshake done"); //this.handshaker = null; this.wsready.release(); } }
@Override public void addToPipeline(final ChannelPipeline pipeline) { pipeline.addLast("http-codec", new HttpClientCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(8192)); final WebSocketClientHandshaker handShaker = new WhiteSpaceInPathWebSocketClientHandshaker13(serverUri, WebSocketVersion.V13, PROTOCOL, false, createHttpHeaders(httpHeaders), Integer.MAX_VALUE); pipeline.addLast("websocket-protocol-handler", new WebSocketClientProtocolHandler(handShaker)); pipeline.addLast("websocket-frame-codec", new ByteBufToWebSocketFrameCodec()); }