Java 类io.reactivex.netty.pipeline.PipelineConfigurators 实例源码
项目:triathlon
文件:RxTestServer.java
public HttpServer<ByteBuf, ByteBuf> createServer() {
HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, new RequestHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
if (request.getPath().contains("/v2/apps")) {
if (request.getHttpMethod().equals(HttpMethod.POST)) {
return handleTest(request, response);
}
}
response.setStatus(HttpResponseStatus.NOT_FOUND);
return response.close();
}
}).pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator()).enableWireLogging(LogLevel.ERROR).build();
System.out.println("RxTetstServer server started...");
return server;
}
项目:RxNetty-Android
文件:ServerFragment.java
@Override public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
server = RxNetty.createTcpServer(PORT, PipelineConfigurators.textOnlyConfigurator(),
connection -> {
mainHandler.post(() -> adapter.add("New client connection established."));
connection.writeAndFlush("Welcome! \n\n");
return connection.getInput().flatMap(msg -> {
Log.d(TAG, "Server onNext: " + msg);
msg = msg.trim();
if (!msg.isEmpty()) {
return connection.writeAndFlush("echo => " + msg + '\n');
} else {
return Observable.empty();
}
});
});
}
项目:nibbler
文件:NettyHttpServer.java
private HttpServer<ByteBuf, ByteBuf> createServer(RestHttpServerConfiguration configuration) {
ServerBootstrap serverBootstrap = createServerBootstrap(configuration.getInterface(), configuration.getPort());
HttpServerBuilder<ByteBuf, ByteBuf> httpServerBuilder =
new HttpServerBuilder<>(serverBootstrap, configuration.getPort(),
createServiceRequestHandler(configuration.getRestServices(), configuration.getConverters()))
.pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
.withRequestProcessingThreads(configuration.getRequestProcessingThreads())
.enableWireLogging(LogLevel.DEBUG);
String message = "starting http server on port %s at interface %s, with %s processing threads ...";
log.info(String.format(message, configuration.getPort(), configuration.getInterface(), configuration.getRequestProcessingThreads()));
logServices(configuration.getRestServices());
return httpServerBuilder.build();
}
项目:Prana
文件:HealthCheckHandler.java
private Observable<HttpClientResponse<ByteBuf>> getResponse(String externalHealthCheckURL) {
String host = "localhost";
int port = DEFAULT_APPLICATION_PORT;
String path = "/healthcheck";
try {
URL url = new URL(externalHealthCheckURL);
host = url.getHost();
port = url.getPort();
path = url.getPath();
} catch (MalformedURLException e) {
//continue
}
Integer timeout = DynamicProperty.getInstance("prana.host.healthcheck.timeout").getInteger(DEFAULT_CONNECTION_TIMEOUT);
HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port)
.pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
.channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)
.build();
return httpClient.submit(HttpClientRequest.createGet(path));
}
项目:ribbon
文件:RxMovieServer.java
public HttpServer<ByteBuf, ByteBuf> createServer() {
HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, new RequestHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
if (request.getPath().contains("/users")) {
if (request.getHttpMethod().equals(HttpMethod.GET)) {
return handleRecommendationsByUserId(request, response);
} else {
return handleUpdateRecommendationsForUser(request, response);
}
}
if (request.getPath().contains("/recommendations")) {
return handleRecommendationsBy(request, response);
}
if (request.getPath().contains("/movies")) {
return handleRegisterMovie(request, response);
}
response.setStatus(HttpResponseStatus.NOT_FOUND);
return response.close();
}
}).pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator()).enableWireLogging(LogLevel.ERROR).build();
System.out.println("RxMovie server started...");
return server;
}
项目:RxNetty
文件:TcpEventStreamClientFast.java
public static void main(String[] args) {
Observable<ObservableConnection<String, String>> connectionObservable =
RxNetty.createTcpClient("localhost", 8181, PipelineConfigurators.stringMessageConfigurator()).connect();
connectionObservable.flatMap(new Func1<ObservableConnection<String, String>, Observable<?>>() {
@Override
public Observable<?> call(ObservableConnection<String, String> connection) {
return connection.getInput().map(new Func1<String, String>() {
@Override
public String call(String msg) {
return msg.trim();
}
});
}
}).toBlockingObservable().forEach(new Action1<Object>() {
@Override
public void call(Object o) {
System.out.println("onNext event => " + o);
}
});
}
项目:RxNetty
文件:TcpEchoServer.java
public static void main(final String[] args) {
final int port = 8181;
RxNetty.createTcpServer(port, PipelineConfigurators.textOnlyConfigurator(),
new ConnectionHandler<String, String>() {
@Override
public Observable<Void> handle(
final ObservableConnection<String, String> connection) {
System.out.println("New client connection established.");
connection.writeAndFlush("Welcome! \n\n");
return connection.getInput().flatMap(new Func1<String, Observable<Void>>() {
@Override
public Observable<Void> call(String msg) {
System.out.println("onNext: " + msg);
msg = msg.trim();
if (!msg.isEmpty()) {
return connection.writeAndFlush("echo => " + msg + '\n');
} else {
return COMPLETED_OBSERVABLE;
}
}
});
}
}).startAndWait();
}
项目:RxNetty
文件:TcpEventStreamClientSlow.java
public static void main(String[] args) {
Observable<ObservableConnection<String, String>> connectionObservable =
RxNetty.createTcpClient("localhost", 8181, PipelineConfigurators.stringMessageConfigurator()).connect();
connectionObservable.flatMap(new Func1<ObservableConnection<String, String>, Observable<?>>() {
@Override
public Observable<?> call(ObservableConnection<String, String> connection) {
return connection.getInput().map(new Func1<String, String>() {
@Override
public String call(String msg) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return msg.trim();
}
});
}
}).toBlockingObservable().forEach(new Action1<Object>() {
@Override
public void call(Object o) {
System.out.println("onNext event => " + o);
}
});
}
项目:RxNetty
文件:HttpClientTest.java
@Test
public void testNonChunkingStream() throws Exception {
HttpClient<ByteBuf, ServerSentEvent> client = RxNetty.createHttpClient("localhost", port,
PipelineConfigurators.<ByteBuf>sseClientConfigurator());
Observable<HttpClientResponse<ServerSentEvent>> response =
client.submit(HttpClientRequest.createGet("test/nochunk_stream"));
final List<String> result = new ArrayList<String>();
response.flatMap(new Func1<HttpClientResponse<ServerSentEvent>, Observable<ServerSentEvent>>() {
@Override
public Observable<ServerSentEvent> call(HttpClientResponse<ServerSentEvent> httpResponse) {
return httpResponse.getContent();
}
}).toBlockingObservable().forEach(new Action1<ServerSentEvent>() {
@Override
public void call(ServerSentEvent event) {
result.add(event.getEventData());
}
});
assertEquals(RequestProcessor.smallStreamContent, result);
}
项目:MarketData
文件:RxNettyEventEventStreamClient.java
private Observable<String> initializeStream() {
HttpClient<ByteBuf, ServerSentEvent> client =
RxNetty.createHttpClient("localhost", port, PipelineConfigurators.<ByteBuf>clientSseConfigurator());
return client.submit(HttpClientRequest.createGet("/hello")).
flatMap(response -> {
printResponseHeader(response);
return response.getContent();
}).map(serverSentEvent -> serverSentEvent.contentAsString());
}
项目:MarketData
文件:RxNettyEventServer.java
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
HttpServer<ByteBuf, ServerSentEvent> server = RxNetty.createHttpServer(port,
(request, response) -> {
response.getHeaders().set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
response.getHeaders().set(CACHE_CONTROL, "no-cache");
response.getHeaders().set(CONNECTION, "keep-alive");
response.getHeaders().set(CONTENT_TYPE, "text/event-stream");
return getIntervalObservable(request, response);
}, PipelineConfigurators.<ByteBuf>serveSseConfigurator());
System.out.println("HTTP Server Sent Events server started...");
return server;
}
项目:vizceral-hystrix
文件:HystrixReader.java
/**
* Creates a new hystrix reader.
*
* @param configuration The configuration to use.
* @param cluster The cluster to read from.
*/
public HystrixReader(Configuration configuration, String cluster)
{
this.configuration = configuration;
this.cluster = cluster;
HttpClientBuilder<ByteBuf, ServerSentEvent> builder = RxNetty.newHttpClientBuilder(configuration.getTurbineHost(), configuration.getTurbinePort());
builder.pipelineConfigurator(PipelineConfigurators.clientSseConfigurator());
if (configuration.isSecure())
{
builder.withSslEngineFactory(DefaultFactories.trustAll());
}
rxNetty = builder.build();
}
项目:triathlon
文件:MarathonClient.java
public Observable<HttpClientResponse<ByteBuf>> postMessage(String message) {
PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>> pipelineConfigurator
= PipelineConfigurators.httpClientConfigurator();
HttpClient<ByteBuf, ByteBuf> client = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(networkAddress.getIpAddress(), port)
.pipelineConfigurator(pipelineConfigurator)
.enableWireLogging(LogLevel.ERROR).build();
HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost("/v2/apps");
request.withRawContentSource(Observable.just(message), StringTransformer.DEFAULT_INSTANCE);
request.withHeader("Content-Type", "application/json");
return client.submit(request);
}
项目:triathlon
文件:HealthCheck.java
private HttpClientResponse<ByteBuf> getResponse(String serviceUrl) throws MalformedURLException, InterruptedException, ExecutionException, TimeoutException {
String host, path;
int port;
URL url = new URL(serviceUrl);
host = url.getHost();
port = url.getPort();
path = url.getPath();
System.out.println(url);
HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port)
.pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
.build();
return httpClient.submit(HttpClientRequest.createGet(path)).toBlocking().toFuture().get(checkTimeout, TimeUnit.MILLISECONDS);
}
项目:argos-dashboard
文件:DefaultHystrixClusterMonitor.java
@Override
public Observable<String> observeJson() {
if(jsonObservable != null) {
return jsonObservable;
}
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(url.getPath() + "?" + url.getQuery());
int port = url.getPort() < 0 ? url.getDefaultPort() : url.getPort();
HttpClient<ByteBuf, ServerSentEvent> client = RxNetty.<ByteBuf, ServerSentEvent>newHttpClientBuilder(url.getHost(), port)
.withNoConnectionPooling()
.pipelineConfigurator(PipelineConfigurators.<ByteBuf>clientSseConfigurator())
.build();
jsonObservable = client.submit(request)
.doOnError(t -> LOG.error("Error connecting to " + url, t))
.flatMap(response -> {
if (response.getStatus().code() != 200) {
return Observable.error(new RuntimeException("Failed to connect: " + response.getStatus()));
}
return response.getContent()
.doOnSubscribe(() -> LOG.info("Turbine => Aggregate Stream from URL: " + url))
.doOnUnsubscribe(() -> LOG.info("Turbine => Unsubscribing Stream: " + url))
.map(ServerSentEvent::contentAsString);
}
)
.timeout(120, TimeUnit.SECONDS)
.retryWhen(attempts -> attempts.zipWith(Observable.range(1, Integer.MAX_VALUE), (k, i) -> i)
.flatMap(n -> {
int waitTimeSeconds = Math.min(6, n) * 10; // wait in 10 second increments up to a max of 1 minute
LOG.info("Turbine => Retrying connection to: " + this.url + " in {} seconds", waitTimeSeconds);
return Observable.timer(waitTimeSeconds, TimeUnit.SECONDS);
})
)
.repeat()
.share();
return jsonObservable;
}
项目:RxNetty-Android
文件:ServerFragment.java
@OnClick(R.id.client_button) void startClient() {
Observable<ObservableConnection<String, String>> connectionObservable =
RxNetty.createTcpClient("localhost", PORT, PipelineConfigurators.textOnlyConfigurator()).connect();
connectionObservable.flatMap(connection -> {
Observable<String> helloMessage = connection.getInput()
.take(1).map(String::trim);
// output 10 values at intervals and receive the echo back
Observable<String> intervalOutput =
Observable.interval(500, TimeUnit.MILLISECONDS)
.flatMap(aLong -> connection.writeAndFlush(String.valueOf(aLong + 1))
.map(aVoid -> ""));
// capture the output from the server
Observable<String> echo = connection.getInput().map(String::trim);
// wait for the helloMessage then start the output and receive echo input
return Observable.concat(helloMessage, Observable.merge(intervalOutput, echo));
})
.take(10)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override public void onCompleted() {
Log.d(TAG, "Client Complete!");
}
@Override public void onError(Throwable throwable) {
Log.e(TAG, "onError: " + throwable.getMessage());
}
@Override public void onNext(Object o) {
final String message = o.toString();
Log.d(TAG, "Client onNext: " + message);
adapter.add(message);
}
});
}
项目:ReactiveLab
文件:AbstractMiddleTierService.java
public HttpServer<ByteBuf, ServerSentEvent> createServer(int port) {
System.out.println("Start " + getClass().getSimpleName() + " on port: " + port);
// declare handler chain (wrapped in Hystrix)
// TODO create a better way of chaining these (related https://github.com/ReactiveX/RxNetty/issues/232 and https://github.com/ReactiveX/RxNetty/issues/202)
HystrixMetricsStreamHandler<ByteBuf, ServerSentEvent> handlerChain
= new HystrixMetricsStreamHandler<>(metrics, "/hystrix.stream", 1000, (request, response) -> {
try {
long startTime = System.currentTimeMillis();
return handleRequest(request, response)
.doOnCompleted(() -> System.out.println("Response => " + request.getPath() + " Time => " + (int) (System.currentTimeMillis() - startTime) + "ms"))
.doOnCompleted(() -> metrics.getRollingPercentile().addValue((int) (System.currentTimeMillis() - startTime)))
.doOnCompleted(() -> metrics.getRollingNumber().add(Metrics.EventType.SUCCESS, 1))
.doOnError(t -> metrics.getRollingNumber().add(Metrics.EventType.FAILURE, 1));
} catch (Throwable e) {
e.printStackTrace();
System.err.println("Server => Error [" + request.getPath() + "] => " + e);
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return response.writeStringAndFlush("data: Error 500: Bad Request\n" + e.getMessage() + "\n");
}
});
return RxNetty.createHttpServer(port, (request, response) -> {
// System.out.println("Server => Request: " + request.getPath());
return handlerChain.handle(request, response);
}, PipelineConfigurators.<ByteBuf> serveSseConfigurator());
}
项目:Prana
文件:HealthCheckHandlerTest.java
@Before
public void setUp() {
super.setUp();
externalServer = RxNetty.newHttpServerBuilder(0, new ExternalServerHandler())
.pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator()).build();
externalServer.start();
this.externalServerPort = externalServer.getServerPort();
}
项目:Prana
文件:AbstractIntegrationTest.java
@Before
public void setUp() {
server = RxNetty.newHttpServerBuilder(0, getHandler())
.pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator()).build();
server.start();
client = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("localhost", server.getServerPort())
.pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
.channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000)
.build();
}
项目:RxNetty
文件:TcpEventStreamServer.java
public static void main(String[] args) {
RxNetty.createTcpServer(8181, PipelineConfigurators.textOnlyConfigurator(),
new ConnectionHandler<String, String>() {
@Override
public Observable<Void> handle(ObservableConnection<String, String> newConnection) {
return startEventStream(newConnection);
}
}).startAndWait();
}
项目:RxNetty
文件:HttpSseServer.java
public static void main(String[] args) {
final int port = 8080;
RxNetty.createHttpServer(port,
new RequestHandler<ByteBuf, ServerSentEvent>() {
@Override
public Observable<Void> handle(HttpRequest<ByteBuf> request,
HttpResponse<ServerSentEvent> response) {
return getIntervalObservable(response);
}
}, PipelineConfigurators.<ByteBuf>sseServerConfigurator()).startAndWait();
}
项目:RxNetty
文件:HttpClientTest.java
@Test
public void testChunkedStreaming() throws Exception {
HttpClient<ByteBuf, ServerSentEvent> client = RxNetty.createHttpClient("localhost", port,
PipelineConfigurators.<ByteBuf>sseClientConfigurator());
Observable<HttpClientResponse<ServerSentEvent>> response =
client.submit(HttpClientRequest.createGet("test/stream"));
final List<String> result = new ArrayList<String>();
readResponseContent(response, result);
assertEquals(RequestProcessor.smallStreamContent, result);
}
项目:RxNetty
文件:HttpClientTest.java
@Test
public void testMultipleChunks() throws Exception {
HttpClient<ByteBuf, ServerSentEvent> client = RxNetty.createHttpClient("localhost", port,
PipelineConfigurators
.<ByteBuf>sseClientConfigurator());
Observable<HttpClientResponse<ServerSentEvent>> response =
client.submit(HttpClientRequest.createDelete("test/largeStream"));
final List<String> result = new ArrayList<String>();
readResponseContent(response, result);
assertEquals(RequestProcessor.largeStreamContent, result);
}
项目:RxNetty
文件:HttpClientTest.java
@Test
public void testMultipleChunksWithTransformation() throws Exception {
HttpClient<ByteBuf, ServerSentEvent> client = RxNetty.createHttpClient("localhost", port,
PipelineConfigurators
.<ByteBuf>sseClientConfigurator());
Observable<HttpClientResponse<ServerSentEvent>> response =
client.submit(HttpClientRequest.createGet("test/largeStream"));
Observable<String> transformed = response.flatMap(new Func1<HttpClientResponse<ServerSentEvent>, Observable<String>>() {
@Override
public Observable<String> call(HttpClientResponse<ServerSentEvent> httpResponse) {
if (httpResponse.getStatus().equals(HttpResponseStatus.OK)) {
return httpResponse.getContent().map(new Func1<ServerSentEvent, String>() {
@Override
public String call(ServerSentEvent sseEvent) {
return sseEvent.getEventData();
}
});
}
return Observable.error(new RuntimeException("Unexpected response"));
}
});
final List<String> result = new ArrayList<String>();
transformed.toBlockingObservable().forEach(new Action1<String>() {
@Override
public void call(String t1) {
result.add(t1);
}
});
assertEquals(RequestProcessor.largeStreamContent, result);
}
项目:atlas-oss-plugin
文件:RxHttp.java
/**
* Execute an HTTP request.
*
* @param server Server to send the request to.
* @param req Request to execute.
* @return Observable with the response of the request.
*/
private static Observable<HttpClientResponse<ByteBuf>>
executeSingle(Server server, HttpClientRequest<ByteBuf> req) {
HttpClient.HttpClientConfig config = new HttpClient.HttpClientConfig.Builder()
.readTimeout(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.userAgent(USER_AGENT)
.build();
HttpClientBuilder<ByteBuf, ByteBuf> builder =
RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(server.host(), server.port())
.pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
.config(config)
.channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS);
if (server.isSecure()) {
builder.withSslEngineFactory(DefaultFactories.trustAll());
}
final HttpClient<ByteBuf, ByteBuf> client = builder.build();
return client.submit(req)
.doOnNext(new Action1<HttpClientResponse<ByteBuf>>() {
@Override
public void call(HttpClientResponse<ByteBuf> res) {
LOGGER.debug("Got response: {}", res.getStatus().code());
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
LOGGER.info("Error sending metrics: {}/{}",
throwable.getClass().getSimpleName(),
throwable.getMessage());
}
})
.doOnTerminate(new Action0() {
@Override
public void call() {
client.shutdown();
}
});
}
项目:RxNetty
文件:HttpClientBuilder.java
public HttpClientBuilder(String host, int port) {
super(host, port);
clientConfig = HttpClient.HttpClientConfig.DEFAULT_CONFIG;
pipelineConfigurator(PipelineConfigurators.<I, O>httpClientConfigurator());
}
项目:RxNetty
文件:HttpClientBuilder.java
public HttpClientBuilder(Bootstrap bootstrap, String host, int port) {
super(bootstrap, host, port);
pipelineConfigurator(PipelineConfigurators.<I, O>httpClientConfigurator());
}
项目:RxNetty
文件:HttpServerBuilder.java
public HttpServerBuilder(int port, RequestHandler<I, O> requestHandler) {
super(port, new HttpConnectionHandler<I, O>(requestHandler));
pipelineConfigurator(PipelineConfigurators.<I, O>httpServerConfigurator());
}
项目:RxNetty
文件:HttpServerBuilder.java
public HttpServerBuilder(ServerBootstrap bootstrap, int port, RequestHandler<I, O> requestHandler) {
super(port, new HttpConnectionHandler<I, O>(requestHandler), bootstrap);
pipelineConfigurator(PipelineConfigurators.<I, O>httpServerConfigurator());
}
项目:karyon
文件:ShutdownListener.java
public ShutdownListener(int shutdownPort, final Func1<String, Observable<Void>> commandHandler) {
shutdownCmdServer = RxNetty.createTcpServer(shutdownPort,
PipelineConfigurators.stringMessageConfigurator(),
new ShutdownConnectionHandler(commandHandler));
}