/**
 * Sets up the test environment: creates the executor under test and starts an HTTP
 * server on {@code PORT}.
 *
 * <p>The server answers {@code /health} with a JSON status document; any other path
 * gets a 404 status and the response is closed.
 *
 * @throws Exception if any error occurs
 */
@Before
public void setUp() throws Exception {
    instance = new HealthCheckTaskExecutor();

    final RequestHandler<ByteBuf, ByteBuf> healthHandler = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                       HttpServerResponse<ByteBuf> response) {
            // Anything that is not the health endpoint is rejected up front.
            if (!"/health".equals(request.getPath())) {
                response.setStatus(HttpResponseStatus.NOT_FOUND);
                return response.close();
            }
            return response.writeStringAndFlush("{\"status\": \"UP\", \"service\": {\"status\": \"UP\"}}");
        }
    };

    server = RxNetty.createHttpServer(PORT, healthHandler).start();
}
/**
 * Benchmark fixture: starts an HTTP server on {@code SERVER_PORT} whose handler only
 * flushes the response, then prepares the OkHttp client, the Feign proxy backed by that
 * client, and the raw query request used by the benchmark methods.
 */
@Setup
public void setup() {
    server = RxNetty.createHttpServer(SERVER_PORT, new RequestHandler<ByteBuf, ByteBuf>() {
        // Parameterized return type instead of the raw rx.Observable, and @Override so the
        // compiler checks this really implements RequestHandler#handle.
        @Override
        public rx.Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                          HttpServerResponse<ByteBuf> response) {
            return response.flush();
        }
    });
    server.start();

    client = new OkHttpClient();
    client.setRetryOnConnectionFailure(false);

    okFeign = Feign.builder()
        .client(new feign.okhttp.OkHttpClient(client))
        .target(FeignTestInterface.class, "http://localhost:" + SERVER_PORT);

    queryRequest = new Request.Builder()
        .url("http://localhost:" + SERVER_PORT + "/?Action=GetUser&Version=2010-05-08&limit=1")
        .build();
}
public HttpServer<ByteBuf, ByteBuf> createServer() { try { final String ideBasePath = new File(".").getCanonicalPath(); final File rootDirectory; if (ideBasePath.contains(rootDir)) { // for eclipse rootDirectory = new File("."); } else { // for intellij rootDirectory = new File(rootDir); } return RxNetty.createHttpServer(port, RequestHandlerWithErrorMapper.from( new LocalDirectoryRequestHandler(rootDirectory), new FileErrorResponseMapper())); } catch (IOException e) { throw new RuntimeException(e); } }
/** * The Mesos HTTP Scheduler API will send a redirect to a client if it is not the leader. The client that is * constructed during {@link #openStream} is bound to a specific host and port, due to this behavior * we "probe" Mesos to try and find out where it's "master" is before we configure the client. * * This method will attempt to send a simple GET to {@code mesosUri}, however instead of going to the path * specified, it will go to {@code /redirect} and construct a uri relative to mesosUri using the host and port * returned in the Location header of the response. */ @NotNull private static URI resolveMesosUri(final @NotNull URI mesosUri) { final String redirectUri = createRedirectUri(mesosUri); LOGGER.info("Probing Mesos server at {}", redirectUri); // When sending an individual request (rather than creating a client) RxNetty WILL follow redirects, // so here we tell it not to do that for this request by creating the config object below. final HttpClient.HttpClientConfig config = HttpClient.HttpClientConfig.Builder.fromDefaultConfig() .setFollowRedirect(false) .build(); final HttpClientResponse<ByteBuf> redirectResponse = RxNetty.createHttpRequest(HttpClientRequest.createGet(redirectUri), config) .toBlocking() .first(); return getUriFromRedirectResponse(mesosUri, redirectResponse); }
@Test public void testStreamDoesNotRunWhenSubscribeFails_mesos5xxResponse() throws Throwable { final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> { response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); return response.close(); }; final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler); server.start(); final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort())); final MesosClient<String, String> client = createClient(uri); try { client.openStream().await(); fail("Expect an exception to be propagated up because subscribe will 500"); } catch (Mesos5xxException e) { // expected final MesosClientErrorContext ctx = e.getContext(); assertThat(ctx.getStatusCode()).isEqualTo(500); } finally { server.shutdown(); } }
@Test public void testStreamDoesNotRunWhenSubscribeFails_mismatchContentType() throws Throwable { final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> { response.setStatus(HttpResponseStatus.OK); response.getHeaders().setHeader("Content-Type", "application/json"); return response.close(); }; final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler); server.start(); final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort())); final MesosClient<String, String> client = createClient(uri); try { client.openStream().await(); fail("Expect an exception to be propagated up because of content type mismatch"); } catch (MesosException e) { // expected final MesosClientErrorContext ctx = e.getContext(); assertThat(ctx.getStatusCode()).isEqualTo(200); assertThat(ctx.getMessage()).isEqualTo("Response had Content-Type \"application/json\" expected \"text/plain;charset=utf-8\""); } finally { server.shutdown(); } }
/**
 * A GET against the scheduler endpoint must be answered with 400, an {@code Accept}
 * response header naming the supported media type, and no call recorded by the simulation.
 */
@Test
public void server400_nonPost() throws Exception {
    final URI schedulerUri =
        URI.create(String.format("http://localhost:%d/api/v1/scheduler", serverPort));
    final HttpClient<ByteBuf, ByteBuf> httpClient =
        RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(schedulerUri.getHost(), schedulerUri.getPort())
            .pipelineConfigurator(new HttpClientPipelineConfigurator<>())
            .build();

    final String mediaType = StringMessageCodec.UTF8_STRING.mediaType();
    final HttpClientRequest<ByteBuf> getRequest =
        HttpClientRequest.createGet(schedulerUri.getPath())
            .withHeader("Accept", mediaType);

    final HttpClientResponse<ByteBuf> response = httpClient.submit(getRequest)
        .toBlocking()
        .last();

    assertThat(response.getStatus()).isEqualTo(HttpResponseStatus.BAD_REQUEST);
    assertThat(response.getHeaders().getHeader("Accept"))
        .isEqualTo(StringMessageCodec.UTF8_STRING.mediaType());
    assertThat(mesosServerSimulation.getCallsReceived()).hasSize(0);
}
/**
 * A POST whose {@code Content-Type} is {@code application/octet-stream} must be answered
 * with 400, an {@code Accept} response header naming the supported media type, and no call
 * recorded by the simulation.
 */
@Test
public void server400_invalidContentType() throws Exception {
    final URI schedulerUri =
        URI.create(String.format("http://localhost:%d/api/v1/scheduler", serverPort));
    final HttpClient<ByteBuf, ByteBuf> httpClient =
        RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(schedulerUri.getHost(), schedulerUri.getPort())
            .pipelineConfigurator(new HttpClientPipelineConfigurator<>())
            .build();

    final byte[] body = StringMessageCodec.UTF8_STRING.encode("decline");
    final HttpClientRequest<ByteBuf> postRequest =
        HttpClientRequest.createPost(schedulerUri.getPath())
            .withHeader("Content-Type", "application/octet-stream")
            .withHeader("Accept", "application/octet-stream")
            .withContent(body);

    final HttpClientResponse<ByteBuf> response = httpClient.submit(postRequest)
        .toBlocking()
        .last();

    assertThat(response.getStatus()).isEqualTo(HttpResponseStatus.BAD_REQUEST);
    assertThat(response.getHeaders().getHeader("Accept"))
        .isEqualTo(StringMessageCodec.UTF8_STRING.mediaType());
    assertThat(mesosServerSimulation.getCallsReceived()).hasSize(0);
}
/**
 * A POST with the supported media type but a zero-length body must be answered with 400,
 * an {@code Accept} response header naming the supported media type, and no call recorded
 * by the simulation.
 */
@Test
public void server400_emptyBody() throws Exception {
    final URI schedulerUri =
        URI.create(String.format("http://localhost:%d/api/v1/scheduler", serverPort));
    final HttpClient<ByteBuf, ByteBuf> httpClient =
        RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(schedulerUri.getHost(), schedulerUri.getPort())
            .pipelineConfigurator(new HttpClientPipelineConfigurator<>())
            .build();

    final HttpClientRequest<ByteBuf> emptyPost =
        HttpClientRequest.createPost(schedulerUri.getPath())
            .withHeader("Content-Type", StringMessageCodec.UTF8_STRING.mediaType())
            .withHeader("Accept", StringMessageCodec.UTF8_STRING.mediaType())
            .withContent(new byte[]{});

    final HttpClientResponse<ByteBuf> response = httpClient.submit(emptyPost)
        .toBlocking()
        .last();

    assertThat(response.getStatus()).isEqualTo(HttpResponseStatus.BAD_REQUEST);
    assertThat(response.getHeaders().getHeader("Accept"))
        .isEqualTo(StringMessageCodec.UTF8_STRING.mediaType());
    assertThat(mesosServerSimulation.getCallsReceived()).hasSize(0);
}
/**
 * POSTs {@code call}, encoded as UTF-8, to {@code uri} using the string codec's media type
 * for both {@code Content-Type} and {@code Accept}, and blocks for the last response.
 *
 * @param uri  target whose host, port and path are used for the request
 * @param call request body text
 * @return the last response emitted for the request
 */
@NotNull
private static HttpClientResponse<ByteBuf> sendCall(final URI uri, final String call) {
    final HttpClient<ByteBuf, ByteBuf> client =
        RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), uri.getPort())
            .pipelineConfigurator(new HttpClientPipelineConfigurator<>())
            .build();

    final byte[] payload = call.getBytes(StandardCharsets.UTF_8);
    final HttpClientRequest<ByteBuf> post = HttpClientRequest.createPost(uri.getPath())
        .withHeader("Content-Type", StringMessageCodec.UTF8_STRING.mediaType())
        .withHeader("Accept", StringMessageCodec.UTF8_STRING.mediaType())
        .withContent(payload);

    return client.submit(post).toBlocking().last();
}
/**
 * Builds an HTTP server on {@code port} that hands POST requests whose path contains
 * {@code /v2/apps} to {@code handleTest} and answers everything else with a 404.
 * The server is built but {@code start()} is not called here.
 *
 * @return the built server
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    final RequestHandler<ByteBuf, ByteBuf> router = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                       final HttpServerResponse<ByteBuf> response) {
            // Short-circuit keeps the method lookup conditional on the path match.
            final boolean isAppsPost = request.getPath().contains("/v2/apps")
                && request.getHttpMethod().equals(HttpMethod.POST);
            if (isAppsPost) {
                return handleTest(request, response);
            }
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    };

    HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, router)
        .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
        .enableWireLogging(LogLevel.ERROR)
        .build();

    System.out.println("RxTetstServer server started...");
    return server;
}
public HttpClient(Collection<String> nodes) { // searchShard // search template List<io.reactivex.netty.protocol.http.client.HttpClient<ByteBuf, ByteBuf>> clientsTemp = new ArrayList<>(); // expect something like "http://%s:%d" for (String node : nodes) { String[] next = node.split(":"); // indices admin String host = next[1].substring(2); // remove the // of http:// int port = Integer.parseInt(next[2]); HttpClientBuilder<ByteBuf, ByteBuf> clientBuilder = RxNetty.newHttpClientBuilder(host, port); clientBuilder.config(new RxClient.ClientConfig.Builder().readTimeout(timeOut, MILLISECONDS).build()); clientBuilder.withMaxConnections(maxConnections); clientsTemp.add(clientBuilder.build()); logger.info("adding host {}:{}", host, port); } this.clients = new SnapshotableCopyOnWriteArray<>(clientsTemp); clientSupplier = new RoundRobinSupplier<>(clients); this.httpAdminClient = new HttpAdminClient(clientSupplier); }
/**
 * Creates the TCP echo server on {@code PORT}. For each connection it posts a notice to
 * the adapter via {@code mainHandler}, writes a welcome line, and echoes every non-blank
 * message back to the client. The server is only created here; {@code start()} is not called.
 */
