private void failAndRespond(Throwable cause) { fail(cause); final Channel ch =; final Http2Error error; if (response.isOpen()) { response.close(cause); error = Http2Error.INTERNAL_ERROR; } else if (cause instanceof WriteTimeoutException || cause instanceof AbortedStreamException) { error = Http2Error.CANCEL; } else { Exceptions.logIfUnexpected(logger, ch, HttpSession.get(ch).protocol(), "a request publisher raised an exception", cause); error = Http2Error.INTERNAL_ERROR; } if (ch.isActive()) { encoder.writeReset(ctx, id, streamId(), error); ctx.flush(); } }
/**
 * Invoked when the response publisher completes.
 *
 * <p>Returns immediately when {@code cancelTimeout()} returns {@code false} and {@code reqCtx} has no
 * request timeout handler. Otherwise, when {@code wroteNothing(state)} holds, a warning is logged and the
 * stream is reset with {@code Http2Error.INTERNAL_ERROR}. In the remaining case, if the state is not
 * {@code State.DONE}, {@code write(HttpData.EMPTY_DATA, true, true)} is called.
 *
 * <p>NOTE(review): in this copy the first argument after the format string of {@code logger.warn(...)}
 * and the second argument of {@code responseEncoder.writeReset(...)} are missing (empty {@code ,,}).
 * The body is also collapsed onto one line, so the inline {@code //} comment swallows the statements
 * after it. Restore both expressions and the line breaks from the upstream source before compiling.
 */
