@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //客户端channel已关闭则不转发了 if (!clientChannel.isOpen()) { ReferenceCountUtil.release(msg); return; } HttpProxyInterceptPipeline interceptPipeline = ((HttpProxyServerHandle) clientChannel.pipeline() .get("serverHandle")).getInterceptPipeline(); if (msg instanceof HttpResponse) { interceptPipeline.afterResponse(clientChannel, ctx.channel(), (HttpResponse) msg); } else if (msg instanceof HttpContent) { interceptPipeline.afterResponse(clientChannel, ctx.channel(), (HttpContent) msg); } else { clientChannel.writeAndFlush(msg); } }
@Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) { StringUtils.Pair url = HttpUtils.parseUriIntoUrlBaseAndPath(msg.uri()); HttpRequest request = new HttpRequest(); if (url.left == null) { String requestScheme = provider.isSsl() ? "https" : "http"; String host = msg.headers().get(HttpUtils.HEADER_HOST); request.setUrlBase(requestScheme + "://" + host); } else { request.setUrlBase(url.left); } request.setUri(url.right); request.setMethod(msg.method().name()); msg.headers().forEach(h -> request.addHeader(h.getKey(), h.getValue())); QueryStringDecoder decoder = new QueryStringDecoder(url.right); decoder.parameters().forEach((k, v) -> request.putParam(k, v)); HttpContent httpContent = (HttpContent) msg; ByteBuf content = httpContent.content(); if (content.isReadable()) { byte[] bytes = new byte[content.readableBytes()]; content.readBytes(bytes); request.setBody(bytes); } writeResponse(request, ctx); ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); }
@Override public void beforeRequest(Channel clientChannel, HttpContent httpContent, HttpProxyInterceptPipeline pipeline) throws Exception { if (content != null) { ByteBuf temp = httpContent.content().slice(); content.writeBytes(temp); if (httpContent instanceof LastHttpContent) { try { byte[] contentBts = new byte[content.readableBytes()]; content.readBytes(contentBts); ((HttpRequestInfo) pipeline.getHttpRequest()).setContent(contentBts); } finally { ReferenceCountUtil.release(content); content = null; //状态回归 } } } pipeline.beforeRequest(clientChannel, httpContent); }
public static void startDownTask(TaskInfo taskInfo, HttpRequest httpRequest, HttpResponse httpResponse, Channel clientChannel) { HttpHeaders httpHeaders = httpResponse.headers(); HttpDownInfo httpDownInfo = new HttpDownInfo(taskInfo, httpRequest); HttpDownServer.DOWN_CONTENT.put(taskInfo.getId(), httpDownInfo); httpHeaders.clear(); httpResponse.setStatus(HttpResponseStatus.OK); httpHeaders.set(HttpHeaderNames.CONTENT_TYPE, "text/html"); String host = HttpDownServer.isDev() ? "localhost" : ((InetSocketAddress) clientChannel.localAddress()).getHostString(); String js = "<script>window.top.location.href='http://" + host + ":" + HttpDownServer.VIEW_SERVER_PORT + "/#/tasks/new/" + httpDownInfo .getTaskInfo().getId() + "';</script>"; HttpContent content = new DefaultLastHttpContent(); content.content().writeBytes(js.getBytes()); httpHeaders.set(HttpHeaderNames.CONTENT_LENGTH, js.getBytes().length); clientChannel.writeAndFlush(httpResponse); clientChannel.writeAndFlush(content); clientChannel.close(); }
@Override public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { if (msg instanceof HttpResponse) { HttpResponse response = (HttpResponse) msg; LOGGER.info("Response status: " + response.getStatus()); if (response.getStatus().equals(OK)) { LOGGER.info("Operation is successful"); } else { LOGGER.error("Operation is failed"); } } if (msg instanceof HttpContent) { HttpContent content = (HttpContent) msg; System.out.print(content.content().toString(CharsetUtil.UTF_8)); System.out.flush(); } }
protected void logResponseFirstChunk(HttpResponse response, ChannelHandlerContext ctx) { if (logger.isDebugEnabled()) { StringBuilder headers = new StringBuilder(); for (String headerName : response.headers().names()) { if (headers.length() > 0) headers.append(", "); headers.append(headerName).append("=\"") .append(String.join(",", response.headers().getAll(headerName))).append("\""); } StringBuilder sb = new StringBuilder(); sb.append("SENDING RESPONSE:"); sb.append("\n\tHTTP STATUS: ").append(response.getStatus().code()); sb.append("\n\tHEADERS: ").append(headers.toString()); sb.append("\n\tPROTOCOL: ").append(response.getProtocolVersion().text()); if (response instanceof HttpContent) { HttpContent chunk = (HttpContent) response; sb.append("\n\tCONTENT CHUNK: ").append(chunk.getClass().getName()).append(", size: ") .append(chunk.content().readableBytes()); } runnableWithTracingAndMdc(() -> logger.debug(sb.toString()), ctx).run(); } }
@Test public void write_sets_finalContentLength_if_msg_is_HttpContent_and_finalContentLength_is_null() throws Exception { // given HttpContent msgMock = mock(HttpContent.class); ByteBuf contentMock = mock(ByteBuf.class); int contentBytes = (int)(Math.random() * 10000); doReturn(contentMock).when(msgMock).content(); doReturn(contentBytes).when(contentMock).readableBytes(); assertThat(responseInfo.getFinalContentLength()).isNull(); // when handler.write(ctxMock, msgMock, promiseMock); // then assertThat(responseInfo.getFinalContentLength()).isEqualTo(contentBytes); }
@Test public void write_adds_to_finalContentLength_if_msg_is_HttpContent_and_finalContentLength_is_not_null() throws Exception { // given HttpContent msgMock = mock(HttpContent.class); ByteBuf contentMock = mock(ByteBuf.class); int contentBytes = (int)(Math.random() * 10000); doReturn(contentMock).when(msgMock).content(); doReturn(contentBytes).when(contentMock).readableBytes(); int initialFinalContentLengthValue = (int)(Math.random() * 10000); responseInfo.setFinalContentLength((long)initialFinalContentLengthValue); assertThat(responseInfo.getFinalContentLength()).isEqualTo(initialFinalContentLengthValue); // when handler.write(ctxMock, msgMock, promiseMock); // then assertThat(responseInfo.getFinalContentLength()).isEqualTo(initialFinalContentLengthValue + contentBytes); }
@Test public void write_does_nothing_to_finalContentLength_if_msg_is_HttpContent_but_state_is_null() throws Exception { // given HttpContent msgMock = mock(HttpContent.class); ByteBuf contentMock = mock(ByteBuf.class); int contentBytes = (int)(Math.random() * 10000); doReturn(contentMock).when(msgMock).content(); doReturn(contentBytes).when(contentMock).readableBytes(); doReturn(null).when(stateAttrMock).get(); assertThat(responseInfo.getFinalContentLength()).isNull(); // when handler.write(ctxMock, msgMock, promiseMock); // then assertThat(responseInfo.getFinalContentLength()).isNull(); }
@Test public void write_does_nothing_to_finalContentLength_if_msg_is_HttpContent_but_responseInfo_is_null() throws Exception { // given HttpContent msgMock = mock(HttpContent.class); ByteBuf contentMock = mock(ByteBuf.class); int contentBytes = (int)(Math.random() * 10000); doReturn(contentMock).when(msgMock).content(); doReturn(contentBytes).when(contentMock).readableBytes(); doReturn(null).when(stateMock).getResponseInfo(); assertThat(responseInfo.getFinalContentLength()).isNull(); // when handler.write(ctxMock, msgMock, promiseMock); // then assertThat(responseInfo.getFinalContentLength()).isNull(); }
@Before public void beforeMethod() { stateMock = mock(HttpProcessingState.class); ctxMock = mock(ChannelHandlerContext.class); channelMock = mock(Channel.class); stateAttrMock = mock(Attribute.class); endpointMock = mock(Endpoint.class); maxRequestSizeInBytes = 10; httpContentMock = mock(HttpContent.class); byteBufMock = mock(ByteBuf.class); requestInfo = mock(RequestInfo.class); handler = new RequestInfoSetterHandler(maxRequestSizeInBytes); doReturn(channelMock).when(ctxMock).channel(); doReturn(stateAttrMock).when(channelMock).attr(ChannelAttributes.HTTP_PROCESSING_STATE_ATTRIBUTE_KEY); doReturn(stateMock).when(stateAttrMock).get(); doReturn(endpointMock).when(stateMock).getEndpointForExecution(); doReturn(byteBufMock).when(httpContentMock).content(); doReturn(null).when(endpointMock).maxRequestSizeInBytesOverride(); doReturn(requestInfo).when(stateMock).getRequestInfo(); }
@DataProvider(value = { "0", "42" }) @Test public void contentChunksWillBeReleasedExternally_works_as_expected(int contentChunkListSize) { // given RequestInfoImpl<?> requestInfo = RequestInfoImpl.dummyInstanceForUnknownRequests(); Assertions.assertThat(requestInfo.contentChunksWillBeReleasedExternally).isFalse(); for (int i = 0; i < contentChunkListSize; i++) { requestInfo.contentChunks.add(mock(HttpContent.class)); } // when requestInfo.contentChunksWillBeReleasedExternally(); // then Assertions.assertThat(requestInfo.contentChunksWillBeReleasedExternally).isTrue(); Assertions.assertThat(requestInfo.contentChunks).isEmpty(); }
@Test public void addContentChunk_adds_chunk_content_length_to_rawContentLengthInBytes() throws IOException { // given RequestInfoImpl<?> requestInfo = RequestInfoImpl.dummyInstanceForUnknownRequests(); requestInfo.isCompleteRequestWithAllChunks = false; String chunk1String = UUID.randomUUID().toString(); String lastChunkString = UUID.randomUUID().toString(); byte[] chunk1Bytes = chunk1String.getBytes(); byte[] lastChunkBytes = lastChunkString.getBytes(); HttpContent chunk1 = new DefaultHttpContent(Unpooled.copiedBuffer(chunk1Bytes)); HttpContent lastChunk = new DefaultLastHttpContent(Unpooled.copiedBuffer(lastChunkBytes)); // when requestInfo.addContentChunk(chunk1); requestInfo.addContentChunk(lastChunk); // then assertThat(requestInfo.contentChunks.size(), is(2)); assertThat(requestInfo.isCompleteRequestWithAllChunks(), is(true)); assertThat(requestInfo.getRawContentLengthInBytes(), is(chunk1Bytes.length + lastChunkBytes.length)); }
@Test public void releaseContentChunks_calls_release_on_each_chunk_and_calls_clear_on_chunk_list() { // given RequestInfoImpl<?> requestInfo = RequestInfoImpl.dummyInstanceForUnknownRequests(); List<HttpContent> contentChunkList = Arrays.asList(mock(HttpContent.class), mock(HttpContent.class)); requestInfo.contentChunks.addAll(contentChunkList); assertThat(requestInfo.contentChunks.size(), is(contentChunkList.size())); // when requestInfo.releaseContentChunks(); // then for (HttpContent chunkMock : contentChunkList) { verify(chunkMock).release(); } assertThat(requestInfo.contentChunks.isEmpty(), is(true)); }
@Test public void releaseContentChunks_clear_on_chunk_list_but_does_not_release_chunks_if_contentChunksWillBeReleasedExternally_is_true() { // given RequestInfoImpl<?> requestInfo = RequestInfoImpl.dummyInstanceForUnknownRequests(); requestInfo.contentChunksWillBeReleasedExternally(); List<HttpContent> contentChunkList = Arrays.asList(mock(HttpContent.class), mock(HttpContent.class)); requestInfo.contentChunks.addAll(contentChunkList); assertThat(requestInfo.contentChunks.size(), is(contentChunkList.size())); // when requestInfo.releaseContentChunks(); // then for (HttpContent chunkMock : contentChunkList) { verify(chunkMock, never()).release(); } assertThat(requestInfo.contentChunks.isEmpty(), is(true)); }
private void handleChunk(HttpContent chunk,// final Channel channel,// final NettyResponseFuture<?> future,// AsyncHandler<?> handler) throws IOException, Exception { boolean interrupt = false; boolean last = chunk instanceof LastHttpContent; // Netty 4: the last chunk is not empty if (last) { LastHttpContent lastChunk = (LastHttpContent) chunk; HttpHeaders trailingHeaders = lastChunk.trailingHeaders(); if (!trailingHeaders.isEmpty()) { interrupt = handler.onHeadersReceived(new HttpResponseHeaders(trailingHeaders, true)) != State.CONTINUE; } } ByteBuf buf = chunk.content(); if (!interrupt && !(handler instanceof StreamedAsyncHandler) && (buf.readableBytes() > 0 || last)) { HttpResponseBodyPart part = config.getResponseBodyPartFactory().newResponseBodyPart(buf, last); interrupt = updateBodyAndInterrupt(future, handler, part); } if (interrupt || last) finishUpdate(future, channel, !last); }
@Override public HttpResponse clientToProxyRequest(HttpObject httpObject) { if (httpObject instanceof HttpRequest) { this.httpRequest = (HttpRequest) httpObject; } if (httpObject instanceof HttpContent) { HttpContent httpContent = (HttpContent) httpObject; storeRequestContent(httpContent); if (httpContent instanceof LastHttpContent) { LastHttpContent lastHttpContent = (LastHttpContent) httpContent; trailingHeaders = lastHttpContent .trailingHeaders(); } } return null; }
@Override public HttpObject serverToProxyResponse(HttpObject httpObject) { if (httpObject instanceof HttpResponse) { httpResponse = (HttpResponse) httpObject; captureContentEncoding(httpResponse); } if (httpObject instanceof HttpContent) { HttpContent httpContent = (HttpContent) httpObject; storeResponseContent(httpContent); if (httpContent instanceof LastHttpContent) { LastHttpContent lastContent = (LastHttpContent) httpContent; captureTrailingHeaders(lastContent); captureFullResponseContents(); } } return super.serverToProxyResponse(httpObject); }
@Override protected void onInboundNext(ChannelHandlerContext ctx, Object msg) { if (msg instanceof HttpContent) { if (msg != LastHttpContent.EMPTY_LAST_CONTENT) { super.onInboundNext(ctx, msg); } if (msg instanceof LastHttpContent) { onInboundComplete(); if (isOutboundDone()) { onHandlerTerminate(); } else { //force auto read to enable more accurate close selection now inbound is done channel().config() .setAutoRead(true); } } } else { super.onInboundNext(ctx, msg); } }
private void handleUploadMessage(HttpMessage httpMsg, Message uploadMessage) throws IOException{ if (httpMsg instanceof HttpContent) { HttpContent chunk = (HttpContent) httpMsg; decoder.offer(chunk); try { while (decoder.hasNext()) { InterfaceHttpData data = decoder.next(); if (data != null) { try { handleUploadFile(data, uploadMessage); } finally { data.release(); } } } } catch (EndOfDataDecoderException e1) { //ignore } if (chunk instanceof LastHttpContent) { resetUpload(); } } }
@Test public void testBuildContent() throws Exception { HttpRequest nettyRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_0, HttpMethod.GET, "www.google.com"); RecordedHttpRequestBuilder recordedHttpRequestBuilder = new RecordedHttpRequestBuilder(nettyRequest); String charset = "UTF-8"; String str1 = "first content"; HttpContent httpContent1 = new DefaultHttpContent(Unpooled.copiedBuffer(str1.getBytes(charset))); recordedHttpRequestBuilder.appendHttpContent(httpContent1); String str2 = "second content"; HttpContent httpContent2 = new DefaultHttpContent(Unpooled.copiedBuffer(str2.getBytes(charset))); recordedHttpRequestBuilder.appendHttpContent(httpContent2); String lastStr = "Last chunk"; HttpContent lastContent = new DefaultLastHttpContent(Unpooled.copiedBuffer(lastStr.getBytes(charset))); recordedHttpRequestBuilder.appendHttpContent(lastContent); RecordedHttpRequest recordedHttpRequest = recordedHttpRequestBuilder.build(); Assert .assertEquals((str1 + str2 + lastStr).getBytes(charset), recordedHttpRequest.getHttpBody().getContent(charset)); }
@Test public void testBuild() throws IOException { HttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.GATEWAY_TIMEOUT); RecordedHttpResponseBuilder recordedHttpResponseBuilder = new RecordedHttpResponseBuilder(httpResponse); String charset = "UTF-8"; String str1 = "Hello world"; HttpContent httpContent1 = new DefaultHttpContent(Unpooled.copiedBuffer(str1.getBytes(charset))); recordedHttpResponseBuilder.appendHttpContent(httpContent1); String str2 = "second content"; HttpContent httpContent2 = new DefaultHttpContent(Unpooled.copiedBuffer(str2.getBytes(charset))); recordedHttpResponseBuilder.appendHttpContent(httpContent2); String lastStr = "Last chunk"; HttpContent lastContent = new DefaultLastHttpContent(Unpooled.copiedBuffer(lastStr.getBytes(charset))); recordedHttpResponseBuilder.appendHttpContent(lastContent); RecordedHttpResponse recordedHttpResponse = recordedHttpResponseBuilder.build(); Assert.assertEquals(recordedHttpResponse.getStatus(), HttpResponseStatus.GATEWAY_TIMEOUT.code()); Assert.assertEquals((str1 + str2 + lastStr).getBytes(charset), recordedHttpResponse.getHttpBody().getContent(charset)); }
@Override public void handleReadFromClient(ChannelMediator channelMediator, HttpObject httpObject) { if (channelMediator == null) { throw new IllegalStateException("HRFC: ChannelMediator can't be null"); } try { if (httpObject instanceof HttpRequest) { HttpRequest httpRequest = (HttpRequest) httpObject; _clientRequestBuilder.interpretHttpRequest(httpRequest); _clientRequestBuilder.addHeaders(httpRequest); } if (httpObject instanceof HttpContent) { _clientRequestBuilder.appendHttpContent((HttpContent) httpObject); } if (httpObject instanceof LastHttpContent) { HttpResponse httpResponse = playBack(); channelMediator.writeToClientAndDisconnect(httpResponse); } } catch (IOException e) { throw new RuntimeException("HRFC: Failed to replay HttpContent", e); } }
@Override public void handleReadFromClient(ChannelMediator channelMediator, HttpObject httpObject) { if (channelMediator == null) { throw new IllegalStateException("HRFC: ChannelMediator can't be null"); } try { if (httpObject instanceof HttpRequest) { HttpRequest httpRequest = (HttpRequest) httpObject; _clientRequestBuilder.interpretHttpRequest(httpRequest); _clientRequestBuilder.addHeaders(httpRequest); } if (httpObject instanceof HttpContent) { _clientRequestBuilder.appendHttpContent((HttpContent) httpObject); } } catch (IOException e) { throw new RuntimeException("HRFC: Failed to record HttpContent", e); } channelMediator.writeToServer(httpObject); }
@Override public void handleReadFromServer(HttpObject httpObject) { if (httpObject instanceof HttpResponse) { _serverResponseBuilder = new RecordedHttpResponseBuilder((HttpResponse) httpObject); } try { if (httpObject instanceof HttpContent) { _serverResponseBuilder.appendHttpContent((HttpContent) httpObject); } if (httpObject instanceof LastHttpContent) { _sceneAccessLayer.record(_clientRequestBuilder.build(), _serverResponseBuilder.build()); } } catch (IOException e) { throw new RuntimeException("HRFS: Failed to record HttpContent", e); } }
@Override public void onRead(HttpObject httpObject) { if (!_connectionFlowProcessor.isComplete()) { _channelReadCallback.write(httpObject); // Accroding to http://netty.io/wiki/reference-counted-objects.html // When an event loop reads data into a ByteBuf and triggers a channelRead() event with it, // it is the responsibility of the ChannelHandler in the corresponding pipeline to release the buffer. // Since this is the last ChannelHandler, it release the reference-counted after read. So we need to // retain to make sure it will not be released until we stored in scene. if(httpObject instanceof HttpContent){ ((HttpContent)httpObject).retain(); } return; } _channelMediator.readFromClientChannel(httpObject); }
public NettyHttpOperationFailedException(String uri, int statusCode, String statusText, String location, HttpContent content) { super("Netty HTTP operation failed invoking " + uri + " with statusCode: " + statusCode + (location != null ? ", redirectLocation: " + location : "")); this.uri = uri; this.statusCode = statusCode; this.statusText = statusText; this.redirectLocation = location; this.content = content; String str = ""; try { str = NettyConverter.toString(content.content(), null); } catch (UnsupportedEncodingException e) { // ignore } this.contentAsString = str; }
@Override protected void channelRead0( ChannelHandlerContext context, HttpObject message) throws Exception { if (message instanceof HttpResponse) { receive((HttpResponse) message); } if (message instanceof HttpContent) { receive((HttpContent) message); if (message instanceof LastHttpContent) { release(this); } } }
void receive(HttpContent content) { // Consume the response body. ByteBuf byteBuf = content.content(); for (int toRead; (toRead = byteBuf.readableBytes()) > 0; ) { byteBuf.readBytes(buffer, 0, Math.min(buffer.length, toRead)); total += toRead; } if (VERBOSE && content instanceof LastHttpContent) { long finish = System.nanoTime(); System.out.println(String.format("Transferred % 8d bytes in %4d ms", total, TimeUnit.NANOSECONDS.toMillis(finish - start))); } }
public void beforeRequest(Channel clientChannel, HttpContent httpContent) throws Exception { if (this.pos2 < intercepts.size()) { HttpProxyIntercept intercept = intercepts.get(this.pos2++); intercept.beforeRequest(clientChannel, httpContent, this); } this.pos2 = 0; }
public void afterResponse(Channel clientChannel, Channel proxyChannel, HttpContent httpContent) throws Exception { if (this.pos4 < intercepts.size()) { HttpProxyIntercept intercept = intercepts.get(this.pos4++); intercept.afterResponse(clientChannel, proxyChannel, httpContent, this); } this.pos4 = 0; }
@Override public void beforeRequest(Channel clientChannel, HttpContent httpContent, HttpProxyInterceptPipeline pipeline) throws Exception { if (!crtFlag) { pipeline.beforeRequest(clientChannel, httpContent); } }
public static Map<String,Object> loadPostReqParams(HttpContent content){ Map<String,Object> params =null; try { Gson gson = new Gson(); Type paraMap = new TypeToken<Map<String, JsonElement>>(){}.getType(); ByteBufInputStream in = new ByteBufInputStream(content.content()); String rawJson = IOUtils.readAll(in); params = gson.fromJson(rawJson,paraMap); } catch (IOException e) { e.printStackTrace(); } return params; }
/** * Reads the header part of response from remote HTTP server. Tests * the validity of this connection. * * @param ctx handler context of this channel * @param msg received message */ public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof HttpResponse) { HttpResponse response = (HttpResponse) msg; HttpResponseStatus status = response.getStatus(); try { if (HTTPResponseCode.isMoved(status)) { URI uri = URI.create( response.headers().get(HttpHeaders.Names.LOCATION)); builder.setUri(URI.create(uri.endpoint())); code = ActionCode.Redirect; throw new HTTPException(builder.getHost() + " " + status.toString()); } else if (HTTPResponseCode.isNotFound(status)) { code = ActionCode.NotFound; throw new HTTPException(builder.getHost() + " " + status.toString()); } else if (HTTPResponseCode.isInvalid(status)) { code = ActionCode.Bad; throw new HTTPException( builder.getHost() + " HEADER method unsupported"); } else if (HTTPResponseCode.isOK(status)) try { // Valid HTTP server found code = ActionCode.OK; builder.setKeepAlive(!response.headers() .get("Connection").equalsIgnoreCase("close")); } catch (NullPointerException npe) { // No connection header. builder.setKeepAlive(true); } } catch (HTTPException e) { System.err.println(e.getMessage()); } } if (msg instanceof HttpContent) { endTest(ctx); } }
@Override protected void channelRead0(ChannelHandlerContext ctx, HttpContent chunk) throws IOException { chunk.content().readBytes(out, chunk.content().readableBytes()); if (chunk instanceof LastHttpContent) { response.headers().set(CONNECTION, CLOSE); ctx.write(response).addListener(ChannelFutureListener.CLOSE); releaseDfsResources(); } }
@Override public void subscribe(Subscriber<? super HttpContent> subscriber) { publisher.subscribe(new Subscriber<ByteBuffer>() { @Override public void onSubscribe(Subscription subscription) { subscriber.onSubscribe(subscription); } @Override public void onNext(ByteBuffer byteBuffer) { ByteBuf buffer = channel.alloc().buffer(byteBuffer.remaining()); buffer.writeBytes(byteBuffer); HttpContent content = new DefaultHttpContent(buffer); subscriber.onNext(content); } @Override public void onError(Throwable t) { subscriber.onError(t); } @Override public void onComplete() { subscriber.onComplete(); } }); }
@Override public void subscribe(Subscriber<? super ByteBuffer> subscriber) { response.subscribe(new Subscriber<HttpContent>() { @Override public void onSubscribe(Subscription subscription) { subscriber.onSubscribe(subscription); } @Override public void onNext(HttpContent httpContent) { // Needed to prevent use-after-free bug if the subscriber's onNext is asynchronous ByteBuffer b = copyToByteBuffer(httpContent.content()); httpContent.release(); subscriber.onNext(b); channelContext.read(); } @Override public void onError(Throwable t) { runAndLogError(String.format("Subscriber %s threw an exception in onError.", subscriber.toString()), () -> subscriber.onError(t)); requestContext.handler().exceptionOccurred(t); } @Override public void onComplete() { try { runAndLogError(String.format("Subscriber %s threw an exception in onComplete.", subscriber.toString()), subscriber::onComplete); requestContext.handler().complete(); } finally { finalizeRequest(requestContext, channelContext); } } }); }