/**
 * Builds the HTTP server used by this fixture.
 *
 * POST requests whose path contains {@code /v2/apps} are delegated to
 * {@code handleTest}; every other request is answered with 404 Not Found and
 * the response is closed. The server is built here but not started.
 *
 * @return the configured HTTP server
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    RequestHandler<ByteBuf, ByteBuf> router = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                       final HttpServerResponse<ByteBuf> response) {
            // The HTTP method is only inspected once the path has matched.
            boolean appsPath = request.getPath().contains("/v2/apps");
            if (appsPath && request.getHttpMethod().equals(HttpMethod.POST)) {
                return handleTest(request, response);
            }
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    };

    HttpServer<ByteBuf, ByteBuf> httpServer = RxNetty.newHttpServerBuilder(port, router)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .enableWireLogging(LogLevel.ERROR)
            .build();

    System.out.println("RxTetstServer server started...");
    return httpServer;
}
/**
 * Creates the TCP echo server when the activity is created.
 *
 * For every new connection a status line is posted to the list adapter via
 * {@code mainHandler}, a welcome message is written, and each non-blank
 * incoming message is echoed back prefixed with {@code "echo => "}. The
 * server is only created here; no start call is made in this method.
 */
@Override
public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);

    server = RxNetty.createTcpServer(PORT, PipelineConfigurators.textOnlyConfigurator(), connection -> {
        mainHandler.post(() -> adapter.add("New client connection established."));
        connection.writeAndFlush("Welcome! \n\n");

        return connection.getInput().flatMap(line -> {
            // Log the raw message before trimming, then echo only non-empty content.
            Log.d(TAG, "Server onNext: " + line);
            final String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                return connection.writeAndFlush("echo => " + trimmed + '\n');
            } else {
                return Observable.empty();
            }
        });
    });
}
/**
 * Builds an HTTP server from the given REST server configuration.
 *
 * The server bootstrap is created for the configured interface and port, the
 * request handler is created from the configured services and converters,
 * and wire logging is enabled at DEBUG level. Startup details and the
 * registered services are logged before the server is built.
 *
 * @param configuration source of interface, port, services, converters and thread count
 * @return the built HTTP server
 */
