/**
 * Feeds an inbound {@link ByteBuf} into the decoder; any other message type is handed
 * to the next handler untouched.
 *
 * <p>The incoming bytes become the cumulation when none exists, otherwise they are merged
 * into it through {@code cumulator}. {@code callDecode} is then run against the cumulation.
 * Whether or not decoding fails, a cumulation with no readable bytes left is released and
 * cleared, and every element collected in the output list is fired downstream before the
 * list is recycled.
 *
 * @param ctx the handler context used for allocation and for firing reads
 * @param msg the inbound message
 * @throws DecoderException rethrown as-is, or wrapping any other {@link Throwable} raised
 *                          while cumulating or decoding
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof ByteBuf)) {
        ctx.fireChannelRead(msg);
        return;
    }

    final RecyclableArrayList decoded = RecyclableArrayList.newInstance();
    try {
        final ByteBuf incoming = (ByteBuf) msg;
        first = cumulation == null;
        cumulation = first ? incoming : cumulator.cumulate(ctx.alloc(), cumulation, incoming);
        callDecode(ctx, cumulation, decoded);
    } catch (Throwable cause) {
        if (cause instanceof DecoderException) {
            throw (DecoderException) cause;
        }
        throw new DecoderException(cause);
    } finally {
        // Drop the cumulation once everything in it has been consumed.
        if (cumulation != null && !cumulation.isReadable()) {
            cumulation.release();
            cumulation = null;
        }

        // Forward whatever was decoded, even when decoding ended with an exception.
        final int count = decoded.size();
        for (int idx = 0; idx < count; idx++) {
            ctx.fireChannelRead(decoded.get(idx));
        }
        decoded.recycle();
    }
}
/**
 * Runs the three-argument {@code unwrap} with an empty NIO buffer as input and fires each
 * resulting message to the next handler. The temporary output list is always recycled.
 *
 * @param ctx the handler context used to fire the produced messages
 * @throws SSLException if the delegated {@code unwrap} call fails
 */
private void unwrap(ChannelHandlerContext ctx) throws SSLException {
    final RecyclableArrayList produced = RecyclableArrayList.newInstance();
    try {
        unwrap(ctx, Unpooled.EMPTY_BUFFER.nioBuffer(), produced);

        final int count = produced.size();
        int idx = 0;
        while (idx < count) {
            ctx.fireChannelRead(produced.get(idx));
            idx++;
        }
    } finally {
        produced.recycle();
    }
}
/**
 * Collects the NIO buffers that back {@code length} bytes of this buffer starting at
 * {@code index}, walking the underlying components in order.
 *
 * <p>Each component contributes at most the bytes it has readable from the requested
 * position. The per-component chunk is bounded by {@code readableBytes()} rather than
 * {@code capacity()} so that it agrees with how {@code adjustment} advances from one
 * component to the next; bounding by capacity could make a chunk run past a component's
 * readable region and misalign every following offset.
 *
 * @param index  absolute start index within this buffer
 * @param length number of bytes to cover
 * @return the NIO buffers covering the range; an empty array when {@code length} is 0
 * @throws UnsupportedOperationException if a component reports an NIO buffer count of 0
 */
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
    checkIndex(index, length);
    if (length == 0) {
        return EmptyArrays.EMPTY_BYTE_BUFFERS;
    }

    RecyclableArrayList array = RecyclableArrayList.newInstance(buffers.length);
    try {
        Component c = findComponent(index);
        int i = c.index;
        int adjustment = c.offset;
        ByteBuf s = c.buf;
        for (;;) {
            // Position of the requested index relative to the current component.
            int localIndex = index - adjustment;
            // Never take more from a component than it has readable; this keeps the chunk
            // size consistent with the readableBytes()-based advance of adjustment below.
            int localLength = Math.min(length, s.readableBytes() - localIndex);
            switch (s.nioBufferCount()) {
                case 0:
                    throw new UnsupportedOperationException();
                case 1:
                    array.add(s.nioBuffer(localIndex, localLength));
                    break;
                default:
                    Collections.addAll(array, s.nioBuffers(localIndex, localLength));
            }

            index += localLength;
            length -= localLength;
            adjustment += s.readableBytes();
            if (length <= 0) {
                break;
            }
            s = buffer(++i);
        }

        return array.toArray(new ByteBuffer[array.size()]);
    } finally {
        array.recycle();
    }
}
/**
 * Accumulates an inbound {@link ByteBuf} and runs the decoder over the accumulated bytes;
 * any other message type is forwarded to the next handler unchanged.
 *
 * <p>When a cumulation already exists, the new bytes are copied into it (after calling
 * {@code expandCumulation} if they would not fit within its maximum capacity) and the
 * incoming buffer is released. Regardless of the decode outcome, a fully consumed
 * cumulation is released and cleared, {@code decodeWasNull} records whether nothing was
 * produced, and all produced messages are fired downstream before the list is recycled.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws DecoderException rethrown as-is, or wrapping any other {@link Throwable} raised
 *                          while accumulating or decoding
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof ByteBuf)) {
        ctx.fireChannelRead(msg);
        return;
    }

    final RecyclableArrayList decoded = RecyclableArrayList.newInstance();
    try {
        final ByteBuf incoming = (ByteBuf) msg;
        first = cumulation == null;
        if (first) {
            cumulation = incoming;
        } else {
            final int incomingBytes = incoming.readableBytes();
            final int roomLimit = cumulation.maxCapacity() - incomingBytes;
            if (cumulation.writerIndex() > roomLimit) {
                expandCumulation(ctx, incomingBytes);
            }
            cumulation.writeBytes(incoming);
            incoming.release();
        }
        callDecode(ctx, cumulation, decoded);
    } catch (Throwable cause) {
        if (cause instanceof DecoderException) {
            throw (DecoderException) cause;
        }
        throw new DecoderException(cause);
    } finally {
        // Nothing left to read: give the cumulation back.
        if (cumulation != null && !cumulation.isReadable()) {
            cumulation.release();
            cumulation = null;
        }

        final int count = decoded.size();
        decodeWasNull = count == 0;

        for (int idx = 0; idx < count; idx++) {
            ctx.fireChannelRead(decoded.get(idx));
        }
        decoded.recycle();
    }
}
/**
 * Benchmark body: obtains a list via {@code RecyclableArrayList.newInstance(size)} and
 * recycles it straight away within the same call.
 */
