private ChannelFuture validateStream(ChannelHandlerContext ctx, int streamId) { final Http2Stream stream = encoder.connection().stream(streamId); if (stream != null) { switch (stream.state()) { case RESERVED_LOCAL: case OPEN: case HALF_CLOSED_REMOTE: break; default: // The response has been sent already. return ctx.newFailedFuture(ClosedPublisherException.get()); } } else if (encoder.connection().streamMayHaveExisted(streamId)) { // Stream has been removed because it has been closed completely. return ctx.newFailedFuture(ClosedPublisherException.get()); } return null; }
/**
 * Handles a {@link ForcefulCloseCommand}: closes the channel first, then visits every active
 * stream, reporting the command's status to any associated server stream, resetting it with
 * {@code CANCEL}, and closing the HTTP/2 stream.
 */
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
    ChannelPromise promise) throws Exception {
  close(ctx, promise);

  final Http2StreamVisitor streamTerminator = new Http2StreamVisitor() {
    @Override
    public boolean visit(Http2Stream stream) throws Http2Exception {
      NettyServerStream.TransportState transportState = serverStream(stream);
      if (transportState != null) {
        transportState.transportReportStatus(msg.getStatus());
        resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
      }
      // The HTTP/2 stream is closed whether or not a server stream was attached to it.
      stream.close();
      return true;
    }
  };
  connection().forEachActiveStream(streamTerminator);
}
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg, ChannelPromise promise) throws Exception { // close() already called by NettyClientTransport, so just need to clean up streams connection().forEachActiveStream(new Http2StreamVisitor() { @Override public boolean visit(Http2Stream stream) throws Http2Exception { NettyClientStream.TransportState clientStream = clientStream(stream); if (clientStream != null) { clientStream.transportReportStatus(msg.getStatus(), true, new Metadata()); resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise()); } stream.close(); return true; } }); }
/**
 * Handler for a GOAWAY being either sent or received. Notifies the lifecycle manager of the
 * shutdown and fails every active stream whose ID is greater than the last stream known by
 * the peer.
 */
private void goingAway(Status status) {
  lifecycleManager.notifyShutdown(status);
  final Status shutdownStatus = lifecycleManager.getShutdownStatus();
  final int lastStreamId = connection().local().lastStreamKnownByPeer();

  final Http2StreamVisitor failUnknownStreams = new Http2StreamVisitor() {
    @Override
    public boolean visit(Http2Stream stream) throws Http2Exception {
      if (stream.id() <= lastStreamId) {
        // Streams at or below the last known ID are left untouched.
        return true;
      }
      NettyClientStream.TransportState transportState = clientStream(stream);
      if (transportState != null) {
        transportState.transportReportStatus(shutdownStatus, false, new Metadata());
      }
      stream.close();
      return true;
    }
  };

  try {
    connection().forEachActiveStream(failUnknownStreams);
  } catch (Http2Exception e) {
    throw new RuntimeException(e);
  }
}
/**
 * With auto-tuning enabled, answering a flow-control ping after {@code maxWindow} bytes were
 * recorded must leave the connection's initial window size equal to {@code maxWindow}.
 */
@Test
public void windowShouldNotExceedMaxWindowSize() throws Exception {
  manualSetUp();
  makeStream();
  AbstractNettyHandler nettyHandler = (AbstractNettyHandler) handler();
  nettyHandler.setAutoTuneFlowControl(true);
  Http2Stream connStream = connection().connectionStream();
  Http2LocalFlowController flowController = connection().local().flowController();
  int windowCap = nettyHandler.flowControlPing().maxWindow();

  // Record a full window of data, then deliver the matching ping ack.
  nettyHandler.flowControlPing().setDataSizeSincePing(windowCap);
  int pingPayload = nettyHandler.flowControlPing().payload();
  ByteBuf pingData = nettyHandler.ctx().alloc().buffer(8);
  pingData.writeLong(pingPayload);
  channelRead(pingFrame(true, pingData));

  assertEquals(windowCap, flowController.initialWindowSize(connStream));
}
/**
 * Looks up the response wrapper registered for the closed stream and, if one is found, closes
 * it with {@link ClosedSessionException}.
 */
@Override
public void onStreamClosed(Http2Stream stream) {
    final HttpResponseWrapper wrapper = getResponse(streamIdToId(stream.id()), true);
    if (wrapper == null) {
        // Nothing is registered for this stream.
        return;
    }
    wrapper.close(ClosedSessionException.get());
}
/** * Sets the underlying Netty {@link Http2Stream} for this stream. This must be called in the * context of the transport thread. */ public void setHttp2Stream(Http2Stream http2Stream) { checkNotNull(http2Stream, "http2Stream"); checkState(this.http2Stream == null, "Can only set http2Stream once"); this.http2Stream = http2Stream; // Now that the stream has actually been initialized, call the listener's onReady callback if // appropriate. onStreamAllocated(); }
/**
 * Creates the transport state for a server stream.
 *
 * <p>{@code http2Stream} and {@code handler} are checked to be non-null, in that order;
 * {@code eventLoop} is stored without a check.
 */
public TransportState(
    NettyServerHandler handler,
    EventLoop eventLoop,
    Http2Stream http2Stream,
    int maxMessageSize,
    StatsTraceContext statsTraceCtx,
    TransportTracer transportTracer) {
  super(maxMessageSize, statsTraceCtx, transportTracer);

  // Validate first, then assign.
  checkNotNull(http2Stream, "http2Stream");
  checkNotNull(handler, "handler");

  this.http2Stream = http2Stream;
  this.handler = handler;
  this.eventLoop = eventLoop;
}
/** * Handler for the Channel shutting down. */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { try { if (keepAliveManager != null) { keepAliveManager.onTransportTermination(); } if (maxConnectionIdleManager != null) { maxConnectionIdleManager.onTransportTermination(); } if (maxConnectionAgeMonitor != null) { maxConnectionAgeMonitor.cancel(false); } final Status status = Status.UNAVAILABLE.withDescription("connection terminated for unknown reason"); // Any streams that are still active must be closed connection().forEachActiveStream(new Http2StreamVisitor() { @Override public boolean visit(Http2Stream stream) throws Http2Exception { NettyServerStream.TransportState serverStream = serverStream(stream); if (serverStream != null) { serverStream.transportReportStatus(status); } return true; } }); } finally { super.channelInactive(ctx); } }
/**
 * Hands {@code bytes} processed bytes of {@code http2Stream} back to the decoder's inbound
 * flow controller.
 *
 * @throws RuntimeException wrapping any {@link Http2Exception} raised by the flow controller
 */
void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
  try {
    decoder().flowController().consumeBytes(http2Stream, bytes);
  } catch (Http2Exception cause) {
    // The signature declares no checked exception, so rethrow unchecked with the cause kept.
    throw new RuntimeException(cause);
  }
}
private Http2Stream requireHttp2Stream(int streamId) { Http2Stream stream = connection().stream(streamId); if (stream == null) { // This should never happen. throw new AssertionError("Stream does not exist: " + streamId); } return stream; }
/**
 * Hands {@code bytes} processed bytes of {@code stream} back to the decoder's inbound flow
 * controller.
 *
 * @throws RuntimeException wrapping any {@link Http2Exception} raised by the flow controller
 */
void returnProcessedBytes(Http2Stream stream, int bytes) {
  try {
    decoder().flowController().consumeBytes(stream, bytes);
  } catch (Http2Exception cause) {
    // The signature declares no checked exception, so rethrow unchecked with the cause kept.
    throw new RuntimeException(cause);
  }
}
/** * Handler for the Channel shutting down. */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { try { logger.fine("Network channel is closed"); Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason"); lifecycleManager.notifyShutdown(status); try { cancelPing(lifecycleManager.getShutdownThrowable()); // Report status to the application layer for any open streams connection().forEachActiveStream(new Http2StreamVisitor() { @Override public boolean visit(Http2Stream stream) throws Http2Exception { NettyClientStream.TransportState clientStream = clientStream(stream); if (clientStream != null) { clientStream.transportReportStatus( lifecycleManager.getShutdownStatus(), false, new Metadata()); } return true; } }); } finally { lifecycleManager.notifyTerminated(status); } } finally { // Close any open streams super.channelInactive(ctx); if (keepAliveManager != null) { keepAliveManager.onTransportTermination(); } } }
/**
 * Sends initial connection window to the remote endpoint if necessary.
 *
 * <p>Does nothing unless the channel is active and {@code initialConnectionWindow} is positive.
 * Otherwise the connection stream's window is incremented by the difference between the
 * configured value and the current window size, {@code initialConnectionWindow} is set to
 * {@code -1}, and the context is flushed.
 */
private void sendInitialConnectionWindow() throws Http2Exception {
  if (!ctx.channel().isActive() || initialConnectionWindow <= 0) {
    return;
  }

  Http2Stream connStream = connection().connectionStream();
  int currentWindow = connection().local().flowController().windowSize(connStream);
  int increment = initialConnectionWindow - currentWindow;
  decoder().flowController().incrementWindowSize(connStream, increment);

  // A non-positive value makes the guard above skip all later calls.
  initialConnectionWindow = -1;
  ctx.flush();
}
@Test public void connectionWindowShouldBeOverridden() throws Exception { flowControlWindow = 1048576; // 1MiB manualSetUp(); Http2Stream connectionStream = connection().connectionStream(); Http2LocalFlowController localFlowController = connection().local().flowController(); int actualInitialWindowSize = localFlowController.initialWindowSize(connectionStream); int actualWindowSize = localFlowController.windowSize(connectionStream); assertEquals(flowControlWindow, actualWindowSize); assertEquals(flowControlWindow, actualInitialWindowSize); }
@Test public void connectionWindowShouldBeOverridden() throws Exception { flowControlWindow = 1048576; // 1MiB setUp(); Http2Stream connectionStream = connection().connectionStream(); Http2LocalFlowController localFlowController = connection().local().flowController(); int actualInitialWindowSize = localFlowController.initialWindowSize(connectionStream); int actualWindowSize = localFlowController.windowSize(connectionStream); assertEquals(flowControlWindow, actualWindowSize); assertEquals(flowControlWindow, actualInitialWindowSize); assertEquals(1048576, actualWindowSize); }
@Override protected NettyClientHandler newHandler() throws Http2Exception { Http2Connection connection = new DefaultHttp2Connection(false); // Create and close a stream previous to the nextStreamId. Http2Stream stream = connection.local().createStream(streamId - 2, true); stream.close(); final Ticker ticker = new Ticker() { @Override public long read() { return nanoTime; } }; Supplier<Stopwatch> stopwatchSupplier = new Supplier<Stopwatch>() { @Override public Stopwatch get() { return Stopwatch.createUnstarted(ticker); } }; return NettyClientHandler.newHandler( connection, frameReader(), frameWriter(), lifecycleManager, mockKeepAliveManager, flowControlWindow, maxHeaderListSize, stopwatchSupplier, tooManyPingsRunnable, transportTracer); }
/** No-op: this implementation takes no action on the stream-added notification. */
@Override public void onStreamAdded(Http2Stream stream) {}
/** No-op: this implementation takes no action on the stream-active notification. */
@Override public void onStreamActive(Http2Stream stream) {}
/** No-op: this implementation takes no action on the stream-half-closed notification. */
@Override public void onStreamHalfClosed(Http2Stream stream) {}
/** No-op: this implementation takes no action on the stream-removed notification. */
@Override public void onStreamRemoved(Http2Stream stream) {}
/** Removes the entry stored in {@code requests} under the removed stream's ID. */
@Override public void onStreamRemoved(Http2Stream stream) { requests.remove(stream.id()); }
@Override public int onDataRead( ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { final DecodedHttpRequest req = requests.get(streamId); if (req == null) { throw connectionError(PROTOCOL_ERROR, "received a DATA Frame for an unknown stream: %d", streamId); } final int dataLength = data.readableBytes(); if (dataLength == 0) { // Received an empty DATA frame if (endOfStream) { req.close(); } return padding; } req.increaseTransferredBytes(dataLength); final long maxContentLength = req.maxRequestLength(); if (maxContentLength > 0 && req.transferredBytes() > maxContentLength) { if (req.isOpen()) { req.close(ContentTooLargeException.get()); } if (isWritable(streamId)) { writeErrorResponse(ctx, streamId, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE); } else { // Cannot write to the stream. Just close it. final Http2Stream stream = writer.connection().stream(streamId); stream.close(); } } else if (req.isOpen()) { try { req.write(new ByteBufHttpData(data.retain(), endOfStream)); } catch (Throwable t) { req.close(t); throw connectionError(INTERNAL_ERROR, t, "failed to consume a DATA frame"); } if (endOfStream) { req.close(); } } // All bytes have been processed. return dataLength + padding; }
/**
 * Logs, at debug level, the removal of the stream with ID 1 together with its state. Removal
 * of any other stream is ignored.
 */
@Override
public void onStreamRemoved(Http2Stream stream) {
    if (stream.id() != 1) {
        return;
    }
    logger.debug("{} HTTP/2 upgrade stream removed: {}", ch, stream.state());
}
/**
 * Gets the underlying Netty {@link Http2Stream} for this stream.
 *
 * @return the current value of the {@code http2Stream} field; may be {@code null}, as the
 *         {@code @Nullable} annotation indicates (presumably until a stream has been
 *         assigned - confirm against the setter)
 */
@Nullable public Http2Stream http2Stream() { return http2Stream; }
private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers) throws Http2Exception { if (!teWarningLogged && !TE_TRAILERS.equals(headers.get(TE_HEADER))) { logger.warning(String.format("Expected header TE: %s, but %s is received. This means " + "some intermediate proxy may not support trailers", TE_TRAILERS, headers.get(TE_HEADER))); teWarningLogged = true; } try { // Remove the leading slash of the path and get the fully qualified method name CharSequence path = headers.path(); if (path == null) { respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED, "Expected path but is missing"); return; } if (path.charAt(0) != '/') { respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED, String.format("Expected path to start with /: %s", path)); return; } String method = path.subSequence(1, path.length()).toString(); // Verify that the Content-Type is correct in the request. CharSequence contentType = headers.get(CONTENT_TYPE_HEADER); if (contentType == null) { respondWithHttpError( ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request"); return; } String contentTypeString = contentType.toString(); if (!GrpcUtil.isGrpcContentType(contentTypeString)) { respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL, String.format("Content-Type '%s' is not supported", contentTypeString)); return; } if (!HTTP_METHOD.equals(headers.method())) { respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL, String.format("Method '%s' is not supported", headers.method())); return; } // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this // method. 
Http2Stream http2Stream = requireHttp2Stream(streamId); Metadata metadata = Utils.convertHeaders(headers); StatsTraceContext statsTraceCtx = StatsTraceContext.newServerContext(streamTracerFactories, method, metadata); NettyServerStream.TransportState state = new NettyServerStream.TransportState( this, ctx.channel().eventLoop(), http2Stream, maxMessageSize, statsTraceCtx, transportTracer); String authority = getOrUpdateAuthority((AsciiString) headers.authority()); NettyServerStream stream = new NettyServerStream( ctx.channel(), state, attributes, authority, statsTraceCtx, transportTracer); transportListener.streamCreated(stream, method, metadata); state.onStreamAllocated(); http2Stream.setProperty(streamKey, state); } catch (Exception e) { logger.log(Level.WARNING, "Exception in onHeadersRead()", e); // Throw an exception that will get handled by onStreamError. throw newStreamException(streamId, e); } }
/**
 * Returns the server stream state stored on the given HTTP/2 stream under {@code streamKey}.
 *
 * @return the stored state, or {@code null} when {@code stream} is {@code null} or carries no
 *         such property
 */
private NettyServerStream.TransportState serverStream(Http2Stream stream) {
  if (stream == null) {
    return null;
  }
  return (NettyServerStream.TransportState) stream.getProperty(streamKey);
}
/**
 * Gets the client stream state stored on the given HTTP/2 stream under {@code streamKey}.
 *
 * @return the stored state, or {@code null} when {@code stream} is {@code null} or carries no
 *         such property
 */
private NettyClientStream.TransportState clientStream(Http2Stream stream) {
  if (stream == null) {
    return null;
  }
  return (NettyClientStream.TransportState) stream.getProperty(streamKey);
}