@Override
public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);

    server = RxNetty.createTcpServer(PORT, PipelineConfigurators.textOnlyConfigurator(), connection -> {
        mainHandler.post(() -> adapter.add("New client connection established."));
        connection.writeAndFlush("Welcome! \n\n");

        return connection.getInput().flatMap(msg -> {
            Log.d(TAG, "Server onNext: " + msg);
            final String trimmed = msg.trim();
            if (!trimmed.isEmpty()) {
                return connection.writeAndFlush("echo => " + trimmed + '\n');
            }
            // Nothing to echo for blank input.
            return Observable.empty();
        });
    });
}
/**
 * Configures RxNetty (optional native transport, Servo metric listeners) and builds the
 * async document client from the values held by {@code cfg}.
 *
 * @throws Exception declared by the overridden method
 */
@Override
protected void onInit() throws Exception {
    if (cfg.isRxEnableNativeLinuxEpoll()) {
        System.out.println("Enabling Native Linux Transport...");
        RxNetty.useNativeTransportIfApplicable();
    }
    RxNetty.useMetricListenersFactory(new ServoEventsListenerFactory());

    this.rxDocumentClient = new AsyncDocumentClient.Builder()
        .withServiceEndpoint(cfg.getServiceEndpoint())
        .withMasterKey(cfg.getMasterKey())
        .withConnectionPolicy(cfg.getConnectionPolicy())
        .withConsistencyLevel(cfg.getConsistencyLevel())
        .build();
}
@Override public void init(WundergroundDeviceConfiguration configuration) { this.configuration = configuration; // init HTTP client httpClient = RxNetty.createHttpClient(configuration.getHost(), configuration.getPort()); // configure Json parser mapper = new ObjectMapper(); mapper.getFactory().configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true); WundergroundTemperatureSensor temperatureSensor = new WundergroundTemperatureSensor(configuration, httpClient, mapper); WundergroundPressureSensor pressureSensor = new WundergroundPressureSensor(configuration, httpClient, mapper); addSensors(temperatureSensor, pressureSensor); }
/**
 * Creates and starts an HTTP server which returns "Hello World!!" for every request.
 *
 * <p>The request handler controls what is written as the response for each request the
 * server receives. A real server would write different responses based on the request
 * URI; this example returns the same string unconditionally.
 *
 * @param port port to listen on
 * @return the started server
 */
public static HttpServer<ByteBuf, ByteBuf> startServer(int port) {
    return RxNetty
        .createHttpServer(port, (request, response) -> response.writeStringAndFlush("Hello World!!"))
        .start();
}
private Observable<HttpClientResponse<ByteBuf>> getResponse(String externalHealthCheckURL) { String host = "localhost"; int port = DEFAULT_APPLICATION_PORT; String path = "/healthcheck"; try { URL url = new URL(externalHealthCheckURL); host = url.getHost(); port = url.getPort(); path = url.getPath(); } catch (MalformedURLException e) { //continue } Integer timeout = DynamicProperty.getInstance("prana.host.healthcheck.timeout").getInteger(DEFAULT_CONNECTION_TIMEOUT); HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port) .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator()) .channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout) .build(); return httpClient.submit(HttpClientRequest.createGet(path)); }
/**
 * Builds a TCP client for {@code server}, applying the optional pipeline configurator,
 * the configured connect timeout, and either the pool settings or no pooling at all.
 *
 * @param server supplies the host and port to connect to
 * @return the built client
 */
@Override
protected RxClient<I, O> createRxClient(Server server) {
    ClientBuilder<I, O> clientBuilder = RxNetty.newTcpClientBuilder(server.getHost(), server.getPort());
    if (pipelineConfigurator != null) {
        clientBuilder.pipelineConfigurator(pipelineConfigurator);
    }

    Integer connectTimeout = getProperty(
        IClientConfigKey.Keys.ConnectTimeout, null, DefaultClientConfigImpl.DEFAULT_CONNECT_TIMEOUT);
    clientBuilder.channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);

    if (!isPoolEnabled()) {
        clientBuilder.withNoConnectionPooling();
    } else {
        clientBuilder.withConnectionPoolLimitStrategy(poolStrategy)
            .withIdleConnectionsTimeoutMillis(idleConnectionEvictionMills)
            .withPoolIdleCleanupScheduler(poolCleanerScheduler);
    }

    return clientBuilder.build();
}
/**
 * Creates a UDP server on {@code port} that, for every datagram received, waits
 * {@code delay} milliseconds and then sends {@code WELCOME_MSG_BYTES} back to the sender.
 * The server is returned without {@code start()} being called here.
 *
 * @return the created server
 */
public UdpServer<DatagramPacket, DatagramPacket> createServer() {
    final ConnectionHandler<DatagramPacket, DatagramPacket> delayedWelcomeHandler =
        new ConnectionHandler<DatagramPacket, DatagramPacket>() {
            @Override
            public Observable<Void> handle(final ObservableConnection<DatagramPacket, DatagramPacket> connection) {
                return connection.getInput().flatMap(new Func1<DatagramPacket, Observable<Void>>() {
                    @Override
                    public Observable<Void> call(final DatagramPacket packet) {
                        // A single tick after `delay` ms triggers the reply.
                        final Observable<Long> delayTick =
                            Observable.interval(delay, TimeUnit.MILLISECONDS).take(1);
                        return delayTick.flatMap(new Func1<Long, Observable<Void>>() {
                            @Override
                            public Observable<Void> call(Long tick) {
                                final InetSocketAddress replyTo = packet.sender();
                                System.out.println("Received datagram. Sender: " + replyTo);

                                final ByteBuf payload =
                                    connection.getChannel().alloc().buffer(WELCOME_MSG_BYTES.length);
                                payload.writeBytes(WELCOME_MSG_BYTES);
                                return connection.writeAndFlush(new DatagramPacket(payload, replyTo));
                            }
                        });
                    }
                });
            }
        };

    UdpServer<DatagramPacket, DatagramPacket> server = RxNetty.createUdpServer(port, delayedWelcomeHandler);
    System.out.println("UDP hello server started at port: " + port);
    return server;
}
/**
 * Builds an HTTP server on {@code port} that routes by path substring, checked in this order:
 * <ul>
 *   <li>{@code /users}: GET goes to {@code handleRecommendationsByUserId}, any other method to
 *       {@code handleUpdateRecommendationsForUser};</li>
 *   <li>{@code /recommendations}: {@code handleRecommendationsBy};</li>
 *   <li>{@code /movies}: {@code handleRegisterMovie};</li>
 *   <li>anything else: 404.</li>
 * </ul>
 * The server is built but {@code start()} is not called here.
 *
 * @return the built server
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    final RequestHandler<ByteBuf, ByteBuf> router = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                       final HttpServerResponse<ByteBuf> response) {
            final String path = request.getPath();

            if (path.contains("/users")) {
                return request.getHttpMethod().equals(HttpMethod.GET)
                    ? handleRecommendationsByUserId(request, response)
                    : handleUpdateRecommendationsForUser(request, response);
            }
            if (path.contains("/recommendations")) {
                return handleRecommendationsBy(request, response);
            }
            if (path.contains("/movies")) {
                return handleRegisterMovie(request, response);
            }

            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    };

    HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, router)
        .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
        .enableWireLogging(LogLevel.ERROR)
        .build();

    System.out.println("RxMovie server started...");
    return server;
}
/**
 * Connects to the TCP server on localhost:8181 and prints every received message,
 * trimmed, until the stream ends.
 */
public static void main(String[] args) {
    // Trims each incoming message.
    final Func1<String, String> trim = new Func1<String, String>() {
        @Override
        public String call(String msg) {
            return msg.trim();
        }
    };
    // Turns a connection into its stream of trimmed messages.
    final Func1<ObservableConnection<String, String>, Observable<?>> trimmedInput =
        new Func1<ObservableConnection<String, String>, Observable<?>>() {
            @Override
            public Observable<?> call(ObservableConnection<String, String> connection) {
                return connection.getInput().map(trim);
            }
        };
    final Action1<Object> printer = new Action1<Object>() {
        @Override
        public void call(Object o) {
            System.out.println("onNext event => " + o);
        }
    };

    Observable<ObservableConnection<String, String>> connections =
        RxNetty.createTcpClient("localhost", 8181, PipelineConfigurators.stringMessageConfigurator()).connect();
    connections.flatMap(trimmedInput).toBlockingObservable().forEach(printer);
}
/**
 * Starts a TCP echo server on port 8181 and blocks. Each new connection is greeted with
 * a welcome line; every non-blank message is echoed back prefixed with {@code "echo => "}.
 */
public static void main(final String[] args) {
    final int port = 8181;

    final ConnectionHandler<String, String> echoHandler = new ConnectionHandler<String, String>() {
        @Override
        public Observable<Void> handle(final ObservableConnection<String, String> connection) {
            System.out.println("New client connection established.");
            connection.writeAndFlush("Welcome! \n\n");

            return connection.getInput().flatMap(new Func1<String, Observable<Void>>() {
                @Override
                public Observable<Void> call(String msg) {
                    System.out.println("onNext: " + msg);
                    final String trimmed = msg.trim();
                    // Blank input produces no reply.
                    if (trimmed.isEmpty()) {
                        return COMPLETED_OBSERVABLE;
                    }
                    return connection.writeAndFlush("echo => " + trimmed + '\n');
                }
            });
        }
    };

    RxNetty.createTcpServer(port, PipelineConfigurators.textOnlyConfigurator(), echoHandler)
        .startAndWait();
}
/**
 * Connects to the TCP server on localhost:8181 as a deliberately slow consumer: each
 * message is delayed by one second before being trimmed and printed.
 */
public static void main(String[] args) {
    Observable<ObservableConnection<String, String>> connectionObservable =
        RxNetty.createTcpClient("localhost", 8181, PipelineConfigurators.stringMessageConfigurator()).connect();

    connectionObservable.flatMap(new Func1<ObservableConnection<String, String>, Observable<?>>() {
        @Override
        public Observable<?> call(ObservableConnection<String, String> connection) {
            return connection.getInput().map(new Func1<String, String>() {
                @Override
                public String call(String msg) {
                    try {
                        // Simulates slow processing of each message.
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Restore the interrupt flag so code further up the stack can
                        // still observe that this thread was interrupted.
                        Thread.currentThread().interrupt();
                    }
                    return msg.trim();
                }
            });
        }
    }).toBlockingObservable().forEach(new Action1<Object>() {
        @Override
        public void call(Object o) {
            System.out.println("onNext event => " + o);
        }
    });
}
public static void main(final String[] args) { final int port = 8080; RxNetty.createHttpServer(port, new RequestHandler<ByteBuf, ByteBuf>() { @Override public Observable<Void> handle(HttpRequest<ByteBuf> request, final HttpResponse<ByteBuf> response) { System.out.println("New request recieved"); System.out.println(request.getHttpMethod() + " " + request.getUri() + ' ' + request.getHttpVersion()); for (Map.Entry<String, String> header : request.getHeaders().entries()) { System.out.println(header.getKey() + ": " + header.getValue()); } // This does not consume request content, need to figure out an elegant/correct way of doing that. return response.writeStringAndFlush("Welcome!!! \n\n"); } }).startAndWait(); }
/**
 * Requests {@code test/nochunk_stream} through an SSE-configured client, collects the
 * event data of every received event, and compares it with
 * {@code RequestProcessor.smallStreamContent}.
 */
@Test
public void testNonChunkingStream() throws Exception {
    HttpClient<ByteBuf, ServerSentEvent> sseClient =
        RxNetty.createHttpClient("localhost", port, PipelineConfigurators.<ByteBuf>sseClientConfigurator());
    Observable<HttpClientResponse<ServerSentEvent>> responses =
        sseClient.submit(HttpClientRequest.createGet("test/nochunk_stream"));

    final List<String> received = new ArrayList<String>();

    // Unwraps each response into its stream of events.
    final Func1<HttpClientResponse<ServerSentEvent>, Observable<ServerSentEvent>> toEvents =
        new Func1<HttpClientResponse<ServerSentEvent>, Observable<ServerSentEvent>>() {
            @Override
            public Observable<ServerSentEvent> call(HttpClientResponse<ServerSentEvent> httpResponse) {
                return httpResponse.getContent();
            }
        };
    final Action1<ServerSentEvent> collect = new Action1<ServerSentEvent>() {
        @Override
        public void call(ServerSentEvent event) {
            received.add(event.getEventData());
        }
    };

    responses.flatMap(toEvents).toBlockingObservable().forEach(collect);

    assertEquals(RequestProcessor.smallStreamContent, received);
}
/**
 * Builds and starts the TCP server from bindings found in {@code injector}: the server
 * config and connection handler are required, the pipeline configurator and metric
 * listener factory are applied only when a binding exists.
 *
 * @param injector source of the server's configuration and collaborators
 */
@Inject
public void setInjector(Injector injector) {
    ServerConfig serverConfig = injector.getInstance(serverConfigKey);
    ConnectionHandler<I, O> handler = injector.getInstance(connectionHandlerKey);

    ServerBuilder<I, O> serverBuilder = RxNetty.newTcpServerBuilder(serverConfig.getPort(), handler);

    // Optional collaborators: only applied when bound.
    final boolean hasPipelineConfigurator = injector.getExistingBinding(pipelineConfiguratorKey) != null;
    if (hasPipelineConfigurator) {
        serverBuilder.appendPipelineConfigurator(injector.getInstance(pipelineConfiguratorKey));
    }
    final boolean hasMetricListenerFactory = injector.getExistingBinding(metricEventsListenerFactoryKey) != null;
    if (hasMetricListenerFactory) {
        serverBuilder.withMetricEventsListenerFactory(injector.getInstance(metricEventsListenerFactoryKey));
    }

    server = serverBuilder.build().start();
    // Logged after start() has returned, so the port read back is the one in use.
    logger.info("Starting server {} on port {}...", nameAnnotation.value(), server.getServerPort());
}
/**
 * Builds and starts the WebSocket server from bindings found in {@code injector}: the
 * server config and connection handler are required, the pipeline configurator and metric
 * listener factory are applied only when a binding exists.
 *
 * @param injector source of the server's configuration and collaborators
 */
@Inject
@SuppressWarnings("unchecked")
public void setInjector(Injector injector) {
    KaryonWebSocketsModule.WebSocketsServerConfig wsConfig =
        (KaryonWebSocketsModule.WebSocketsServerConfig) injector.getInstance(serverConfigKey);
    ConnectionHandler<I, O> handler = injector.getInstance(connectionHandlerKey);

    WebSocketServerBuilder<I, O> serverBuilder =
        RxNetty.newWebSocketServerBuilder(wsConfig.getPort(), handler)
            .withMessageAggregator(wsConfig.isMessageAggregator());

    // Optional collaborators: only applied when bound.
    final boolean hasPipelineConfigurator = injector.getExistingBinding(pipelineConfiguratorKey) != null;
    if (hasPipelineConfigurator) {
        serverBuilder.appendPipelineConfigurator(injector.getInstance(pipelineConfiguratorKey));
    }
    final boolean hasMetricListenerFactory = injector.getExistingBinding(metricEventsListenerFactoryKey) != null;
    if (hasMetricListenerFactory) {
        serverBuilder.withMetricEventsListenerFactory(injector.getInstance(metricEventsListenerFactoryKey));
    }

    server = serverBuilder.build().start();
    // Logged after start() has returned, so the port read back is the one in use.
    logger.info("Starting WebSockets server {} on port {}...", nameAnnotation.value(), server.getServerPort());
}
/**
 * Connects to the server over TCP, waits up to 60 seconds for exactly one message, and
 * checks that it equals {@code SERVER_MESSAGE}.
 */
@Test
public void testGovernatedTcpServer() throws Exception {
    // Decodes a received buffer using the platform default charset.
    final Func1<ByteBuf, String> decode = new Func1<ByteBuf, String>() {
        @Override
        public String call(ByteBuf byteBuf) {
            return byteBuf.toString(Charset.defaultCharset());
        }
    };
    final Func1<ObservableConnection<ByteBuf, ByteBuf>, Observable<String>> readMessages =
        new Func1<ObservableConnection<ByteBuf, ByteBuf>, Observable<String>>() {
            @Override
            public Observable<String> call(ObservableConnection<ByteBuf, ByteBuf> connection) {
                return connection.getInput().map(decode);
            }
        };

    String message = RxNetty.createTcpClient("localhost", server.getServerPort()).connect()
        .flatMap(readMessages)
        .single()
        .toBlocking()
        .toFuture()
        .get(60, TimeUnit.SECONDS);

    assertEquals("Invalid message received from server", SERVER_MESSAGE, message);
}
/**
 * Connects to the server with a WebSocket client, waits up to 60 seconds for exactly one
 * text frame, and checks that its text equals {@code SERVER_MESSAGE}.
 */
@Test
public void testGovernatedTcpServer() throws Exception {
    final Func1<TextWebSocketFrame, String> frameText = new Func1<TextWebSocketFrame, String>() {
        @Override
        public String call(TextWebSocketFrame frame) {
            return frame.text();
        }
    };
    final Func1<ObservableConnection<TextWebSocketFrame, TextWebSocketFrame>, Observable<String>> readMessages =
        new Func1<ObservableConnection<TextWebSocketFrame, TextWebSocketFrame>, Observable<String>>() {
            @Override
            public Observable<String> call(ObservableConnection<TextWebSocketFrame, TextWebSocketFrame> connection) {
                return connection.getInput().map(frameText);
            }
        };

    String message = RxNetty.<TextWebSocketFrame, TextWebSocketFrame>newWebSocketClientBuilder("localhost", server.getServerPort())
        .build()
        .connect()
        .flatMap(readMessages)
        .single()
        .toBlocking()
        .toFuture()
        .get(60, TimeUnit.SECONDS);

    assertEquals("Invalid message received from server", SERVER_MESSAGE, message);
}
/**
 * Starts the mock service and blocks.
 *
 * <p>The port defaults to 8989 and can be overridden by the first argument. Every request
 * increments the {@code REQUESTS} counter; successful completion records {@code SUCCESS}
 * and the latency, an error on the response stream records {@code NETTY_ERROR}, and an
 * exception thrown while dispatching records {@code HTTP_ERROR} and is answered with a
 * 400 response.
 *
 * @param args optional: {@code args[0]} is the port to listen on
 */
public static void main(String[] args) {
    int port = 8989;
    if (args.length > 0) {
        port = Integer.parseInt(args[0]);
    }

    System.out.println("Starting mock service on port " + port + "...");
    startMonitoring();

    RxNetty.<ByteBuf, ByteBuf>newHttpServerBuilder(port, (request, response) -> {
        try {
            long startTime = System.currentTimeMillis();
            counter.increment(CounterEvent.REQUESTS);
            return handleRequest(request, response)
                .doOnCompleted(() -> {
                    counter.increment(CounterEvent.SUCCESS);
                    latency.addValue((int) (System.currentTimeMillis() - startTime));
                })
                .doOnError(t -> counter.increment(CounterEvent.NETTY_ERROR));
        } catch (Throwable e) {
            System.err.println("Server => Error [" + request.getPath() + "] => " + e);
            response.setStatus(HttpResponseStatus.BAD_REQUEST);
            counter.increment(CounterEvent.HTTP_ERROR);
            // The body previously said "Error 500" while the status sent is 400 Bad Request;
            // the text now matches the status code on the wire.
            return response.writeStringAndFlush("Error 400: Bad Request\n" + e.getMessage() + '\n');
        }
    }).build().startAndWait();
}
/**
 * Registers the Servo metric listeners, creates the document client from {@code cfg}, and
 * sets up the listening executor: a fixed-size pool of {@code cfg.getConcurrency()}
 * threads with a fair bounded queue of the same capacity, where rejected tasks run on
 * the submitting thread.
 *
 * @throws Exception declared by the overridden method
 */
@Override
protected void onInit() throws Exception {
    RxNetty.useMetricListenersFactory(new ServoEventsListenerFactory());

    documentClient = new DocumentClient(
        cfg.getServiceEndpoint(),
        cfg.getMasterKey(),
        cfg.getConnectionPolicy(),
        cfg.getConsistencyLevel());

    final int concurrency = cfg.getConcurrency();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        concurrency,                                       // core pool size
        concurrency,                                       // maximum pool size
        10, TimeUnit.MINUTES,                              // keep-alive
        new ArrayBlockingQueue<Runnable>(concurrency, true),
        new ThreadPoolExecutor.CallerRunsPolicy());
    this.executor = MoreExecutors.listeningDecorator(pool);
}
/**
 * Creates an HTTP client builder for the service endpoint, configured from
 * {@code connectionPolicy}: maximum pool size, idle connection timeout and request
 * (read) timeout.
 *
 * @return the configured builder
 */
private HttpClientBuilder<ByteBuf, ByteBuf> httpClientBuilder() {
    // Convert seconds to milliseconds in long arithmetic; the former `timeout * 1000`
    // was evaluated as an int product and could overflow before being widened.
    final long idleTimeoutMillis =
        TimeUnit.SECONDS.toMillis(this.connectionPolicy.getIdleConnectionTimeout());

    // NOTE(review): DefaultFactories.trustAll() presumably accepts any server
    // certificate; confirm that this is intended outside of test environments.
    HttpClientBuilder<ByteBuf, ByteBuf> builder = RxNetty
        .<ByteBuf, ByteBuf>newHttpClientBuilder(this.serviceEndpoint.getHost(), this.serviceEndpoint.getPort())
        .withSslEngineFactory(DefaultFactories.trustAll())
        .withMaxConnections(connectionPolicy.getMaxPoolSize())
        .withIdleConnectionsTimeoutMillis(idleTimeoutMillis);

    ClientConfig config = new ClientConfig.Builder()
        .readTimeout(connectionPolicy.getRequestTimeout(), TimeUnit.SECONDS)
        .build();
    return builder.config(config);
}
/**
 * Creates the Guice injector from {@code modules}, obtains the request adapter from it and
 * hands the injector to the adapter, then starts an HTTP server on {@code port} that
 * delegates every request to the adapter. This call blocks in {@code startAndWait()}.
 */
public void start() {
    this.injector = Guice.createInjector(modules);
    this.adapter = injector.getInstance(RequrestAdapter.class);
    this.adapter.setInjector(injector);

    server = RxNetty.createHttpServer(port, (request, response) -> {
        return adapter.handle(request, response);
    });
    server.startAndWait();
}
/**
 * Issues a GET for {@code /hello} on localhost through an SSE-configured client and
 * exposes each received event as its string content. The response header is printed
 * when the response arrives.
 *
 * @return the stream of event contents
 */
private Observable<String> initializeStream() {
    HttpClient<ByteBuf, ServerSentEvent> sseClient =
        RxNetty.createHttpClient("localhost", port, PipelineConfigurators.<ByteBuf>clientSseConfigurator());

    Observable<ServerSentEvent> events = sseClient
        .submit(HttpClientRequest.createGet("/hello"))
        .flatMap(response -> {
            printResponseHeader(response);
            return response.getContent();
        });

    return events.map(event -> event.contentAsString());
}
/**
 * Creates an HTTP server on {@code port} that answers each request with the string
 * produced by {@code getResponseContent} for the request's query parameters.
 * The server is returned without {@code start()} being called here.
 *
 * @return the created server
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    return RxNetty.createHttpServer(port, (request, response) ->
        response.writeStringAndFlush(
            getResponseContent(new HttpRequest(request.getQueryParameters()))));
}
/**
 * Sends a GET to the local server with {@code parameter} as the value of the
 * {@code paramName} query parameter and emits the response content decoded with the
 * platform default charset.
 *
 * @param parameter value appended to the query string as-is
 * @return the decoded response content
 */
@Override
public Observable<String> request(String parameter) {
    final String url = "http://localhost:" + port + "?" + paramName + "=" + parameter;
    return RxNetty.createHttpGet(url)
        .flatMap(response ->
            response.getContent()
                .<String>map(buffer -> buffer.toString(Charset.defaultCharset())));
}
/**
 * Creates a Server-Sent Events HTTP server on {@code port}. Each response gets the CORS,
 * cache-control, keep-alive and {@code text/event-stream} headers before the request is
 * handed to {@code getIntervalObservable}. The server is returned without {@code start()}
 * being called here.
 *
 * @return the created server
 */
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
    HttpServer<ByteBuf, ServerSentEvent> sseServer = RxNetty.createHttpServer(
        port,
        (req, resp) -> {
            // Headers for an event stream that any origin may read.
            resp.getHeaders().set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
            resp.getHeaders().set(CACHE_CONTROL, "no-cache");
            resp.getHeaders().set(CONNECTION, "keep-alive");
            resp.getHeaders().set(CONTENT_TYPE, "text/event-stream");
            return getIntervalObservable(req, resp);
        },
        PipelineConfigurators.<ByteBuf>serveSseConfigurator());

    System.out.println("HTTP Server Sent Events server started...");
    return sseServer;
}