一尘不染

Netty IllegalReferenceCountException

java

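Although my business logic was fine, it turned out that I wasn't using Netty ByteBuf. After updating the test code to use ByteBuf, I ran into an endless cycle of IllegalReferenceCountException. I admit I'm new to Netty, but that doesn't justify going back to the days of manually allocating and freeing resources. GC was created to avoid exactly this mess. Disco, anyone? What about bell-bottoms?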

public class StringDecoder extends AbstractDecoder<String> {
    private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r';

    @Override
    public Flux<String> decode(Publisher<DataBuffer> publisher, ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
        return Flux.from(publisher)
            .scan(Tuples.<Flux<DataBuffer>, Optional<DataBuffer>>of(Flux.empty(), Optional.empty()),
                    (acc, buffer) -> {
                        List<DataBuffer> results = new ArrayList<>();
                        int startIdx = 0, endIdx = 0, limit = buffer.readableByteCount();
                        Optional<DataBuffer> incomplete = acc.getT2();

                        while (startIdx < limit && endIdx != -1) {
                            endIdx = buffer.indexOf(NEWLINE_DELIMITER, startIdx);
                            int length = (endIdx == -1 ? limit : endIdx) - startIdx;
                            DataBuffer slice = buffer.slice(startIdx, length);
                            DataBuffer tmp = incomplete.map(b -> b.write(slice))
                                    .orElse(buffer.factory().allocateBuffer(length).write(slice));
                            tmp = DataBufferUtils.retain(tmp);

                            if (endIdx != -1) {
                                startIdx = endIdx + 1;
                                results.add(tmp);
                                incomplete = Optional.empty();
                            } else {
                                incomplete = Optional.of(tmp);
                            }
                        }
                        releaseBuffer(buffer);

                        return Tuples.of(Flux.fromIterable(results), incomplete);
                    })
            .flatMap(t -> {
                t.getT2().ifPresent(this::releaseBuffer);
                return t.getT1();
            })
            .map(buffer -> {
                // charset resolution should in general use supplied mimeType
                String s = UTF_8.decode(buffer.asByteBuffer()).toString();
                releaseBuffer(buffer);

                return s;
            })
            .log();
    }

    private void releaseBuffer(DataBuffer buffer) {
        boolean release = DataBufferUtils.release(buffer);
        if (release) {
            System.out.println("Buffer was released.");
        }
    }
}

public class StringDecoderTest {
    private StringDecoder stringDecoder = new StringDecoder();
    DataBufferFactory dataBufferFactory = new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT);

    @Test
    public void testDecode() {
        Flux<DataBuffer> pub = Flux.just("abc\n", "abc", "def\n", "abc", "def\nxyz\n", "abc", "def", "xyz\n")
                .map(s -> dataBufferFactory.wrap(s.getBytes(UTF_8)));

        StepVerifier.create(stringDecoder.decode(pub, null, null, null))
                .expectNext("abc", "abcdef", "abcdef", "xyz", "abcdefxyz")
                .verifyComplete();
    }
}

I keep getting:

[ERROR] (main) onError(io.netty.util.IllegalReferenceCountException: refCnt: 0)
[ERROR] (main)  - io.netty.util.IllegalReferenceCountException: refCnt: 0
io.netty.util.IllegalReferenceCountException: refCnt: 0
    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1415)
    at io.netty.buffer.UnpooledHeapByteBuf.nioBuffer(UnpooledHeapByteBuf.java:314)
    at io.netty.buffer.AbstractUnpooledSlicedByteBuf.nioBuffer(AbstractUnpooledSlicedByteBuf.java:434)
    at io.netty.buffer.CompositeByteBuf.nioBuffers(CompositeByteBuf.java:1496)
    at io.netty.buffer.CompositeByteBuf.nioBuffer(CompositeByteBuf.java:1468)
    at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:1205)
    at org.springframework.core.io.buffer.NettyDataBuffer.asByteBuffer(NettyDataBuffer.java:234)
    at org.abhijitsarkar.java.StringDecoder.lambda$decode$4(StringDecoder.java:61)
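
The trace above fails in ensureAccessible on a sliced buffer inside a CompositeByteBuf, which suggests that a slice was still in use after the buffer it was cut from had been released. In Netty, a buffer obtained with slice() shares its parent's reference count. The sketch below models only that rule in plain Java. ToyBuffer is a hypothetical class written for illustration, not Netty's or Spring's API, and it throws IllegalStateException where Netty would throw IllegalReferenceCountException.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified model of a reference-counted buffer (not Netty's API). */
final class ToyBuffer {
    private final byte[] data;
    private final int offset;
    private final int length;
    private final ToyBuffer parent; // null for a root buffer, the root buffer for a slice
    private int refCnt = 1;         // only meaningful on a root buffer

    ToyBuffer(byte[] data) {
        this(data, 0, data.length, null);
    }

    private ToyBuffer(byte[] data, int offset, int length, ToyBuffer parent) {
        this.data = data;
        this.offset = offset;
        this.length = length;
        this.parent = parent;
    }

    /** A slice has no count of its own; it reports the count of its root buffer. */
    int refCnt() {
        return parent != null ? parent.refCnt() : refCnt;
    }

    ToyBuffer retain() {
        if (parent != null) {
            parent.retain();
            return this;
        }
        ensureAccessible();
        refCnt++;
        return this;
    }

    /** Returns true when this call dropped the count to zero. */
    boolean release() {
        if (parent != null) {
            return parent.release();
        }
        ensureAccessible();
        return --refCnt == 0;
    }

    /** Creates a view over the same bytes without incrementing the count. */
    ToyBuffer slice(int from, int len) {
        ensureAccessible();
        return new ToyBuffer(data, offset + from, len, parent != null ? parent : this);
    }

    String readString() {
        ensureAccessible();
        return new String(data, offset, length, StandardCharsets.UTF_8);
    }

    private void ensureAccessible() {
        if (refCnt() == 0) {
            throw new IllegalStateException("refCnt: 0");
        }
    }

    public static void main(String[] args) {
        ToyBuffer buffer = new ToyBuffer("abc\n".getBytes(StandardCharsets.UTF_8));
        ToyBuffer slice = buffer.slice(0, 3);
        System.out.println(slice.readString()); // readable while the parent is alive
        buffer.release();                       // count drops to zero
        try {
            slice.readString();                 // the slice is now unusable as well
        } catch (IllegalStateException e) {
            System.out.println("slice failed: " + e.getMessage());
        }
    }
}
```

In this model, either the slice has to be retained before the parent is released, or its bytes have to be copied out first.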

2020-12-03

1 answer

一尘不染

Working code:

// Assumes static imports of DataBufferUtils.retain and DataBufferUtils.release.
public class StringDecoder extends AbstractDecoder<String> {
    private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r';

    @Override
    public Flux<String> decode(Publisher<DataBuffer> publisher, ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
        DataBuffer incomplete = new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT).allocateBuffer(0);

        return Flux.from(publisher)
                .scan(Tuples.<Flux<DataBuffer>, DataBuffer>of(Flux.empty(), retain(incomplete)),
                      (acc, buffer) -> {
                          List<DataBuffer> results = new ArrayList<>();
                          int startIdx = 0, endIdx = 0, limit = buffer.readableByteCount();

                          while (startIdx < limit && endIdx != -1) {
                              endIdx = buffer.indexOf(NEWLINE_DELIMITER, startIdx);
                              int length = (endIdx == -1 ? limit : endIdx) - startIdx;

                              DataBuffer slice = buffer.slice(startIdx, length);
                              byte[] slice1 = new byte[length];
                              slice.read(slice1, 0, slice1.length);

                              if (endIdx != -1) {
                                  byte[] slice2 = new byte[incomplete.readableByteCount()];
                                  incomplete.read(slice2, 0, slice2.length);
                                  // call retain to match release during decoding to string later
                                  results.add(retain(
                                          incomplete.factory().allocateBuffer()
                                                  .write(slice2)
                                                  .write(slice1)
                                  ));
                                  startIdx = endIdx + 1;
                              } else {
                                  incomplete.write(slice1);
                              }
                          }

                          return Tuples.of(Flux.fromIterable(results), incomplete);
                      })
                .flatMap(Tuple2::getT1)
                .map(buffer -> {
                    // charset resolution should in general use supplied mimeType
                    String s = UTF_8.decode(buffer.asByteBuffer()).toString();

                    return s;
                })
                .doOnTerminate(() -> release(incomplete))
                .log();
    }
}

The code could have been a little more concise, were it not for Spring bug SPR-16351.
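
As far as the code above shows, the fix works by copying bytes out of each incoming buffer into plain arrays, so that the emitted lines no longer depend on the lifetime of the input buffers. A minimal sketch of that copying idea in plain Java follows, with no Spring or Netty types. ChunkedLineSplitter and its feed method are hypothetical names introduced here for illustration, and the input is the same sequence of chunks used in the question's test.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Spring or Netty): splits chunked bytes into lines by copying. */
final class ChunkedLineSplitter {
    // Bytes of the current line that has not yet seen its delimiter.
    private final ByteArrayOutputStream incomplete = new ByteArrayOutputStream();

    /** Feeds one chunk and returns the lines that this chunk completed. */
    List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            // Same delimiter rule as the post: '\n' and '\r' each end a line,
            // so a "\r\n" pair produces an extra empty line.
            if (b == '\n' || b == '\r') {
                lines.add(new String(incomplete.toByteArray(), StandardCharsets.UTF_8));
                incomplete.reset();
            } else {
                incomplete.write(b);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        String[] chunks = {"abc\n", "abc", "def\n", "abc", "def\nxyz\n", "abc", "def", "xyz\n"};
        ChunkedLineSplitter splitter = new ChunkedLineSplitter();
        List<String> all = new ArrayList<>();
        for (String chunk : chunks) {
            all.addAll(splitter.feed(chunk.getBytes(StandardCharsets.UTF_8)));
        }
        System.out.println(all); // prints [abc, abcdef, abcdef, xyz, abcdefxyz]
    }
}
```

Copying costs an extra allocation per line, but no reference count has to be tracked for the output. Bytes are decoded only once a full line is assembled, so a multi-byte UTF-8 character split across two chunks is still decoded correctly.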

2020-12-03