@Benchmark
public void recycleSameThread() {
    RecyclableArrayList.newInstance(size).recycle();
}
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { RecyclableArrayList out = null; try { if (acceptOutboundMessage(msg)) { out = RecyclableArrayList.newInstance(); @SuppressWarnings("unchecked") I cast = (I) msg; try { encode(ctx, cast, out); } finally { ReferenceCountUtil.release(cast); } if (out.isEmpty()) { out.recycle(); out = null; throw new EncoderException( StringUtil.simpleClassName(this) + " must produce at least one message."); } } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable t) { throw new EncoderException(t); } finally { if (out != null) { final int sizeMinusOne = out.size() - 1; if (sizeMinusOne == 0) { ctx.write(out.get(0), promise); } else if (sizeMinusOne > 0) { // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure // See https://github.com/netty/netty/issues/2525 ChannelPromise voidPromise = ctx.voidPromise(); boolean isVoidPromise = promise == voidPromise; for (int i = 0; i < sizeMinusOne; i ++) { ChannelPromise p; if (isVoidPromise) { p = voidPromise; } else { p = ctx.newPromise(); } ctx.write(out.get(i), p); } ctx.write(out.get(sizeMinusOne), promise); } out.recycle(); } } }
/**
 * Encodes an accepted outbound message into one or more messages and writes them;
 * messages that are not accepted are written through unchanged with the given promise.
 *
 * <p>The original message is released after {@code encode} returns or throws. All encoded
 * messages except the last are written via {@code ctx.write(Object)}; the last one is
 * written with {@code promise}. Whatever is in the output list is written and the list
 * recycled even when an exception is propagating.
 *
 * @param ctx     the handler context
 * @param msg     the outbound message
 * @param promise the promise tied to the final write
 * @throws EncoderException rethrown as-is, wrapping any other {@link Throwable}, or raised
 *                          when {@code encode} produced no message
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    RecyclableArrayList encoded = null;
    try {
        if (!acceptOutboundMessage(msg)) {
            ctx.write(msg, promise);
        } else {
            encoded = RecyclableArrayList.newInstance();
            @SuppressWarnings("unchecked")
            I typed = (I) msg;
            try {
                encode(ctx, typed, encoded);
            } finally {
                ReferenceCountUtil.release(typed);
            }

            if (encoded.isEmpty()) {
                encoded.recycle();
                encoded = null;

                throw new EncoderException(
                        StringUtil.simpleClassName(this) + " must produce at least one message.");
            }
        }
    } catch (Throwable cause) {
        if (cause instanceof EncoderException) {
            throw (EncoderException) cause;
        }
        throw new EncoderException(cause);
    } finally {
        if (encoded != null) {
            final int lastIdx = encoded.size() - 1;
            if (lastIdx >= 0) {
                int idx = 0;
                while (idx < lastIdx) {
                    ctx.write(encoded.get(idx));
                    idx++;
                }
                // Only the final message carries the caller's promise.
                ctx.write(encoded.get(lastIdx), promise);
            }
            encoded.recycle();
        }
    }
}
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { RecyclableArrayList out = RecyclableArrayList.newInstance(); try { //收集读取的字节 ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { cumulation = data; } else { if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) { expandCumulation(ctx, data.readableBytes()); } cumulation.writeBytes(data); data.release(); } //开始解码 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { cumulation.release(); cumulation = null; } int size = out.size(); decodeWasNull = size == 0; for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } out.recycle(); } } else { ctx.fireChannelRead(msg); } }
/**
 * Encodes an accepted outbound message into zero or more messages and writes them;
 * messages that are not accepted are written through unchanged with the given promise.
 *
 * <p>The original message is released after {@code encode} returns or throws. If encoding
 * produced nothing, the promise is marked successful and nothing is written. Otherwise
 * the last encoded message is written with the given promise; earlier ones use the
 * context's void promise when the given promise is that void promise, otherwise a new
 * promise each. The output list is recycled in every case.
 *
 * @param channelHandlerContext the handler context
 * @param o                     the outbound message
 * @param channelPromise        the promise tied to the final write
 * @throws EncoderException rethrown as-is, or wrapping any other {@link Throwable}
 */
@Override
@SuppressWarnings("unchecked")
public void write(final ChannelHandlerContext channelHandlerContext, final Object o,
                  final ChannelPromise channelPromise) {
    RecyclableArrayList encoded = null;
    try {
        if (!this.acceptOutboundMessage(o)) {
            channelHandlerContext.write(o, channelPromise);
        } else {
            encoded = RecyclableArrayList.newInstance();
            try {
                this.encode(channelHandlerContext, (I) o, (List<Object>) encoded);
            } finally {
                ReferenceCountUtil.release(o);
            }

            if (encoded.isEmpty()) {
                // Nothing to write: hand the list back and complete the promise directly.
                encoded.recycle();
                encoded = null;
                channelPromise.setSuccess();
            }
        }
    } catch (Throwable cause) {
        if (cause instanceof EncoderException) {
            throw (EncoderException) cause;
        }
        throw new EncoderException(cause);
    } finally {
        if (encoded != null) {
            final int lastIdx = encoded.size() - 1;
            if (lastIdx == 0) {
                channelHandlerContext.write(encoded.get(0), channelPromise);
            } else if (lastIdx > 0) {
                final ChannelPromise voidPromise = channelHandlerContext.voidPromise();
                final boolean callerUsesVoidPromise = channelPromise == voidPromise;
                for (int idx = 0; idx < lastIdx; ++idx) {
                    final ChannelPromise intermediate =
                            callerUsesVoidPromise ? voidPromise : channelHandlerContext.newPromise();
                    channelHandlerContext.write(encoded.get(idx), intermediate);
                }
                channelHandlerContext.write(encoded.get(lastIdx), channelPromise);
            }
            encoded.recycle();
        }
    }
}
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { RecyclableArrayList out = RecyclableArrayList.newInstance(); try { ByteBuf data = (ByteBuf) msg; this.first = this.cumulation == null; if (this.first) { this.cumulation = data; } else { if (this.cumulation.writerIndex() > this.cumulation.maxCapacity() - data.readableBytes() || this.cumulation.refCnt() > 1) { // Expand cumulation (by replace it) when either there is not more room in the buffer // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or // duplicate().retain(). // // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 this.expandCumulation(ctx, data.readableBytes()); } this.cumulation.writeBytes(data); data.release(); } this.callDecode(ctx, this.cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (this.cumulation != null && !this.cumulation.isReadable()) { this.cumulation.release(); this.cumulation = null; } int size = out.size(); this.decodeWasNull = size == 0; for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } out.recycle(); } } else { ctx.fireChannelRead(msg); } }