private HttpServer<ByteBuf, ByteBuf> createServer(RestHttpServerConfiguration configuration) {
    ServerBootstrap bootstrap =
            createServerBootstrap(configuration.getInterface(), configuration.getPort());

    HttpServerBuilder<ByteBuf, ByteBuf> builder =
            new HttpServerBuilder<>(
                    bootstrap,
                    configuration.getPort(),
                    createServiceRequestHandler(configuration.getRestServices(), configuration.getConverters()))
                    .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
                    .withRequestProcessingThreads(configuration.getRequestProcessingThreads())
                    .enableWireLogging(LogLevel.DEBUG);

    String startupTemplate = "starting http server on port %s at interface %s, with %s processing threads ...";
    String startupMessage = String.format(
            startupTemplate,
            configuration.getPort(),
            configuration.getInterface(),
            configuration.getRequestProcessingThreads());
    log.info(startupMessage);

    logServices(configuration.getRestServices());
    return builder.build();
}
private Observable<HttpClientResponse<ByteBuf>> getResponse(String externalHealthCheckURL) { String host = "localhost"; int port = DEFAULT_APPLICATION_PORT; String path = "/healthcheck"; try { URL url = new URL(externalHealthCheckURL); host = url.getHost(); port = url.getPort(); path = url.getPath(); } catch (MalformedURLException e) { //continue } Integer timeout = DynamicProperty.getInstance("prana.host.healthcheck.timeout").getInteger(DEFAULT_CONNECTION_TIMEOUT); HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port) .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator()) .channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout) .build(); return httpClient.submit(HttpClientRequest.createGet(path)); }
/**
 * Builds the HTTP server and its path-based request routing.
 *
 * Routing is by substring match on the request path, checked in this order:
 * {@code /users} (GET goes to the recommendations-by-user handler, any other
 * method to the update handler), {@code /recommendations}, {@code /movies}.
 * Anything else gets 404 Not Found. The server is built here but not started.
 *
 * @return the configured HTTP server
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    RequestHandler<ByteBuf, ByteBuf> router = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                       final HttpServerResponse<ByteBuf> response) {
            final String path = request.getPath();

            if (path.contains("/users")) {
                if (request.getHttpMethod().equals(HttpMethod.GET)) {
                    return handleRecommendationsByUserId(request, response);
                }
                return handleUpdateRecommendationsForUser(request, response);
            }
            if (path.contains("/recommendations")) {
                return handleRecommendationsBy(request, response);
            }
            if (path.contains("/movies")) {
                return handleRegisterMovie(request, response);
            }

            // No route matched.
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    };

    HttpServer<ByteBuf, ByteBuf> httpServer = RxNetty.newHttpServerBuilder(port, router)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .enableWireLogging(LogLevel.ERROR)
            .build();

    System.out.println("RxMovie server started...");
    return httpServer;
}
/**
 * Connects to a TCP server on localhost:8181 and prints every received
 * message, trimmed, to standard output. Blocks until the stream terminates.
 */
public static void main(String[] args) {
    // Stateless, so one instance can serve every connection.
    final Func1<String, String> trimMessage = new Func1<String, String>() {
        @Override
        public String call(String msg) {
            return msg.trim();
        }
    };

    final Func1<ObservableConnection<String, String>, Observable<?>> readTrimmedInput =
            new Func1<ObservableConnection<String, String>, Observable<?>>() {
                @Override
                public Observable<?> call(ObservableConnection<String, String> connection) {
                    return connection.getInput().map(trimMessage);
                }
            };

    final Action1<Object> printEvent = new Action1<Object>() {
        @Override
        public void call(Object o) {
            System.out.println("onNext event => " + o);
        }
    };

    Observable<ObservableConnection<String, String>> connectionObservable =
            RxNetty.createTcpClient("localhost", 8181, PipelineConfigurators.stringMessageConfigurator()).connect();

    connectionObservable.flatMap(readTrimmedInput)
            .toBlockingObservable()
            .forEach(printEvent);
}
/**
 * Starts a TCP echo server on port 8181 and waits on it.
 *
 * Each new connection is greeted with a welcome message; every non-blank
 * incoming message is echoed back prefixed with {@code "echo => "}.
 */
public static void main(final String[] args) {
    final int port = 8181;

    final ConnectionHandler<String, String> echoHandler = new ConnectionHandler<String, String>() {
        @Override
        public Observable<Void> handle(final ObservableConnection<String, String> connection) {
            System.out.println("New client connection established.");
            connection.writeAndFlush("Welcome! \n\n");

            return connection.getInput().flatMap(new Func1<String, Observable<Void>>() {
                @Override
                public Observable<Void> call(String msg) {
                    // The raw message is logged; the trimmed one is echoed.
                    System.out.println("onNext: " + msg);
                    final String trimmed = msg.trim();
                    if (!trimmed.isEmpty()) {
                        return connection.writeAndFlush("echo => " + trimmed + '\n');
                    }
                    return COMPLETED_OBSERVABLE;
                }
            });
        }
    };

    RxNetty.createTcpServer(port, PipelineConfigurators.textOnlyConfigurator(), echoHandler)
            .startAndWait();
}
/**
 * Connects to a TCP server on localhost:8181 and prints every received
 * message, trimmed, to standard output, sleeping one second per message
 * before handing it on. Blocks until the stream terminates.
 */
public static void main(String[] args) {
    Observable<ObservableConnection<String, String>> connectionObservable =
            RxNetty.createTcpClient("localhost", 8181, PipelineConfigurators.stringMessageConfigurator()).connect();

    connectionObservable.flatMap(new Func1<ObservableConnection<String, String>, Observable<?>>() {
        @Override
        public Observable<?> call(ObservableConnection<String, String> connection) {
            return connection.getInput().map(new Func1<String, String>() {
                @Override
                public String call(String msg) {
                    try {
                        // Artificial per-message delay.
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Restore the interrupt flag so the owning thread can still
                        // observe the interruption instead of it being lost here.
                        Thread.currentThread().interrupt();
                    }
                    return msg.trim();
                }
            });
        }
    }).toBlockingObservable().forEach(new Action1<Object>() {
        @Override
        public void call(Object o) {
            System.out.println("onNext event => " + o);
        }
    });
}
/**
 * Requests {@code test/nochunk_stream} over an SSE client, collects the event
 * data of every received event, and expects it to equal
 * {@code RequestProcessor.smallStreamContent}.
 */
@Test
public void testNonChunkingStream() throws Exception {
    HttpClient<ByteBuf, ServerSentEvent> client =
            RxNetty.createHttpClient("localhost", port, PipelineConfigurators.<ByteBuf>sseClientConfigurator());
    Observable<HttpClientResponse<ServerSentEvent>> response =
            client.submit(HttpClientRequest.createGet("test/nochunk_stream"));

    final List<String> result = new ArrayList<String>();

    final Func1<HttpClientResponse<ServerSentEvent>, Observable<ServerSentEvent>> toContent =
            new Func1<HttpClientResponse<ServerSentEvent>, Observable<ServerSentEvent>>() {
                @Override
                public Observable<ServerSentEvent> call(HttpClientResponse<ServerSentEvent> httpResponse) {
                    return httpResponse.getContent();
                }
            };
    final Action1<ServerSentEvent> collectEventData = new Action1<ServerSentEvent>() {
        @Override
        public void call(ServerSentEvent event) {
            result.add(event.getEventData());
        }
    };

    // Blocks until the content stream terminates.
    response.flatMap(toContent)
            .toBlockingObservable()
            .forEach(collectEventData);

    assertEquals(RequestProcessor.smallStreamContent, result);
}
/**
 * Issues a GET to {@code /hello} on localhost over an SSE client and exposes
 * the content of each server-sent event as a string. The response is passed
 * to {@code printResponseHeader} before its content is streamed.
 *
 * @return observable of event contents as strings
 */
private Observable<String> initializeStream() {
    HttpClient<ByteBuf, ServerSentEvent> sseClient = RxNetty.createHttpClient(
            "localhost", port, PipelineConfigurators.<ByteBuf>clientSseConfigurator());

    return sseClient.submit(HttpClientRequest.createGet("/hello"))
            .flatMap(response -> {
                printResponseHeader(response);
                return response.getContent();
            })
            .map(ServerSentEvent::contentAsString);
}
/**
 * Builds an HTTP server that answers every request with a server-sent-event
 * stream produced by {@code getIntervalObservable}.
 *
 * Before streaming, the response headers for CORS (allow any origin), cache
 * control, connection keep-alive and the event-stream content type are set.
 * The server is created here but not started.
 *
 * @return the configured HTTP server
 */
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
    HttpServer<ByteBuf, ServerSentEvent> sseServer = RxNetty.createHttpServer(
            port,
            (request, response) -> {
                // Headers must be in place before the stream is returned.
                response.getHeaders().set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
                response.getHeaders().set(CACHE_CONTROL, "no-cache");
                response.getHeaders().set(CONNECTION, "keep-alive");
                response.getHeaders().set(CONTENT_TYPE, "text/event-stream");
                return getIntervalObservable(request, response);
            },
            PipelineConfigurators.<ByteBuf>serveSseConfigurator());

    System.out.println("HTTP Server Sent Events server started...");
    return sseServer;
}
/**
 * Creates a new hystrix reader backed by an SSE HTTP client that targets the
 * Turbine host and port from the configuration.
 *
 * @param configuration The configuration to use.
 * @param cluster       The cluster to read from.
 */
public HystrixReader(Configuration configuration, String cluster) {
    this.configuration = configuration;
    this.cluster = cluster;

    HttpClientBuilder<ByteBuf, ServerSentEvent> clientBuilder =
            RxNetty.newHttpClientBuilder(configuration.getTurbineHost(), configuration.getTurbinePort());
    clientBuilder.pipelineConfigurator(PipelineConfigurators.clientSseConfigurator());

    if (configuration.isSecure()) {
        // NOTE(review): trustAll presumably accepts any server certificate — confirm this is intended.
        clientBuilder.withSslEngineFactory(DefaultFactories.trustAll());
    }

    rxNetty = clientBuilder.build();
}
/**
 * POSTs the given message as JSON to {@code /v2/apps} on the configured
 * network address and port.
 *
 * A new HTTP client is built for every call.
 *
 * @param message request body; sent with {@code Content-Type: application/json}
 * @return observable of the HTTP response
 */
public Observable<HttpClientResponse<ByteBuf>> postMessage(String message) {
    HttpClient<ByteBuf, ByteBuf> httpClient =
            RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(networkAddress.getIpAddress(), port)
                    .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
                    .enableWireLogging(LogLevel.ERROR)
                    .build();

    HttpClientRequest<ByteBuf> post = HttpClientRequest.createPost("/v2/apps");
    post.withRawContentSource(Observable.just(message), StringTransformer.DEFAULT_INSTANCE);
    post.withHeader("Content-Type", "application/json");

    return httpClient.submit(post);
}
/**
 * Performs a blocking GET against {@code serviceUrl} and returns the response.
 *
 * @param serviceUrl URL to request; host, port and path are taken from it
 * @return the HTTP response
 * @throws MalformedURLException if {@code serviceUrl} cannot be parsed
 * @throws InterruptedException  if the waiting thread is interrupted
 * @throws ExecutionException    if the request fails
 * @throws TimeoutException      if no response arrives within {@code checkTimeout} milliseconds
 */
private HttpClientResponse<ByteBuf> getResponse(String serviceUrl)
        throws MalformedURLException, InterruptedException, ExecutionException, TimeoutException {
    URL url = new URL(serviceUrl);
    String host = url.getHost();
    // URL.getPort() returns -1 when the URL has no explicit port; use the
    // protocol's default port instead of handing -1 to the client builder.
    int port = url.getPort() < 0 ? url.getDefaultPort() : url.getPort();
    String path = url.getPath();
    System.out.println(url);

    HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
            .build();

    return httpClient.submit(HttpClientRequest.createGet(path))
            .toBlocking()
            .toFuture()
            .get(checkTimeout, TimeUnit.MILLISECONDS);
}
@Override public Observable<String> observeJson() { if(jsonObservable != null) { return jsonObservable; } HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(url.getPath() + "?" + url.getQuery()); int port = url.getPort() < 0 ? url.getDefaultPort() : url.getPort(); HttpClient<ByteBuf, ServerSentEvent> client = RxNetty.<ByteBuf, ServerSentEvent>newHttpClientBuilder(url.getHost(), port) .withNoConnectionPooling() .pipelineConfigurator(PipelineConfigurators.<ByteBuf>clientSseConfigurator()) .build(); jsonObservable = client.submit(request) .doOnError(t -> LOG.error("Error connecting to " + url, t)) .flatMap(response -> { if (response.getStatus().code() != 200) { return Observable.error(new RuntimeException("Failed to connect: " + response.getStatus())); } return response.getContent() .doOnSubscribe(() -> LOG.info("Turbine => Aggregate Stream from URL: " + url)) .doOnUnsubscribe(() -> LOG.info("Turbine => Unsubscribing Stream: " + url)) .map(ServerSentEvent::contentAsString); } ) .timeout(120, TimeUnit.SECONDS) .retryWhen(attempts -> attempts.zipWith(Observable.range(1, Integer.MAX_VALUE), (k, i) -> i) .flatMap(n -> { int waitTimeSeconds = Math.min(6, n) * 10; // wait in 10 second increments up to a max of 1 minute LOG.info("Turbine => Retrying connection to: " + this.url + " in {} seconds", waitTimeSeconds); return Observable.timer(waitTimeSeconds, TimeUnit.SECONDS); }) ) .repeat() .share(); return jsonObservable; }
@OnClick(R.id.client_button) void startClient() { Observable<ObservableConnection<String, String>> connectionObservable = RxNetty.createTcpClient("localhost", PORT, PipelineConfigurators.textOnlyConfigurator()).connect(); connectionObservable.flatMap(connection -> { Observable<String> helloMessage = connection.getInput() .take(1).map(String::trim); // output 10 values at intervals and receive the echo back Observable<String> intervalOutput = Observable.interval(500, TimeUnit.MILLISECONDS) .flatMap(aLong -> connection.writeAndFlush(String.valueOf(aLong + 1)) .map(aVoid -> "")); // capture the output from the server Observable<String> echo = connection.getInput().map(String::trim); // wait for the helloMessage then start the output and receive echo input return Observable.concat(helloMessage, Observable.merge(intervalOutput, echo)); }) .take(10) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Object>() { @Override public void onCompleted() { Log.d(TAG, "Client Complete!"); } @Override public void onError(Throwable throwable) { Log.e(TAG, "onError: " + throwable.getMessage()); } @Override public void onNext(Object o) { final String message = o.toString(); Log.d(TAG, "Client onNext: " + message); adapter.add(message); } }); }
public HttpServer<ByteBuf, ServerSentEvent> createServer(int port) { System.out.println("Start " + getClass().getSimpleName() + " on port: " + port); // declare handler chain (wrapped in Hystrix) // TODO create a better way of chaining these (related https://github.com/ReactiveX/RxNetty/issues/232 and https://github.com/ReactiveX/RxNetty/issues/202) HystrixMetricsStreamHandler<ByteBuf, ServerSentEvent> handlerChain = new HystrixMetricsStreamHandler<>(metrics, "/hystrix.stream", 1000, (request, response) -> { try { long startTime = System.currentTimeMillis(); return handleRequest(request, response) .doOnCompleted(() -> System.out.println("Response => " + request.getPath() + " Time => " + (int) (System.currentTimeMillis() - startTime) + "ms")) .doOnCompleted(() -> metrics.getRollingPercentile().addValue((int) (System.currentTimeMillis() - startTime))) .doOnCompleted(() -> metrics.getRollingNumber().add(Metrics.EventType.SUCCESS, 1)) .doOnError(t -> metrics.getRollingNumber().add(Metrics.EventType.FAILURE, 1)); } catch (Throwable e) { e.printStackTrace(); System.err.println("Server => Error [" + request.getPath() + "] => " + e); response.setStatus(HttpResponseStatus.BAD_REQUEST); return response.writeStringAndFlush("data: Error 500: Bad Request\n" + e.getMessage() + "\n"); } }); return RxNetty.createHttpServer(port, (request, response) -> { // System.out.println("Server => Request: " + request.getPath()); return handlerChain.handle(request, response); }, PipelineConfigurators.<ByteBuf> serveSseConfigurator()); }
/**
 * Runs the parent set-up, then starts an external HTTP server on port 0 and
 * records the port the server reports.
 */
@Before
public void setUp() {
    super.setUp();

    this.externalServer = RxNetty
            .newHttpServerBuilder(0, new ExternalServerHandler())
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .build();
    this.externalServer.start();

    this.externalServerPort = this.externalServer.getServerPort();
}
/**
 * Starts an HTTP server on port 0 with the handler under test and builds a
 * client for localhost on the port the server reports, using a 2000 ms
 * connect timeout.
 */
@Before
public void setUp() {
    this.server = RxNetty
            .newHttpServerBuilder(0, getHandler())
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .build();
    this.server.start();

    this.client = RxNetty
            .<ByteBuf, ByteBuf>newHttpClientBuilder("localhost", this.server.getServerPort())
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
            .channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000)
            .build();
}
/**
 * Starts a TCP server on port 8181 that hands every new connection to
 * {@code startEventStream}, and waits on it.
 */
public static void main(String[] args) {
    ConnectionHandler<String, String> streamHandler = new ConnectionHandler<String, String>() {
        @Override
        public Observable<Void> handle(ObservableConnection<String, String> newConnection) {
            return startEventStream(newConnection);
        }
    };

    RxNetty.createTcpServer(8181, PipelineConfigurators.textOnlyConfigurator(), streamHandler)
            .startAndWait();
}
/**
 * Starts an SSE HTTP server on port 8080 that answers every request with the
 * stream from {@code getIntervalObservable}, and waits on it.
 */
public static void main(String[] args) {
    final int port = 8080;

    RequestHandler<ByteBuf, ServerSentEvent> intervalHandler = new RequestHandler<ByteBuf, ServerSentEvent>() {
        @Override
        public Observable<Void> handle(HttpRequest<ByteBuf> request, HttpResponse<ServerSentEvent> response) {
            // The request itself is not inspected.
            return getIntervalObservable(response);
        }
    };

    RxNetty.createHttpServer(port, intervalHandler, PipelineConfigurators.<ByteBuf>sseServerConfigurator())
            .startAndWait();
}
/**
 * Requests {@code test/stream} over an SSE client, collects the response
 * content via {@code readResponseContent}, and expects it to equal
 * {@code RequestProcessor.smallStreamContent}.
 */
@Test
public void testChunkedStreaming() throws Exception {
    HttpClient<ByteBuf, ServerSentEvent> sseClient =
            RxNetty.createHttpClient("localhost", port, PipelineConfigurators.<ByteBuf>sseClientConfigurator());
    Observable<HttpClientResponse<ServerSentEvent>> streamResponse =
            sseClient.submit(HttpClientRequest.createGet("test/stream"));

    final List<String> received = new ArrayList<String>();
    readResponseContent(streamResponse, received);

    assertEquals(RequestProcessor.smallStreamContent, received);
}
/**
 * Sends a DELETE to {@code test/largeStream} over an SSE client, collects the
 * response content via {@code readResponseContent}, and expects it to equal
 * {@code RequestProcessor.largeStreamContent}.
 */
@Test
public void testMultipleChunks() throws Exception {
    HttpClient<ByteBuf, ServerSentEvent> sseClient =
            RxNetty.createHttpClient("localhost", port, PipelineConfigurators.<ByteBuf>sseClientConfigurator());
    Observable<HttpClientResponse<ServerSentEvent>> largeStreamResponse =
            sseClient.submit(HttpClientRequest.createDelete("test/largeStream"));

    final List<String> received = new ArrayList<String>();
    readResponseContent(largeStreamResponse, received);

    assertEquals(RequestProcessor.largeStreamContent, received);
}
/**
 * Requests {@code test/largeStream} over an SSE client, maps each event of an
 * OK response to its event data (any other status becomes an error), and
 * expects the collected data to equal {@code RequestProcessor.largeStreamContent}.
 */
@Test
public void testMultipleChunksWithTransformation() throws Exception {
    HttpClient<ByteBuf, ServerSentEvent> client =
            RxNetty.createHttpClient("localhost", port, PipelineConfigurators.<ByteBuf>sseClientConfigurator());
    Observable<HttpClientResponse<ServerSentEvent>> response =
            client.submit(HttpClientRequest.createGet("test/largeStream"));

    final Func1<ServerSentEvent, String> toEventData = new Func1<ServerSentEvent, String>() {
        @Override
        public String call(ServerSentEvent sseEvent) {
            return sseEvent.getEventData();
        }
    };

    Observable<String> transformed = response.flatMap(
            new Func1<HttpClientResponse<ServerSentEvent>, Observable<String>>() {
                @Override
                public Observable<String> call(HttpClientResponse<ServerSentEvent> httpResponse) {
                    // Anything other than 200 OK fails the stream.
                    if (!httpResponse.getStatus().equals(HttpResponseStatus.OK)) {
                        return Observable.error(new RuntimeException("Unexpected response"));
                    }
                    return httpResponse.getContent().map(toEventData);
                }
            });

    final List<String> result = new ArrayList<String>();
    final Action1<String> collect = new Action1<String>() {
        @Override
        public void call(String t1) {
            result.add(t1);
        }
    };
    // Blocks until the transformed stream terminates.
    transformed.toBlockingObservable().forEach(collect);

    assertEquals(RequestProcessor.largeStreamContent, result);
}
/**
 * Executes a single HTTP request against the given server using a client
 * built for this call only.
 *
 * The client uses the configured read timeout, user agent and connect
 * timeout, and an SSL engine factory when the server is marked secure. The
 * response status is logged at debug level, errors are logged at info level,
 * and the client is shut down when the response observable terminates.
 *
 * @param server Server to send the request to.
 * @param req Request to execute.
 * @return Observable with the response of the request.
 */
private static Observable<HttpClientResponse<ByteBuf>> executeSingle(Server server, HttpClientRequest<ByteBuf> req) {
    final HttpClient.HttpClientConfig clientConfig = new HttpClient.HttpClientConfig.Builder()
            .readTimeout(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
            .userAgent(USER_AGENT)
            .build();

    final HttpClientBuilder<ByteBuf, ByteBuf> clientBuilder =
            RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(server.host(), server.port())
                    .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
                    .config(clientConfig)
                    .channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS);
    if (server.isSecure()) {
        // NOTE(review): trustAll presumably accepts any server certificate — confirm this is intended.
        clientBuilder.withSslEngineFactory(DefaultFactories.trustAll());
    }
    final HttpClient<ByteBuf, ByteBuf> client = clientBuilder.build();

    final Action1<HttpClientResponse<ByteBuf>> logStatus = new Action1<HttpClientResponse<ByteBuf>>() {
        @Override
        public void call(HttpClientResponse<ByteBuf> res) {
            LOGGER.debug("Got response: {}", res.getStatus().code());
        }
    };
    final Action1<Throwable> logFailure = new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            LOGGER.info("Error sending metrics: {}/{}",
                    throwable.getClass().getSimpleName(), throwable.getMessage());
        }
    };
    // The client exists only for this request, so release it once the response terminates.
    final Action0 shutdownClient = new Action0() {
        @Override
        public void call() {
            client.shutdown();
        }
    };

    return client.submit(req)
            .doOnNext(logStatus)
            .doOnError(logFailure)
            .doOnTerminate(shutdownClient);
}
/**
 * Creates a builder for an HTTP client targeting the given host and port,
 * initialised with the default client configuration and the standard HTTP
 * client pipeline configurator.
 *
 * @param host host the client connects to
 * @param port port the client connects to
 */
public HttpClientBuilder(String host, int port) {
    super(host, port);
    this.clientConfig = HttpClient.HttpClientConfig.DEFAULT_CONFIG;
    this.pipelineConfigurator(PipelineConfigurators.<I, O>httpClientConfigurator());
}
/**
 * Creates a builder for an HTTP client targeting the given host and port on
 * top of a caller-supplied bootstrap, initialised with the standard HTTP
 * client pipeline configurator.
 *
 * @param bootstrap bootstrap passed through to the parent builder
 * @param host      host the client connects to
 * @param port      port the client connects to
 */
public HttpClientBuilder(Bootstrap bootstrap, String host, int port) {
    super(bootstrap, host, port);
    // NOTE(review): unlike the (host, port) constructor, this one does not assign
    // clientConfig — confirm the parent constructor or build step supplies a default.
    this.pipelineConfigurator(PipelineConfigurators.<I, O>httpClientConfigurator());
}
/**
 * Creates a builder for an HTTP server on the given port. The request handler
 * is wrapped in an {@code HttpConnectionHandler} and the standard HTTP server
 * pipeline configurator is installed.
 *
 * @param port           port the server listens on
 * @param requestHandler handler invoked for incoming HTTP requests
 */
public HttpServerBuilder(int port, RequestHandler<I, O> requestHandler) {
    super(port, new HttpConnectionHandler<I, O>(requestHandler));
    this.pipelineConfigurator(PipelineConfigurators.<I, O>httpServerConfigurator());
}
/**
 * Creates a builder for an HTTP server on the given port on top of a
 * caller-supplied bootstrap. The request handler is wrapped in an
 * {@code HttpConnectionHandler} and the standard HTTP server pipeline
 * configurator is installed.
 *
 * @param bootstrap      bootstrap passed through to the parent builder
 * @param port           port the server listens on
 * @param requestHandler handler invoked for incoming HTTP requests
 */
public HttpServerBuilder(ServerBootstrap bootstrap, int port, RequestHandler<I, O> requestHandler) {
    super(port, new HttpConnectionHandler<I, O>(requestHandler), bootstrap);
    this.pipelineConfigurator(PipelineConfigurators.<I, O>httpServerConfigurator());
}
/**
 * Creates the TCP server that listens for shutdown commands. Each connection
 * is served by a {@code ShutdownConnectionHandler} wrapping the given command
 * handler. The server is created here but not started.
 *
 * @param shutdownPort   port on which commands are accepted
 * @param commandHandler function handed to the connection handler for received commands
 */
public ShutdownListener(int shutdownPort, final Func1<String, Observable<Void>> commandHandler) {
    ShutdownConnectionHandler connectionHandler = new ShutdownConnectionHandler(commandHandler);
    this.shutdownCmdServer = RxNetty.createTcpServer(
            shutdownPort,
            PipelineConfigurators.stringMessageConfigurator(),
            connectionHandler);
}