@SuppressFBWarnings @Override public ChunkedOutput<String> executeChunkedChain(final OperationChainDAO opChain) { // Create chunked output instance final ChunkedOutput<String> output = new ChunkedOutput<>(String.class, "\r\n"); // write chunks to the chunked output object new Thread(() -> { try { final Object result = _execute(opChain); chunkResult(result, output); } finally { CloseableUtil.close(output); CloseableUtil.close(opChain); } }).start(); return output; }
@SuppressFBWarnings @Override public ChunkedOutput<String> executeChunkedChain(final OperationChain opChain) { // Create chunked output instance final ChunkedOutput<String> output = new ChunkedOutput<>(String.class, "\r\n"); // write chunks to the chunked output object new Thread(() -> { try { final Object result = _execute(opChain).getFirst(); chunkResult(result, output); } finally { CloseableUtil.close(output); CloseableUtil.close(opChain); } }).start(); return output; }
/**
 * Serves a RESTCONF Event Notification Subscription request by creating a
 * ChunkedOutput stream, registering it with the service and returning it.
 * <p>
 * The method itself does not block on streaming, so other incoming requests
 * can still be handled; a background worker thread is expected to push the
 * data. When streaming fails, that worker calls ChunkedOutput.close() to
 * disconnect the session and then terminates.
 * <p>
 * If the subscription is rejected, the failure is logged and the stream is
 * closed before it is returned.
 *
 * @param streamId Event stream ID
 * @param request  RESTCONF client information from which the client IP
 *                 address is retrieved
 * @return A string data stream over HTTP keep-alive session
 */
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("streams/{streamId}")
public ChunkedOutput<String> handleNotificationRegistration(@PathParam("streamId") String streamId,
                                                            @Context HttpServletRequest request) {
    final ChunkedOutput<String> output = new ChunkedOutput<>(String.class);
    try {
        final String clientAddress = request.getRemoteAddr();
        service.subscribeEventStream(streamId, clientAddress, output);
        return output;
    } catch (RestconfException e) {
        log.error("ERROR: handleNotificationRegistration: {}", e.getMessage());
        log.debug("Exception in handleNotificationRegistration:", e);
    }

    // Subscription failed: hand back an already-closed stream.
    try {
        output.close();
    } catch (IOException ex) {
        log.error("ERROR: handleNotificationRegistration:", ex);
    }
    return output;
}
/**
 * Normalises the given operation into an {@code OperationChainDAO} and
 * delegates to {@code executeChunkedChain}.
 * <ul>
 * <li>an {@code OperationChainDAO} is passed through unchanged;</li>
 * <li>an {@code OperationChain} is re-wrapped using its operations;</li>
 * <li>any other operation is wrapped on its own.</li>
 * </ul>
 *
 * @param operation the operation (or chain) to execute
 * @return the chunked output produced by {@code executeChunkedChain}
 */
@SuppressFBWarnings
@Override
public ChunkedOutput<String> executeChunked(final Operation operation) {
    final OperationChainDAO chain;
    if (operation instanceof OperationChainDAO) {
        chain = (OperationChainDAO) operation;
    } else if (operation instanceof OperationChain) {
        chain = new OperationChainDAO(((OperationChain) operation).getOperations());
    } else {
        chain = new OperationChainDAO(operation);
    }
    return executeChunkedChain(chain);
}
/**
 * Performs the given operation on the graph and returns the result as
 * chunked output.
 * <p>
 * Mapped to {@code POST} on the relative path {@code /execute/chunked}.
 * According to the API description this endpoint does not work in Swagger.
 *
 * @param operation the operation to perform
 * @return the chunked output
 */
@POST
@Path("/execute/chunked")
@ApiOperation(value = "Performs the given operation on the graph, returned chunked output. NOTE - does not work in Swagger.", response = Object.class, produces = APPLICATION_JSON)
@ApiResponses(value = {@ApiResponse(code = 202, message = OK), @ApiResponse(code = 400, message = BAD_REQUEST), @ApiResponse(code = 403, message = FORBIDDEN), @ApiResponse(code = 500, message = INTERNAL_SERVER_ERROR), @ApiResponse(code = 501, message = OPERATION_NOT_IMPLEMENTED)})
ChunkedOutput<String> executeChunked(final Operation operation);
/**
 * Registers a chunked output and keeps the connection counter in step.
 * <p>
 * Outputs that are already closed are rejected without being passed to the
 * superclass. The counter is only incremented (and the new total logged)
 * when the superclass reports that the output was added.
 *
 * @param chunkedOutput the output to register
 * @return {@code true} if the output was added, {@code false} otherwise
 */
@Override
public <OUT extends ChunkedOutput<OutboundEvent>> boolean add(final OUT chunkedOutput) {
    if (chunkedOutput.isClosed() || !super.add(chunkedOutput)) {
        return false;
    }
    final int openConnections = connectionCounter.incrementAndGet();
    LOGGER.debug("Opened new connection ({} total)", openConnections);
    return true;
}
@Override public void subscribeEventStream(String streamId, String clientIpAddr, ChunkedOutput<String> output) throws RestconfException { //TODO: to be completed throw new RestconfException("Not implemented", RestconfError.ErrorTag.OPERATION_NOT_SUPPORTED, Response.Status.NOT_IMPLEMENTED, Optional.empty(), Optional.of("subscribeEventStream not yet implemented")); }
@Override public void contextInitialized(ServletContextEvent sce) { // Now initialize shared jedis connection object and thread pool object jedisConn = new JedisConnectionObject(redisHost, redisPort); executorService = Executors.newCachedThreadPool(); writerList = new ArrayList<ChunkedOutput<String>>(); logger.info("Context Initialized, executorService: " + executorService); }
/**
 * Creates a subscriber for the Redis channel named in the subscription data.
 * <p>
 * The subscription duration comes from {@code subData.duration} when it is
 * set, otherwise {@code SUBSCRIPTION_MAX_DURATION} is used. A positive
 * {@code subData.rate} (messages per minute, an upper bound) is turned into
 * the pause between sends in milliseconds; otherwise
 * {@code DEFAULT_SLEEP_TIME} applies.
 *
 * @param jedis          the Jedis instance (not referenced by this constructor)
 * @param responseWriter the chunked output this subscriber holds on to
 * @param writerList     the list of writers this subscriber holds on to
 * @param subData        the client's subscription settings
 * @throws IOException declared by the constructor; presumably raised by
 *                     {@code parseTime} - confirm
 */
public AsyncStreamRedisSubscriber(final Jedis jedis,
                                  final ChunkedOutput<String> responseWriter,
                                  ArrayList<ChunkedOutput<String>> writerList,
                                  final SubscriptionDataObject subData) throws IOException {
    this.channel = subData.redisChannel;
    this.callbackName = subData.callbackName;
    this.responseWriter = responseWriter;
    this.writerList = writerList;

    // Own SubscriptionDataObject populated from the caller's one.
    this.subData = new SubscriptionDataObject();
    this.subData.set(subData);
    this.setRunFlag(true);

    if (subData.duration == null) {
        subscriptionDuration = SUBSCRIPTION_MAX_DURATION;
    } else {
        subscriptionDuration = parseTime(subData.duration);
    }
    logger.info("Client requested subscription for duration = " + subscriptionDuration);

    if (subData.rate > 0) {
        // Rate is given in messages/min and acts as an upper bound.
        messageRate = subData.rate;
        // Milliseconds to sleep between two sends.
        sleepTime = Math.max(0, Math.round(60 * 1000 / messageRate));
    } else {
        // No usable rate requested: fall back to the default pause.
        sleepTime = DEFAULT_SLEEP_TIME;
    }

    messageList = new ArrayList<String>(DEFAULT_COUNT);
}
@Test public void streamChunkedResponseTest() throws IOException { String channelCode = "mock_collection"; String callbackName = null; Float rate = new Float(-1); String duration = "-1"; ChunkedOutput<String> output = asyncStream.streamChunkedResponse(channelCode, callbackName, rate, duration); // Class<?> s = output.getRawType(); assertNotNull(output); assertEquals("class java.lang.String", s.toString()); }
/**
 * Builds a resource rooted at "/" with a single GET method that produces
 * plain text and is handled by the supplied inflector.
 *
 * @param inflector the handler for the GET method
 * @return the built resource
 */
private Resource getResource(Inflector<ContainerRequestContext, ChunkedOutput<?>> inflector) {
    final Resource.Builder builder = Resource.builder();
    builder.path("/");

    builder.addMethod("GET")
            .produces(MediaType.TEXT_PLAIN_TYPE)
            .handledBy(inflector);

    return builder.build();
}
/**
 * Wires the inflector into a Netty-hosted Jersey container, binds the
 * bootstrap to port 0 and reports the port that was actually bound.
 *
 * @param inflector the handler for the test resource
 * @param bootstrap the server bootstrap to configure and bind
 * @param classes   additional classes passed on to the container
 * @return the local port of the bound channel
 * @throws URISyntaxException declared by the helpers used here
 */
private int bindJerseyServer(Inflector<ContainerRequestContext, ChunkedOutput<?>> inflector,
                             ServerBootstrap bootstrap,
                             Class... classes) throws URISyntaxException {
    final NettyContainer jerseyHandler = getNettyContainer(getResource(inflector), classes);
    setChunkedHttpPipeline(bootstrap, jerseyHandler);

    // Port 0 is passed to bind; the real port is read back from the channel.
    final Channel serverChannel = bootstrap.bind(new InetSocketAddress(0));
    final InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
    return boundAddress.getPort();
}
/**
 * Prediction endpoint: hands the incoming examples to a new
 * {@code RequestHandler} and returns the chunked output it produces.
 *
 * @param examplesIterable the examples read from the request body
 * @return the chunked output produced by the request handler
 * @throws IOException declared by this endpoint
 */
@POST
@Consumes({
        ExampleMediaTypes.PLAINTEXT_0_1_0,
        MediaType.TEXT_PLAIN,
        ExampleMediaTypes.SIMPLE_PROTOBUF_0_1_0,
        ExampleMediaTypes.SIMPLE_JSON_0_1_0,
        ExampleMediaTypes.STRUCTURED_JSON_0_1_0
})
@Produces({ PredictionMediaTypes.PLAINTEXT_0_1_0 })
@Path("/main")
public ChunkedOutput<String> doPredict(ExamplesIterable examplesIterable) throws IOException {
    final RequestHandler handler = new RequestHandler(executorService, exampleProcessorFactory);
    return handler.handleRequest(examplesIterable);
}
public ChunkedOutput<String> handleRequest(ExamplesIterable examplesIterable) { ChunkedOutput<String> chunkedOutput = new ChunkedOutput<String>(String.class); // get the example processor. ExampleProcessor exampleProcessor = exampleProcessorFactory.getExampleProcessor(examplesIterable); if (exampleProcessor.getExampleProcessorFeatures().isAsync() == false) submitSynchronously(exampleProcessor, chunkedOutput); else { submitAsynchronously(exampleProcessor, chunkedOutput); } return chunkedOutput; }
/**
 * Streams available-rows updates to the caller as chunked binary output.
 * <p>
 * The streaming runs on a dedicated thread named after the ring member and
 * whether the system or striped variant was requested. The
 * {@code availableRowsStream} stat is incremented while the stream is
 * running and decremented when it ends, even on failure.
 *
 * @param ringMemberString the ring member, as a string
 * @param ringHost         the ring host in canonical string form
 * @param system           selects the "system" rather than "striped" variant
 * @param ringTimestampId  timestamp id paired with the ring host
 * @param takeSessionId    the take session id
 * @param timeoutMillis    timeout passed on to the streaming call
 * @param sharedKey        request body, parsed as a {@code Long}
 * @return the chunked output the background thread writes to
 */
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Path("/rows/available/{ringMember}/{ringHost}/{system}/{ringTimestampId}/{takeSessionId}/{timeoutMillis}")
public ChunkedOutput<byte[]> availableRowsStream(@PathParam("ringMember") String ringMemberString,
    @PathParam("ringHost") String ringHost,
    @PathParam("system") boolean system,
    @PathParam("ringTimestampId") long ringTimestampId,
    @PathParam("takeSessionId") long takeSessionId,
    @PathParam("timeoutMillis") long timeoutMillis,
    byte[] sharedKey) {

    LatchChunkedOutput chunkedOutput = new LatchChunkedOutput(10_000);
    String threadName = "available-" + ringMemberString + "-" + (system ? "system" : "striped");

    Runnable streamer = () -> {
        chunkedOutput.await("availableRowsStream", () -> {
            amzaStats.availableRowsStream.increment();
            try {
                // Arguments are prepared in the same order they are passed.
                RingMember member = new RingMember(ringMemberString);
                TimestampedRingHost host = new TimestampedRingHost(RingHost.fromCanonicalString(ringHost), ringTimestampId);
                Long parsedKey = objectMapper.readValue(sharedKey, Long.class);
                amzaInstance.availableRowsStream(
                    system,
                    chunkedOutput::write,
                    member,
                    host,
                    takeSessionId,
                    parsedKey,
                    timeoutMillis);
                return null;
            } finally {
                amzaStats.availableRowsStream.decrement();
            }
        });
    };
    new Thread(streamer, threadName).start();

    return chunkedOutput;
}
/**
 * Performs the given operation on the graph and returns the result as
 * chunked output.
 * <p>
 * Mapped to {@code POST} on the relative path {@code /chunked/operation}.
 * According to the API description this endpoint does not work in Swagger.
 *
 * @param operation the operation to perform
 * @return the chunked output
 */
@POST
@Path("/chunked/operation")
@ApiOperation(value = "Performs the given operation on the graph, returned chunked output. NOTE - does not work in Swagger.", response = Object.class)
ChunkedOutput<String> executeChunked(final Operation operation);
/**
 * Performs the given operation chain on the graph and returns the result as
 * chunked output.
 * <p>
 * Mapped to {@code POST} on the relative path {@code /chunked}. According
 * to the API description this endpoint does not work in Swagger.
 *
 * @param opChain the operation chain to perform
 * @return the chunked output
 */
@POST
@Path("/chunked")
@ApiOperation(value = "Performs the given operation chain on the graph, returned chunked output. NOTE - does not work in Swagger.", response = Object.class)
ChunkedOutput<String> executeChunkedChain(final OperationChainDAO<CloseableIterable<Element>> opChain);
/**
 * Executes the given operation chain and returns the result as chunked
 * output.
 *
 * @param opChain the operation chain to execute
 * @return the chunked output
 */
@SuppressFBWarnings
ChunkedOutput<String> executeChunkedChain(final OperationChain opChain);
/**
 * Wraps the operation via {@code OperationChain.wrap} and delegates to
 * {@code executeChunkedChain}.
 *
 * @param operation the operation to execute
 * @return the chunked output produced by {@code executeChunkedChain}
 */
@Override
public ChunkedOutput<String> executeChunked(final Operation operation) {
    final OperationChain wrappedChain = OperationChain.wrap(operation);
    return executeChunkedChain(wrappedChain);
}
/**
 * Records a connection exception at trace level; nothing else is done with
 * the exception or the affected output here.
 *
 * @param chunkedOutput the output the exception relates to (not used)
 * @param exception     the exception to log
 */
@Override
public void onException(final ChunkedOutput<OutboundEvent> chunkedOutput, final Exception exception) {
    LOGGER.trace("Connection exception", exception);
}
/**
 * Decrements the connection counter when an output is closed and logs the
 * remaining total at debug level.
 *
 * @param chunkedOutput the output that was closed (not used)
 */
@Override
public void onClose(final ChunkedOutput<OutboundEvent> chunkedOutput) {
    // The decrement happens regardless of whether debug logging is enabled.
    LOGGER.debug("Closed connection ({} total)", connectionCounter.decrementAndGet());
}
/**
 * {@inheritDoc}
 * <p>
 * This implementation ignores the request context and always returns
 * {@code null}.
 */
@Override
public ChunkedOutput apply(ContainerRequestContext containerRequestContext) {
    return null;
}
/**
 * GET handler that never returns a stream: it always throws an
 * {@link IllegalStateException} with the message "throw".
 * NOTE(review): presumably a fixture for exercising error handling - confirm.
 *
 * @return never returns normally
 */
@GET
public ChunkedOutput<String> throwing() {
    throw new IllegalStateException("throw");
}
/**
 * POST handler for plain-text bodies that never returns a stream: it always
 * throws an {@link IllegalStateException} with the message "throw".
 * NOTE(review): presumably a fixture for exercising error handling - confirm.
 *
 * @param body the request body (not used)
 * @return never returns normally
 */
@POST
@Consumes(MediaType.TEXT_PLAIN)
public ChunkedOutput<String> throwing(String body) {
    throw new IllegalStateException("throw");
}
/**
 * Serves five {@code Entity} chunks from a background thread through a
 * Jersey resource bound on Netty, and checks that a client GET completes
 * without the handler's {@code onThrowable} being invoked.
 * <p>
 * The server bootstrap is shut down in a {@code finally} block so that a
 * failure while binding or waiting for the response does not leave the
 * server running.
 */
@Test
public void testEntityChunkedOutput() throws URISyntaxException, IOException, ExecutionException, InterruptedException {
    Inflector<ContainerRequestContext, ChunkedOutput<?>> inflector = new Inflector<ContainerRequestContext, ChunkedOutput<?>>() {
        @Override
        public ChunkedOutput<Entity> apply(ContainerRequestContext containerRequestContext) {
            final ChunkedOutput<Entity> output = new ChunkedOutput<Entity>(Entity.class);
            // Write the chunks off the request thread, then close the stream.
            new Thread() {
                @Override
                public void run() {
                    try {
                        for (int i = 0; i <= 4; i++) {
                            // The first chunk differs from the four that follow.
                            if (i == 0) {
                                output.write(new Entity(true, 0));
                            } else {
                                output.write(new Entity(false, 1000));
                            }
                        }
                        output.close();
                    } catch (IOException e) {
                        fail("writing should not fail", e);
                    }
                }
            }.start();
            return output;
        }
    };

    ServerBootstrap bootstrap = getServerBootstrap();
    try {
        int port = bindJerseyServer(inflector, bootstrap, EntityWriter.class);
        final AsyncHttpClient client = getHttpClient();
        ListenableFuture<Object> request = client.prepareGet("http://localhost:" + port + "/").execute(new AsyncHandler<Object>() {
            @Override
            public void onThrowable(Throwable t) {
                fail("Should not throw up", t);
            }

            @Override
            public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
                return STATE.CONTINUE;
            }

            @Override
            public STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
                return STATE.CONTINUE;
            }

            @Override
            public STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
                return STATE.CONTINUE;
            }

            @Override
            public Object onCompleted() throws Exception {
                return STATE.CONTINUE;
            }
        });
        // Block until the whole chunked response has been consumed.
        request.get();
    } finally {
        // Always release the server, even when the request above failed.
        bootstrap.shutdown();
    }
}
/**
 * Creates a filer around the given chunked output, with a
 * {@code HeapFiler} constructed from {@code bufferSize}.
 *
 * @param bufferSize    the size passed to the {@code HeapFiler}
 * @param chunkedOutput the chunked output this filer holds on to
 */
public ChunkedOutputFiler(int bufferSize, ChunkedOutput<byte[]> chunkedOutput) {
    this.chunkedOutput = chunkedOutput;
    this.bufferSize = bufferSize;
    this.filer = new HeapFiler(bufferSize);
}
/**
 * Hands the synchronous submission over to the executor service so that it
 * runs as a submitted task instead of on the calling thread.
 *
 * @param exampleSubmitter the processor to run
 * @param chunkedOutput    the output passed on to the synchronous submission
 */
private void submitAsynchronously(final ExampleProcessor exampleSubmitter,
                                  final ChunkedOutput<String> chunkedOutput) {
    final Runnable submission = new Runnable() {
        @Override
        public void run() {
            submitSynchronously(exampleSubmitter, chunkedOutput);
        }
    };
    executorService.submit(submission);
}
/**
 * Handles an Event Stream subscription request. This function creates
 * a worker thread to listen to events and writes to a ChunkedOutput,
 * which is passed in from the caller. (The worker thread blocks if
 * no events arrive.) The ChunkedOutput is a pipe to which this
 * function acts as the writer and the caller the reader.
 * <p>
 * If the Event Stream cannot be subscribed due to reasons such as
 * the nonexistence of the target stream or failure to allocate a
 * worker thread to handle the request, a RestconfException
 * is raised. The proper HTTP error status code is enclosed in the
 * exception, so that the caller may return it to the RESTCONF client
 * to display.
 *
 * @param streamId     ID of the RESTCONF stream to subscribe
 * @param clientIpAddr IP address of the RESTCONF client sending the request
 * @param output       A string data stream
 * @throws RestconfException if the Event Stream cannot be subscribed
 */
void subscribeEventStream(String streamId, String clientIpAddr, ChunkedOutput<String> output) throws RestconfException;