@Override public void onComplete() { if (!cancelTimeout() && reqCtx.requestTimeoutHandler() == null) { // We have already returned a failed response due to a timeout. return; } if (wroteNothing(state)) { logger.warn("{} Published nothing (or only informational responses): {}",, service()); responseEncoder.writeReset(ctx,, req.streamId(), Http2Error.INTERNAL_ERROR); return; } if (state != State.DONE) { write(HttpData.EMPTY_DATA, true, true); } }
@Override protected ChannelFuture doWriteReset(ChannelHandlerContext ctx, int id, int streamId, Http2Error error) { // NB: this.minClosedId can be overwritten more than once when 3+ pipelined requests are received // and they are handled by different threads simultaneously. // e.g. when the 3rd request triggers a reset and then the 2nd one triggers another. minClosedId = Math.min(minClosedId, id); for (int i = minClosedId; i <= maxIdWithPendingWrites; i++) { final PendingWrites pendingWrites = pendingWritesMap.remove(i); for (;;) { final Entry<HttpObject, ChannelPromise> e = pendingWrites.poll(); if (e == null) { break; } e.getValue().tryFailure(ClosedSessionException.get()); } } final ChannelFuture f = ctx.write(Unpooled.EMPTY_BUFFER); if (currentId >= minClosedId) { f.addListener(ChannelFutureListener.CLOSE); } return f; }
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg, ChannelPromise promise) throws Exception { close(ctx, promise); connection().forEachActiveStream(new Http2StreamVisitor() { @Override public boolean visit(Http2Stream stream) throws Http2Exception { NettyServerStream.TransportState serverStream = serverStream(stream); if (serverStream != null) { serverStream.transportReportStatus(msg.getStatus()); resetStream(ctx,, Http2Error.CANCEL.code(), ctx.newPromise()); } stream.close(); return true; } }); }
/**
 * Handles an inbound PING frame.
 *
 * <p>The keep-alive manager, when present, is told that data arrived. If the keep-alive enforcer
 * rejects the ping, a GOAWAY with {@code ENHANCE_YOUR_CALM} and the debug data {@code "too_many_pings"}
 * is sent, followed by a forceful close with a {@code RESOURCE_EXHAUSTED} status. An exception thrown
 * by the forceful close is routed to {@code onError}.
 */
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
  if (keepAliveManager != null) {
    keepAliveManager.onDataReceived();
  }

  if (keepAliveEnforcer.pingAcceptable()) {
    // Ping is within the permitted rate; nothing more to do.
    return;
  }

  final ByteBuf tooManyPings = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
  final int lastStreamId = connection().remote().lastStreamCreated();
  goAway(ctx, lastStreamId, Http2Error.ENHANCE_YOUR_CALM.code(), tooManyPings, ctx.newPromise());

  final Status exhausted =
      Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
  try {
    forcefulClose(ctx, new ForcefulCloseCommand(exhausted), ctx.newPromise());
  } catch (Exception closeFailure) {
    onError(ctx, closeFailure);
  }
}
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg, ChannelPromise promise) throws Exception { // close() already called by NettyClientTransport, so just need to clean up streams connection().forEachActiveStream(new Http2StreamVisitor() { @Override public boolean visit(Http2Stream stream) throws Http2Exception { NettyClientStream.TransportState clientStream = clientStream(stream); if (clientStream != null) { clientStream.transportReportStatus(msg.getStatus(), true, new Metadata()); resetStream(ctx,, Http2Error.CANCEL.code(), ctx.newPromise()); } stream.close(); return true; } }); }
/**
 * With keep-alive-without-calls disallowed and a one-hour permitted ping interval, reading
 * {@code MAX_PING_STRIKES + 1} PING frames must produce a GOAWAY with {@code ENHANCE_YOUR_CALM} and
 * last-stream id 0, and leave the channel inactive.
 */
@Test
public void keepAliveEnforcer_enforcesPings() throws Exception {
  permitKeepAliveWithoutCalls = false;
  permitKeepAliveTimeInNanos = TimeUnit.HOURS.toNanos(1);
  manualSetUp();

  final ByteBuf pingPayload = handler().ctx().alloc().buffer(8);
  pingPayload.writeLong(1);
  final int pingCount = KeepAliveEnforcer.MAX_PING_STRIKES + 1;
  for (int sent = 0; sent < pingCount; sent++) {
    channelRead(pingFrame(false /* isAck */, pingPayload.slice()));
  }
  pingPayload.release();

  verifyWrite().writeGoAway(
      eq(ctx()),
      eq(0),
      eq(Http2Error.ENHANCE_YOUR_CALM.code()),
      any(ByteBuf.class),
      any(ChannelPromise.class));
  assertFalse(channel().isActive());
}
/**
 * Interleaves ten outbound gRPC frames with ten inbound PING frames on an open stream and checks
 * that no GOAWAY with {@code ENHANCE_YOUR_CALM} for {@code STREAM_ID} is ever written.
 */
@Test
public void keepAliveEnforcer_sendingDataResetsCounters() throws Exception {
  permitKeepAliveWithoutCalls = false;
  permitKeepAliveTimeInNanos = TimeUnit.HOURS.toNanos(1);
  manualSetUp();

  createStream();
  final Http2Headers responseHeaders = Utils.convertServerHeaders(new Metadata());
  final ChannelFuture headersWritten = enqueue(
      SendResponseHeadersCommand.createHeaders(stream.transportState(), responseHeaders));
  headersWritten.get();

  final ByteBuf pingPayload = handler().ctx().alloc().buffer(8);
  pingPayload.writeLong(1);
  for (int round = 0; round < 10; round++) {
    // Send a data frame and wait for it before each ping.
    final ChannelFuture frameWritten = enqueue(
        new SendGrpcFrameCommand(stream.transportState(), content().retainedSlice(), false));
    frameWritten.get();
    channel().releaseOutbound();
    channelRead(pingFrame(false /* isAck */, pingPayload.slice()));
  }
  pingPayload.release();

  verifyWrite(never()).writeGoAway(
      eq(ctx()),
      eq(STREAM_ID),
      eq(Http2Error.ENHANCE_YOUR_CALM.code()),
      any(ByteBuf.class),
      any(ChannelPromise.class));
}
/**
 * With keep-alive-without-calls disallowed, a permitted ping interval of 0 and no stream created,
 * reading {@code MAX_PING_STRIKES + 1} PING frames must produce a GOAWAY with
 * {@code ENHANCE_YOUR_CALM} and last-stream id 0, and leave the channel inactive.
 */
@Test
public void keepAliveEnforcer_initialIdle() throws Exception {
  permitKeepAliveWithoutCalls = false;
  permitKeepAliveTimeInNanos = 0;
  manualSetUp();

  final ByteBuf pingPayload = handler().ctx().alloc().buffer(8);
  pingPayload.writeLong(1);
  final int pingCount = KeepAliveEnforcer.MAX_PING_STRIKES + 1;
  for (int sent = 0; sent < pingCount; sent++) {
    channelRead(pingFrame(false /* isAck */, pingPayload.slice()));
  }
  pingPayload.release();

  verifyWrite().writeGoAway(
      eq(ctx()),
      eq(0),
      eq(Http2Error.ENHANCE_YOUR_CALM.code()),
      any(ByteBuf.class),
      any(ChannelPromise.class));
  assertFalse(channel().isActive());
}
/**
 * With a stream open and a permitted ping interval of 0, reading ten PING frames must not produce a
 * GOAWAY with {@code ENHANCE_YOUR_CALM} for {@code STREAM_ID}.
 */
@Test
public void keepAliveEnforcer_noticesActive() throws Exception {
  permitKeepAliveWithoutCalls = false;
  permitKeepAliveTimeInNanos = 0;
  manualSetUp();

  createStream();

  final ByteBuf pingPayload = handler().ctx().alloc().buffer(8);
  pingPayload.writeLong(1);
  for (int sent = 0; sent < 10; sent++) {
    channelRead(pingFrame(false /* isAck */, pingPayload.slice()));
  }
  pingPayload.release();

  verifyWrite(never()).writeGoAway(
      eq(ctx()),
      eq(STREAM_ID),
      eq(Http2Error.ENHANCE_YOUR_CALM.code()),
      any(ByteBuf.class),
      any(ChannelPromise.class));
}
/**
 * After a stream is created and then reset by an inbound RST_STREAM(CANCEL), reading
 * {@code MAX_PING_STRIKES + 1} PING frames must produce a GOAWAY with {@code ENHANCE_YOUR_CALM} and
 * last-stream id {@code STREAM_ID}, and leave the channel inactive.
 */
@Test
public void keepAliveEnforcer_noticesInactive() throws Exception {
  permitKeepAliveWithoutCalls = false;
  permitKeepAliveTimeInNanos = 0;
  manualSetUp();

  createStream();
  channelRead(rstStreamFrame(STREAM_ID, (int) Http2Error.CANCEL.code()));

  final ByteBuf pingPayload = handler().ctx().alloc().buffer(8);
  pingPayload.writeLong(1);
  final int pingCount = KeepAliveEnforcer.MAX_PING_STRIKES + 1;
  for (int sent = 0; sent < pingCount; sent++) {
    channelRead(pingFrame(false /* isAck */, pingPayload.slice()));
  }
  pingPayload.release();

  verifyWrite().writeGoAway(
      eq(ctx()),
      eq(STREAM_ID),
      eq(Http2Error.ENHANCE_YOUR_CALM.code()),
      any(ByteBuf.class),
      any(ChannelPromise.class));
  assertFalse(channel().isActive());
}
@Test public void maxConnectionIdle_goAwaySent() throws Exception { maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L); manualSetUp(); assertTrue(channel().isOpen()); fakeClock().forwardNanos(maxConnectionIdleInNanos); // GO_AWAY sent verifyWrite().writeGoAway( eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); // channel closed assertTrue(!channel().isOpen()); }
@Test public void maxConnectionIdle_activeThenRst() throws Exception { maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L); manualSetUp(); createStream(); fakeClock().forwardNanos(maxConnectionIdleInNanos); // GO_AWAY not sent when active verifyWrite(never()).writeGoAway( any(ChannelHandlerContext.class), any(Integer.class), any(Long.class), any(ByteBuf.class), any(ChannelPromise.class)); assertTrue(channel().isOpen()); channelRead(rstStreamFrame(STREAM_ID, (int) Http2Error.CANCEL.code())); fakeClock().forwardNanos(maxConnectionIdleInNanos); // GO_AWAY sent verifyWrite().writeGoAway( eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); // channel closed assertTrue(!channel().isOpen()); }
@Test public void maxConnectionAge_goAwaySent() throws Exception { maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L); manualSetUp(); assertTrue(channel().isOpen()); fakeClock().forwardNanos(maxConnectionAgeInNanos); // GO_AWAY sent verifyWrite().writeGoAway( eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); // channel closed assertTrue(!channel().isOpen()); }
@Test public void maxConnectionAgeGrace_channelStillOpenDuringGracePeriod() throws Exception { maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L); maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L); manualSetUp(); createStream(); fakeClock().forwardNanos(maxConnectionAgeInNanos); verifyWrite().writeGoAway( eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); fakeClock().forwardTime(20, TimeUnit.MINUTES); // channel not closed yet assertTrue(channel().isOpen()); }
@Test public void maxConnectionAgeGrace_channelClosedAfterGracePeriod() throws Exception { maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L); maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L); manualSetUp(); createStream(); fakeClock().forwardNanos(maxConnectionAgeInNanos); verifyWrite().writeGoAway( eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); assertTrue(channel().isOpen()); fakeClock().forwardNanos(maxConnectionAgeGraceInNanos); // channel closed assertTrue(!channel().isOpen()); }
@Override HttpResponseWrapper addResponse( int id, HttpRequest req, DecodedHttpResponse res, RequestLogBuilder logBuilder, long responseTimeoutMillis, long maxContentLength) { final HttpResponseWrapper resWrapper = super.addResponse(id, req, res, logBuilder, responseTimeoutMillis, maxContentLength); resWrapper.completionFuture().whenCompleteAsync((unused, cause) -> { if (cause != null) { // Ensure that the resWrapper is closed. // This is needed in case the response is aborted by the client. resWrapper.close(cause); // We are not closing the connection but just send a RST_STREAM, // so we have to remove the response manually. removeResponse(id); // Reset the stream. final int streamId = idToStreamId(id); if (conn.streamMayHaveExisted(streamId)) { final ChannelHandlerContext ctx = channel().pipeline().lastContext(); encoder.writeRstStream(ctx, streamId, Http2Error.CANCEL.code(), ctx.newPromise()); ctx.flush(); } } else { // Ensure that the resWrapper is closed. // This is needed in case the response is aborted by the client. resWrapper.close(); } }, channel().eventLoop()); return resWrapper; }
/**
 * Handles a request timeout.
 *
 * <p>Does nothing when the state is already {@code State.DONE}. Otherwise the request context is
 * marked as timed out. If a request timeout handler is set it is run; if not, the response is failed
 * with {@code RequestTimeoutException}, {@code SERVICE_UNAVAILABLE_MESSAGE} and
 * {@link Http2Error#INTERNAL_ERROR}.
 */
private void onTimeout() {
    if (state == State.DONE) {
        return;
    }

    reqCtx.setTimedOut();
    final Runnable requestTimeoutHandler = reqCtx.requestTimeoutHandler();
    if (requestTimeoutHandler != null) {
        // The reviewed copy had an empty statement here ("{; }"), so the handler was fetched
        // but never invoked and nothing responded to the timeout on this path.
        requestTimeoutHandler.run();
    } else {
        failAndRespond(RequestTimeoutException.get(), SERVICE_UNAVAILABLE_MESSAGE,
                       Http2Error.INTERNAL_ERROR);
    }
}
@Override public void onError(Throwable cause) { if (cause instanceof HttpResponseException) { // Timeout may occur when the aggregation of the error response takes long. // If timeout occurs, respond with 503 Service Unavailable. ((HttpResponseException) cause).httpResponse() .aggregate(ctx.executor()) .whenCompleteAsync((message, throwable) -> { if (throwable != null) { failAndRespond(throwable, INTERNAL_SERVER_ERROR_MESSAGE, Http2Error.CANCEL); } else { failAndRespond(cause, message, Http2Error.CANCEL); } }, ctx.executor()); } else if (cause instanceof HttpStatusException) { failAndRespond(cause, AggregatedHttpMessage.of(((HttpStatusException) cause).httpStatus()), Http2Error.CANCEL); } else { logger.warn("{} Unexpected exception from a service or a response publisher: {}",, service(), cause); failAndRespond(cause, INTERNAL_SERVER_ERROR_MESSAGE, Http2Error.INTERNAL_ERROR); } }
/**
 * Fails the response with {@code cause} and either sends {@code message} as an error response or
 * resets the stream.
 *
 * <p>The headers and content length of {@code message} are recorded on the log builder, the current
 * state is snapshotted, {@code setDone()} is called and the subscription is cancelled. If
 * {@code wroteNothing(state)} holds for the snapshot, the headers of {@code message} (and its content,
 * when non-empty) are written; otherwise the stream is reset with {@code error}. When the snapshot was
 * not {@code State.DONE}, a listener on the write future ends the response log with {@code cause} and
 * writes the access log, guarded by {@code tryComplete()}. The context is flushed at the end.
 *
 * <p>NOTE(review): the initializer of {@code id} is missing in this copy ({@code final int id =;}).
 * The body is also collapsed onto one line, so the inline {@code //} comments swallow the statements
 * after them. Restore the expression and the line breaks from the upstream source before compiling.
 *
 * @param cause the cause recorded when the response log is ended
 * @param message the error response to send when nothing has been written yet
 * @param error the HTTP/2 error code used when the stream has to be reset instead
 */
private void failAndRespond(Throwable cause, AggregatedHttpMessage message, Http2Error error) { final HttpHeaders headers = message.headers(); final HttpData content = message.content(); logBuilder().responseHeaders(headers); logBuilder().increaseResponseLength(content.length()); final State state = this.state; // Keep the state before calling fail() because it updates state. setDone(); subscription.cancel(); final int id =; final int streamId = req.streamId(); final ChannelFuture future; if (wroteNothing(state)) { // Did not write anything yet; we can send an error response instead of resetting the stream. if (content.isEmpty()) { future = responseEncoder.writeHeaders(ctx, id, streamId, headers, true); } else { responseEncoder.writeHeaders(ctx, id, streamId, headers, false); future = responseEncoder.writeData(ctx, id, streamId, content, true); } } else { // Wrote something already; we have to reset/cancel the stream. future = responseEncoder.writeReset(ctx, id, streamId, error); } if (state != State.DONE) { future.addListener(unused -> { // Write an access log always with a cause. Respect the first specified cause. if (tryComplete()) { logBuilder().endResponse(cause); accessLogWriter.accept(reqCtx.log()); } }); } ctx.flush(); }
/**
 * Handles an inbound RST_STREAM frame by closing the matching request with a stream error.
 *
 * <p>An error code that {@link Http2Error#valueOf(long)} does not recognize is reported as
 * {@link Http2Error#INTERNAL_ERROR}, with the raw code kept in the message. Passing the
 * {@code null} returned for such a code on to {@code streamError(...)} would otherwise fail while the
 * exception is being built.
 *
 * @param ctx the handler context
 * @param streamId the ID of the stream being reset
 * @param errorCode the raw HTTP/2 error code carried by the frame
 * @throws Http2Exception a {@code PROTOCOL_ERROR} connection error if no request is registered for
 *                        {@code streamId}
 */
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
        throws Http2Exception {
    final HttpRequestWriter req = requests.get(streamId);
    if (req == null) {
        throw connectionError(PROTOCOL_ERROR,
                              "received a RST_STREAM frame for an unknown stream: %d", streamId);
    }

    final Http2Error knownError = Http2Error.valueOf(errorCode);
    if (knownError != null) {
        req.close(streamError(streamId, knownError, "received a RST_STREAM frame"));
    } else {
        // Unknown codes must not trigger special behavior; treat them as INTERNAL_ERROR.
        req.close(streamError(streamId, Http2Error.INTERNAL_ERROR,
                              "received a RST_STREAM frame with an unknown error code: %d",
                              errorCode));
    }
}
/**
 * Logs a GOAWAY frame that was sent or received.
 *
 * <p>A frame with {@code NO_ERROR} is logged at debug level; any other error code is logged at warn
 * level together with the debug data decoded as UTF-8.
 *
 * @param sentOrReceived the word inserted into the log message
 * @param lastStreamId the last stream ID carried by the frame
 * @param errorCode the raw HTTP/2 error code carried by the frame
 * @param debugData the debug data carried by the frame
 */
private void onGoAway(String sentOrReceived, int lastStreamId, long errorCode, ByteBuf debugData) {
    final boolean noError = errorCode == Http2Error.NO_ERROR.code();
    if (noError) {
        if (logger.isDebugEnabled()) {
            logger.debug("{} {} a GOAWAY frame: lastStreamId={}, errorCode=NO_ERROR",
                         ch, sentOrReceived, lastStreamId);
        }
        return;
    }

    if (logger.isWarnEnabled()) {
        logger.warn("{} {} a GOAWAY frame: lastStreamId={}, errorCode={}, debugData=\"{}\"",
                    ch, sentOrReceived, lastStreamId,
                    errorStr(errorCode), debugData.toString(StandardCharsets.UTF_8));
    }
}
/**
 * Resets the specified stream. The connection is closed instead when the session protocol has no
 * multiplexing or the connection is in an unrecoverable state. On an HTTP/1 connection, for instance,
 * the connection is closed either immediately or once the earlier requests that were not reset are
 * done.
 *
 * @return a failed future when this encoder is already closed, otherwise the result of
 *         {@code doWriteReset(...)}
 */
public final ChannelFuture writeReset(ChannelHandlerContext ctx, int id, int streamId,
                                      Http2Error error) {
    return closed ? newFailedFuture(ctx)
                  : doWriteReset(ctx, id, streamId, error);
}
/**
 * Writes an RST_STREAM frame carrying {@code error} for {@code streamId}, unless
 * {@code validateStream(...)} yields a future, in which case that future is returned and nothing is
 * written.
 */
@Override
protected ChannelFuture doWriteReset(ChannelHandlerContext ctx, int id, int streamId,
                                     Http2Error error) {
    final ChannelFuture validationResult = validateStream(ctx, streamId);
    if (validationResult == null) {
        return encoder.writeRstStream(ctx, streamId, error.code(), ctx.newPromise());
    }
    return validationResult;
}
/**
 * Cancels a server stream by writing an RST_STREAM frame with {@code Http2Error.CANCEL}, handing
 * {@code promise} to the encoder.
 *
 * <p>NOTE(review): this copy is incomplete. The statement that should follow the "Notify the
 * listener" comment is missing (only a bare {@code ;} remains), and the stream-id argument of
 * {@code writeRstStream(...)} is empty ({@code ctx,,}). The body is also collapsed onto one line, so
 * the inline {@code //} comments swallow what follows them. Restore the missing code and the line
 * breaks from the upstream source before compiling.
 *
 * @param ctx the handler context
 * @param cmd the cancel command (presumably identifies the stream and the reason - confirm)
 * @param promise the promise passed to {@code writeRstStream(...)}
 */
private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd, ChannelPromise promise) { // Notify the listener if we haven't already.; // Terminate the stream. encoder().writeRstStream(ctx,, Http2Error.CANCEL.code(), promise); }
/**
 * Handles an inbound HTTP/2 RST_STREAM frame, which terminates a stream.
 *
 * <p>Nothing happens when no client stream is attached to {@code streamId}. Otherwise the status
 * mapped from {@code errorCode} is reported to the stream and the keep-alive manager, when present,
 * is told that data arrived.
 */
private void onRstStreamRead(int streamId, long errorCode) {
  final NettyClientStream.TransportState target = clientStream(connection().stream(streamId));
  if (target == null) {
    return;
  }

  final Status rstStatus = GrpcUtil.Http2Error.statusForCode((int) errorCode)
      .augmentDescription("Received Rst Stream");
  target.transportReportStatus(rstStatus, false /*stop delivery*/, new Metadata());

  if (keepAliveManager != null) {
    keepAliveManager.onDataReceived();
  }
}
/**
 * Cancels a client stream: reports {@code cmd.reason()} to the stream (with the {@code true} flag and
 * empty metadata) and writes an RST_STREAM frame with {@code Http2Error.CANCEL}, handing
 * {@code promise} to the encoder.
 *
 * <p>NOTE(review): this copy is incomplete. The initializer of {@code stream} is missing
 * ({@code stream =;}) and the stream-id argument of {@code writeRstStream(...)} is empty
 * ({@code ctx,,}). Restore both from the upstream source before compiling.
 *
 * @param ctx the handler context
 * @param cmd the cancel command carrying the reason
 * @param promise the promise passed to {@code writeRstStream(...)}
 */
private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd, ChannelPromise promise) { NettyClientStream.TransportState stream =; stream.transportReportStatus(cmd.reason(), true, new Metadata()); encoder().writeRstStream(ctx,, Http2Error.CANCEL.code(), promise); }
private Status statusFromGoAway(long errorCode, byte[] debugData) { Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode) .augmentDescription("Received Goaway"); if (debugData != null && debugData.length > 0) { // If a debug message was provided, use it. String msg = new String(debugData, UTF_8); status = status.augmentDescription(msg); } return status; }
/**
 * An inbound RST_STREAM(CANCEL) on an open stream must close the stream listener with a
 * {@code CANCELLED} status whose description contains {@code "RST_STREAM"}, and must not deliver any
 * message.
 */
@Test
public void clientCancelShouldForwardToStreamListener() throws Exception {
  manualSetUp();
  createStream();

  channelRead(rstStreamFrame(STREAM_ID, (int) Http2Error.CANCEL.code()));

  final ArgumentCaptor<Status> closedStatus = ArgumentCaptor.forClass(Status.class);
  verify(streamListener).closed(closedStatus.capture());
  assertEquals(Code.CANCELLED, closedStatus.getValue().getCode());
  Truth.assertThat(closedStatus.getValue().getDescription()).contains("RST_STREAM");

  verify(streamListener, atLeastOnce()).onReady();
  assertNull("no messages expected", streamListenerMessageQueue.poll());
}
@Test public void closeShouldCloseChannel() throws Exception { manualSetUp(); handler().close(ctx(), newPromise()); verifyWrite().writeGoAway(eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), eq(Unpooled.EMPTY_BUFFER), any(ChannelPromise.class)); // Verify that the channel was closed. assertFalse(channel().isOpen()); }
/**
 * Enqueuing a {@code CancelServerStreamCommand} for an open stream must write an RST_STREAM with
 * {@code Http2Error.CANCEL} for that stream's ID.
 */
@Test
public void cancelShouldSendRstStream() throws Exception {
  manualSetUp();
  createStream();

  enqueue(new CancelServerStreamCommand(stream.transportState(), Status.DEADLINE_EXCEEDED));

  verifyWrite().writeRstStream(
      eq(ctx()),
      eq(stream.transportState().id()),
      eq(Http2Error.CANCEL.code()),
      any(ChannelPromise.class));
}
@Test public void keepAliveManagerOnDataReceived_rstStreamRead() throws Exception { manualSetUp(); createStream(); verify(spyKeepAliveManager).onDataReceived(); // received headers channelRead(rstStreamFrame(STREAM_ID, (int) Http2Error.CANCEL.code())); verify(spyKeepAliveManager, times(2)).onDataReceived(); verify(spyKeepAliveManager, never()).onTransportTermination(); }
@Test public void cancelShouldSucceed() throws Exception { createStream(); cancelStream(Status.CANCELLED); verifyWrite().writeRstStream(eq(ctx()), eq(3), eq(Http2Error.CANCEL.code()), any(ChannelPromise.class)); verify(mockKeepAliveManager, times(1)).onTransportActive(); // onStreamActive verify(mockKeepAliveManager, times(1)).onTransportIdle(); // onStreamClosed verifyNoMoreInteractions(mockKeepAliveManager); }
/**
 * Cancelling an open stream with {@code DEADLINE_EXCEEDED} must write an RST_STREAM with
 * {@code Http2Error.CANCEL} for stream 3.
 */
@Test
public void cancelDeadlineExceededShouldSucceed() throws Exception {
  createStream();

  cancelStream(Status.DEADLINE_EXCEEDED);

  verifyWrite().writeRstStream(
      eq(ctx()),
      eq(3),
      eq(Http2Error.CANCEL.code()),
      any(ChannelPromise.class));
}
/**
 * Nobody listens for an exception raised during cancel(), but one must still not be thrown: it
 * would hurt performance, and users should not have to work around such performance issues.
 * Cancelling the same stream a second time must therefore succeed.
 */
@Test
public void cancelTwiceShouldSucceed() throws Exception {
  createStream();

  cancelStream(Status.CANCELLED);
  verifyWrite().writeRstStream(
      any(ChannelHandlerContext.class),
      eq(3),
      eq(Http2Error.CANCEL.code()),
      any(ChannelPromise.class));

  final ChannelFuture secondCancel = cancelStream(Status.CANCELLED);
  assertTrue(secondCancel.isSuccess());
}
/**
 * Cancelling a stream with {@code DEADLINE_EXCEEDED} must write an RST_STREAM with
 * {@code Http2Error.CANCEL} for stream 3, and a second cancel with {@code CANCELLED} must still
 * succeed.
 */
@Test
public void cancelTwiceDifferentReasons() throws Exception {
  createStream();

  cancelStream(Status.DEADLINE_EXCEEDED);
  verifyWrite().writeRstStream(
      eq(ctx()),
      eq(3),
      eq(Http2Error.CANCEL.code()),
      any(ChannelPromise.class));

  final ChannelFuture secondCancel = cancelStream(Status.CANCELLED);
  assertTrue(secondCancel.isSuccess());
}
/**
 * Creates an {@link IllegalStateException} with {@code msg}, fails the response with it (using the
 * internal-server-error message and {@code Http2Error.INTERNAL_ERROR}) and returns the exception.
 *
 * @param msg the exception message
 * @return the exception that was created and used to fail the response
 */
private IllegalStateException newIllegalStateException(String msg) {
    final IllegalStateException failure = new IllegalStateException(msg);
    failAndRespond(failure, INTERNAL_SERVER_ERROR_MESSAGE, Http2Error.INTERNAL_ERROR);
    return failure;
}
/**
 * Formats an HTTP/2 error code for logging as {@code NAME(code)}, using {@code UNKNOWN} as the name
 * when {@code Http2Error.valueOf(errorCode)} returns {@code null}.
 *
 * @param errorCode the raw HTTP/2 error code
 * @return the formatted string
 */
private static String errorStr(long errorCode) {
    final Http2Error known = Http2Error.valueOf(errorCode);
    final String label = known == null ? "UNKNOWN" : known.toString();
    return label + '(' + errorCode + ')';
}
/**
 * Performs the protocol-specific part of resetting a stream.
 *
 * @param ctx the context to write with
 * @param id the ID of the request or response being reset
 * @param streamId the ID of the stream being reset
 * @param error the HTTP/2 error code describing why the stream is reset
 * @return the future of the write
 */
protected abstract ChannelFuture doWriteReset( ChannelHandlerContext ctx, int id, int streamId, Http2Error error);
/**
 * Checks that a client built with {@code http2MaxFrameSize(DEFAULT_MAX_FRAME_SIZE * 2)} answers an
 * oversized DATA frame with a GOAWAY carrying {@code FRAME_SIZE_ERROR}.
 *
 * <p>The test plays the server side by hand over a plain {@link ServerSocket}. It reads the
 * connection preface, the SETTINGS frame, the SETTINGS_ACK and the HEADERS frame, and answers with an
 * empty SETTINGS + ACK and a HEADERS frame. It then sends a DATA frame header announcing 0x8000 bytes
 * for stream 3 followed by two {@code EMPTY_DATA} writes, and reads two WINDOW_UPDATE frames.
 * Finally it sends a DATA frame header announcing 0x8001 bytes and decodes the GOAWAY frame the
 * client sends back, asserting a last-stream id of 0 and the {@code FRAME_SIZE_ERROR} code.
 *
 * <p>NOTE(review): the URI passed to {@code HttpClient.of(...)} is just {@code "" + port} in this
 * copy, so a scheme/host prefix may have been lost - confirm against the upstream source. The body is
 * also collapsed onto two physical lines, so each inline {@code //} comment swallows the statements
 * after it; restore the line breaks before compiling. {@code latch.await()} has no timeout.
 */
@Test public void maxFrameSize() throws Exception { try (ServerSocket ss = new ServerSocket(0)) { final int port = ss.getLocalPort(); final ClientFactory clientFactory = new ClientFactoryBuilder() .useHttp2Preface(true) .http2MaxFrameSize(DEFAULT_MAX_FRAME_SIZE * 2) // == 16384 * 2 .build(); final HttpClient client = HttpClient.of(clientFactory, "" + port); final CompletableFuture<AggregatedHttpMessage> future = client.get("/").aggregate(); try (Socket s = ss.accept()) { final InputStream in = s.getInputStream(); final BufferedOutputStream bos = new BufferedOutputStream(s.getOutputStream()); readBytes(in, connectionPrefaceBuf().capacity()); // Read the connection preface and discard it. // Read a SETTINGS frame and validate it. assertSettingsFrameOfMaxFrameSize(in); sendEmptySettingsAndAckFrame(bos); readBytes(in, 9); // Read a SETTINGS_ACK frame and discard it. readHeadersFrame(in); // Read a HEADERS frame and discard it. sendHeaderFrame(bos); //////////////////////////////////////// // Transmission of data gets started. // //////////////////////////////////////// // Send a DATA frame that indicates sending data as much as 0x8000 for stream id 03. bos.write(new byte[] { 0x00, (byte) 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03 }); bos.write(EMPTY_DATA); bos.write(EMPTY_DATA); bos.flush(); readBytes(in, 13); // Read a WINDOW_UPDATE frame for connection and discard it. readBytes(in, 13); // Read a WINDOW_UPDATE frame for stream id 03 and discard it. // Send a DATA frame that exceed MAX_FRAME_SIZE by 1. bos.write(new byte[] { 0x00, (byte) 0x80, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03 }); bos.flush(); // Triggers the client to send a GOAWAY frame for the connection. // The client send a GOAWAY frame and the server read it. 
final ByteBuf buffer = readGoAwayFrame(in); final DefaultHttp2FrameReader frameReader = new DefaultHttp2FrameReader(); final CountDownLatch latch = new CountDownLatch(1); frameReader.readFrame(null, buffer, new Http2EventAdapter() { @Override public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) throws Http2Exception { assertThat(lastStreamId).isZero(); // 0: connection error assertThat(errorCode).isEqualTo(Http2Error.FRAME_SIZE_ERROR.code()); latch.countDown(); } }); latch.await(); buffer.release(); } } }