private void uploadsShouldWork(boolean casUpload, EmbeddedChannel ch, HttpResponseStatus status) throws Exception { ByteArrayInputStream data = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5}); ChannelPromise writePromise = ch.newPromise(); ch.writeOneOutbound(new UploadCommand(CACHE_URI, casUpload, "abcdef", data, 5), writePromise); HttpRequest request = ch.readOutbound(); assertThat(request.method()).isEqualTo(HttpMethod.PUT); assertThat(request.headers().get(HttpHeaders.CONNECTION)) .isEqualTo(HttpHeaderValues.KEEP_ALIVE.toString()); HttpChunkedInput content = ch.readOutbound(); assertThat(content.readChunk(ByteBufAllocator.DEFAULT).content().readableBytes()).isEqualTo(5); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status); response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); ch.writeInbound(response); assertThat(writePromise.isDone()).isTrue(); assertThat(ch.isOpen()).isTrue(); }
/** Test that the handler correctly supports http error codes i.e. 404 (NOT FOUND). */ @Test public void httpErrorsAreSupported() throws Exception { EmbeddedChannel ch = new EmbeddedChannel(new HttpUploadHandler(null)); ByteArrayInputStream data = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5}); ChannelPromise writePromise = ch.newPromise(); ch.writeOneOutbound(new UploadCommand(CACHE_URI, true, "abcdef", data, 5), writePromise); HttpRequest request = ch.readOutbound(); assertThat(request).isInstanceOf(HttpRequest.class); HttpChunkedInput content = ch.readOutbound(); assertThat(content).isInstanceOf(HttpChunkedInput.class); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN); response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.CLOSE); ch.writeInbound(response); assertThat(writePromise.isDone()).isTrue(); assertThat(writePromise.cause()).isInstanceOf(HttpException.class); assertThat(((HttpException) writePromise.cause()).status()) .isEqualTo(HttpResponseStatus.FORBIDDEN); assertThat(ch.isOpen()).isFalse(); }
protected void writeResponseBody(Representation responseEntity) throws IOException { try { // Send the entity to the client InputStream is = responseEntity.getStream(); getNettyChannel().write(new HttpChunkedInput(new ChunkedStream(is))); getNettyChannel().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); } catch (IOException ioe) { // The stream was probably already closed by the // connector. Probably OK, low message priority. getLogger().debug("Exception while writing the entity stream.", ioe); } }
@Override public ChunkedInput<HttpContent> chunkFile(FileChannel fileChannel) { try { //TODO tune the chunk size return new HttpChunkedInput(new ChunkedNioFile(fileChannel, 1024)); } catch (IOException e) { throw Exceptions.propagate(e); } }
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { checkState(userPromise == null, "handler can't be shared between pipelines."); userPromise = promise; if (!(msg instanceof UploadCommand)) { failAndResetUserPromise( new IllegalArgumentException( "Unsupported message type: " + StringUtil.simpleClassName(msg))); return; } HttpRequest request = buildRequest((UploadCommand) msg); addCredentialHeaders(request, ((UploadCommand) msg).uri()); HttpChunkedInput body = buildBody((UploadCommand) msg); ctx.writeAndFlush(request) .addListener( (f) -> { if (f.isSuccess()) { return; } body.close(); failAndResetUserPromise(f.cause()); }); ctx.writeAndFlush(body) .addListener( (f) -> { if (f.isSuccess()) { return; } body.close(); failAndResetUserPromise(f.cause()); }); }
@Override public void execute(WebContext ctx) throws Exception { Response resp = ctx.getResponse(); resp.setHeader("Content-Type", this.mime); resp.setDateHeader("Date", System.currentTimeMillis()); resp.setDateHeader("Last-Modified", this.file.getWhen()); resp.setHeader("X-UA-Compatible", "IE=Edge,chrome=1"); if (ctx.getRequest().hasHeader("If-Modified-Since")) { long dd = this.file.getWhen() - ctx.getRequest().getDateHeader("If-Modified-Since"); // getDate does not return consistent results because milliseconds // are not cleared correctly see: // https://sourceforge.net/tracker/index.php?func=detail&aid=3162870&group_id=62369&atid=500353 // so ignore differences of less than 1000, they are false positives if (dd < 1000) { resp.setStatus(HttpResponseStatus.NOT_MODIFIED); ctx.send(); return; } } if (!ctx.getSite().getMimeCompress(this.mime)) resp.setHeader(HttpHeaders.Names.CONTENT_ENCODING, HttpHeaders.Values.IDENTITY); ctx.sendStart(0); // TODO send from memory cache if small enough try { ctx.send(new HttpChunkedInput(new ChunkedNioFile(this.file.getFilePath().toFile()))); // TODO not ideal, cleanup so direct reference to path is not needed } catch (IOException x) { // TODO improve support } ctx.sendEnd(); }
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception { if (fullHttpRequest == null) { return; } log.info("Request [{}]", fullHttpRequest.uri()); Request request = new Request(channelHandlerContext.channel(), fullHttpRequest); WebContext.setCurrentRequest(request); Response preparedResponse = new Response(request); WebContext.setCurrentResponse(preparedResponse); Route route = null; try { route = RouteFinder.findRoute(request); if (route == null) { HttpResponse exceptionResponse = Response.buildDefaultFullHttpResponse(HttpResponseStatus.NOT_FOUND); channelHandlerContext.write(exceptionResponse); return; } RouteSetter.routeSetter(route, fullHttpRequest); } catch (Exception e) { handleException(channelHandlerContext, e); return; } Boolean continueProcess = true; //before aop if (route.beforeProxyChain().size() != 0) { continueProcess = route.beforeProxyChain().doChain(request, preparedResponse, route); } if (continueProcess) { Object o = route.getMethod().invoke(route.getObject(), route.getParamters()); //after aop if (route.afterProxyChain().size() != 0) { route.afterProxyChain().doChain(request, preparedResponse, route); } if (o instanceof Response) { preparedResponse = Response.mergeResponse(preparedResponse, (Response) o); WebContext.setCurrentResponse(preparedResponse); } else if (route.view()) { preparedResponse.setFile(resolver.resolve(o.toString(), preparedResponse.getModel())); preparedResponse.setResponseStatus(HttpResponseStatus.OK); preparedResponse.header(HttpHeader.CONTENT_TYPE, "text/html"); } else { preparedResponse.setBody(o); preparedResponse.setResponseStatus(HttpResponseStatus.OK); } log.info("Response {{}}", preparedResponse.body()); } HttpResponse response = Response.buildDefaultFullHttpResponse0(); channelHandlerContext.write(response); if (WebContext.currentResponse().file() != null) { channelHandlerContext.write(new HttpChunkedInput(new ChunkedFile(WebContext.currentResponse().file()))); } }
private HttpChunkedInput buildBody(UploadCommand msg) { return new HttpChunkedInput(new ChunkedStream(msg.data())); }
@Override public void send(final FileChannel channel, final long offset, final long count) throws Exception { DefaultHttpResponse rsp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status); headers.remove(HttpHeaderNames.TRANSFER_ENCODING); headers.set(HttpHeaderNames.CONTENT_LENGTH, count); if (keepAlive) { headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); } // dump headers rsp.headers().set(headers); ChannelHandlerContext ctx = this.ctx; ctx.channel().attr(NettyRequest.NEED_FLUSH).set(false); ChannelPipeline pipeline = ctx.pipeline(); boolean ssl = pipeline.get(SslHandler.class) != null; if (ssl) { // add chunker chunkHandler(pipeline); // Create the chunked input here already, to properly handle the IOException HttpChunkedInput chunkedInput = new HttpChunkedInput( new ChunkedNioFile(channel, offset, count, bufferSize)); ctx.channel().eventLoop().execute(() -> { // send headers ctx.write(rsp, ctx.voidPromise()); // chunked file if (keepAlive) { ctx.writeAndFlush(chunkedInput, ctx.voidPromise()); } else { ctx.writeAndFlush(chunkedInput).addListener(CLOSE); } }); } else { ctx.channel().eventLoop().execute(() -> { // send headers ctx.write(rsp, ctx.voidPromise()); // file region ctx.write(new DefaultFileRegion(channel, offset, count), ctx.voidPromise()); if (keepAlive) { ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, ctx.voidPromise()); } else { ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(CLOSE); } }); } committed = true; }