Java 类io.reactivex.netty.protocol.http.server.RequestHandler 实例源码
项目:gocd-health-check-plugin
文件:HealthCheckTaskExecutorTest.java
/**
 * Sets up the test environment: creates the executor under test and starts a
 * local HTTP server on {@code PORT} that answers {@code /health} with an "UP"
 * JSON document and closes every other request with a 404 status.
 *
 * @throws Exception if any error occurs
 */
@Before
public void setUp() throws Exception {
    instance = new HealthCheckTaskExecutor();
    final RequestHandler<ByteBuf, ByteBuf> healthStub = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
            final boolean isHealthPath = "/health".equals(request.getPath());
            if (!isHealthPath) {
                // Anything other than the health endpoint is unknown to this stub.
                response.setStatus(HttpResponseStatus.NOT_FOUND);
                return response.close();
            }
            return response.writeStringAndFlush("{\"status\": \"UP\", \"service\": {\"status\": \"UP\"}}");
        }
    };
    server = RxNetty.createHttpServer(PORT, healthStub).start();
}
项目:XXXX
文件:RealRequestBenchmarks.java
/**
 * Prepares the benchmark fixtures: starts a local RxNetty server on
 * {@code SERVER_PORT} whose handler only flushes the response, then builds
 * the OkHttp client, the Feign target backed by that client, and the raw
 * OkHttp request used for comparison.
 */
@Setup
public void setup() {
    server = RxNetty.createHttpServer(SERVER_PORT, new RequestHandler<ByteBuf, ByteBuf>() {
        // Parameterized return type (instead of the raw rx.Observable) so the
        // override matches the RequestHandler contract without unchecked warnings.
        @Override
        public rx.Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                          HttpServerResponse<ByteBuf> response) {
            return response.flush();
        }
    });
    server.start();
    client = new OkHttpClient();
    client.setRetryOnConnectionFailure(false);
    okFeign = Feign.builder()
        .client(new feign.okhttp.OkHttpClient(client))
        .target(FeignTestInterface.class, "http://localhost:" + SERVER_PORT);
    queryRequest = new Request.Builder()
        .url("http://localhost:" + SERVER_PORT + "/?Action=GetUser&Version=2010-05-08&limit=1")
        .build();
}
项目:mesos-rxjava
文件:MesosClientIntegrationTest.java
/**
 * Starts a stub server that answers every request with a 500 status and
 * checks that opening the client stream surfaces a {@code Mesos5xxException}
 * whose context reports status code 500.
 */
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mesos5xxResponse() throws Throwable {
    final RequestHandler<ByteBuf, ByteBuf> alwaysFails = (req, resp) -> {
        resp.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
        return resp.close();
    };
    // Port 0 is passed to the server; the actual port is read back below.
    final HttpServer<ByteBuf, ByteBuf> stubServer = RxNetty.createHttpServer(0, alwaysFails);
    stubServer.start();
    final String endpoint = String.format("http://localhost:%d/api/v1/scheduler", stubServer.getServerPort());
    final MesosClient<String, String> mesosClient = createClient(URI.create(endpoint));
    try {
        mesosClient.openStream().await();
        fail("Expect an exception to be propagated up because subscribe will 500");
    } catch (Mesos5xxException expected) {
        final MesosClientErrorContext errorContext = expected.getContext();
        assertThat(errorContext.getStatusCode()).isEqualTo(500);
    } finally {
        stubServer.shutdown();
    }
}
项目:mesos-rxjava
文件:MesosClientIntegrationTest.java
/**
 * Starts a stub server that answers 200 with a {@code application/json}
 * Content-Type and checks that opening the client stream fails with a
 * {@code MesosException} describing the content-type mismatch.
 */
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mismatchContentType() throws Throwable {
    final RequestHandler<ByteBuf, ByteBuf> wrongContentType = (req, resp) -> {
        resp.setStatus(HttpResponseStatus.OK);
        resp.getHeaders().setHeader("Content-Type", "application/json");
        return resp.close();
    };
    final HttpServer<ByteBuf, ByteBuf> stubServer = RxNetty.createHttpServer(0, wrongContentType);
    stubServer.start();
    final String endpoint = String.format("http://localhost:%d/api/v1/scheduler", stubServer.getServerPort());
    final MesosClient<String, String> mesosClient = createClient(URI.create(endpoint));
    try {
        mesosClient.openStream().await();
        fail("Expect an exception to be propagated up because of content type mismatch");
    } catch (MesosException expected) {
        final MesosClientErrorContext errorContext = expected.getContext();
        assertThat(errorContext.getStatusCode()).isEqualTo(200);
        assertThat(errorContext.getMessage()).isEqualTo("Response had Content-Type \"application/json\" expected \"text/plain;charset=utf-8\"");
    } finally {
        stubServer.shutdown();
    }
}
项目:triathlon
文件:RxTestServer.java
/**
 * Builds (but does not start) an HTTP server on {@code port}. POST requests
 * whose path contains {@code /v2/apps} are delegated to {@code handleTest};
 * every other request is closed with a 404 status.
 *
 * @return the built server
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    final RequestHandler<ByteBuf, ByteBuf> router = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
            // The HTTP method is only inspected once the path has matched.
            final boolean isAppsPost = request.getPath().contains("/v2/apps")
                    && request.getHttpMethod().equals(HttpMethod.POST);
            if (isAppsPost) {
                return handleTest(request, response);
            }
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    };
    final HttpServer<ByteBuf, ByteBuf> built = RxNetty.newHttpServerBuilder(port, router)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .enableWireLogging(LogLevel.ERROR)
            .build();
    System.out.println("RxTetstServer server started...");
    return built;
}
项目:Artemis
文件:KaryonPluginModule.java
/**
 * Provides an HTTP server on port 8888 whose router maps {@code /foo} to a
 * handler that replies with the text "Hello World".
 *
 * @return the server created by {@code KaryonTransport.newHttpServer}
 */
@Provides
public HttpServer providesKaryonTransport() {
    SimpleUriRouter simpleUriRouter = new SimpleUriRouter();
    simpleUriRouter.addUri("/foo", new RequestHandler() {
        @Override
        public Observable<Void> handle(HttpServerRequest request, HttpServerResponse response) {
            // Set the status BEFORE writing: once content has been written and
            // flushed the response head is already on the wire, so a later
            // setStatus call can no longer affect what the client sees.
            response.setStatus(HttpResponseStatus.OK);
            response.writeAndFlush("Hello World", StringTransformer.DEFAULT_INSTANCE);
            return Observable.empty();
        }
        // Second override required because RequestHandler is used as a raw type here.
        @Override
        public Observable<Void> handle(Object input, Object output) {
            return Observable.empty();
        }
    });
    return KaryonTransport.newHttpServer(8888, simpleUriRouter);
}
项目:ribbon
文件:RxMovieServer.java
/**
 * Builds (but does not start) an HTTP server on {@code port} that dispatches
 * on substrings of the request path: {@code /users} (GET vs. any other
 * method), {@code /recommendations} and {@code /movies}. Requests matching
 * none of these are closed with a 404 status.
 *
 * @return the built server
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    final RequestHandler<ByteBuf, ByteBuf> dispatcher = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
            if (request.getPath().contains("/users")) {
                final boolean isGet = request.getHttpMethod().equals(HttpMethod.GET);
                return isGet
                        ? handleRecommendationsByUserId(request, response)
                        : handleUpdateRecommendationsForUser(request, response);
            }
            if (request.getPath().contains("/recommendations")) {
                return handleRecommendationsBy(request, response);
            }
            if (request.getPath().contains("/movies")) {
                return handleRegisterMovie(request, response);
            }
            // No route matched.
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    };
    final HttpServer<ByteBuf, ByteBuf> built = RxNetty.newHttpServerBuilder(port, dispatcher)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .enableWireLogging(LogLevel.ERROR)
            .build();
    System.out.println("RxMovie server started...");
    return built;
}
项目:RxNetty
文件:HttpWelcomeServer.java
/**
 * Starts an HTTP server on port 8080 that prints the request line and all
 * request headers to standard output and replies with a welcome message.
 *
 * @param args ignored
 */
public static void main(final String[] args) {
    final int port = 8080;
    final RequestHandler<ByteBuf, ByteBuf> welcomeHandler = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpRequest<ByteBuf> request, final HttpResponse<ByteBuf> response) {
            System.out.println("New request recieved");
            final String requestLine =
                    request.getHttpMethod() + " " + request.getUri() + ' ' + request.getHttpVersion();
            System.out.println(requestLine);
            for (Map.Entry<String, String> entry : request.getHeaders().entries()) {
                System.out.println(entry.getKey() + ": " + entry.getValue());
            }
            // This does not consume request content, need to figure out an elegant/correct way of doing that.
            return response.writeStringAndFlush("Welcome!!! \n\n");
        }
    };
    RxNetty.createHttpServer(port, welcomeHandler).startAndWait();
}
项目:feign
文件:RealRequestBenchmarks.java
/**
 * Prepares the benchmark fixtures: starts a local RxNetty server on
 * {@code SERVER_PORT} whose handler only flushes the response, then builds
 * the OkHttp client, the Feign target backed by that client, and the raw
 * OkHttp request used for comparison.
 */
@Setup
public void setup() {
    server = RxNetty.createHttpServer(SERVER_PORT, new RequestHandler<ByteBuf, ByteBuf>() {
        // Parameterized return type (instead of the raw rx.Observable) so the
        // override matches the RequestHandler contract without unchecked warnings.
        @Override
        public rx.Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                          HttpServerResponse<ByteBuf> response) {
            return response.flush();
        }
    });
    server.start();
    client = new OkHttpClient();
    client.setRetryOnConnectionFailure(false);
    okFeign = Feign.builder()
        .client(new feign.okhttp.OkHttpClient(client))
        .target(FeignTestInterface.class, "http://localhost:" + SERVER_PORT);
    queryRequest = new Request.Builder()
        .url("http://localhost:" + SERVER_PORT + "/?Action=GetUser&Version=2010-05-08&limit=1")
        .build();
}
项目:tusRx
文件:TestServer.java
/**
 * Creates the server and starts it with a TusRx request handler on an
 * asynchronous task. The constructor blocks until that start task has
 * completed; the {@code awaitShutdown} continuation is registered on the
 * task but is not itself waited on here.
 */
public TestServer() {
    final Options opts = getOptions();
    final UploaderPool uploaderPool = new UploaderPool(opts.getRootDir());
    final RequestHandler<ByteBuf, ByteBuf> tusHandler =
            new TusRxRequestHandler(opts, new RxNettyRequestHandlerFactory(opts, uploaderPool));
    server = HttpServer.newServer();
    final CompletableFuture<Void> startup = CompletableFuture.runAsync(() -> server.start(tusHandler));
    startup.thenAccept(ignored -> server.awaitShutdown());
    startup.join();
}
项目:tusRx
文件:TestServer.java
/**
 * Starts a server on port 8080 with a TusRx request handler and blocks until
 * the server shuts down.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    final Options opts = getOptions();
    final UploaderPool uploaderPool = new UploaderPool(opts.getRootDir());
    final RxNettyRequestHandlerFactory handlerFactory = new RxNettyRequestHandlerFactory(opts, uploaderPool);
    final RequestHandler<ByteBuf, ByteBuf> tusHandler = new TusRxRequestHandler(opts, handlerFactory);
    final HttpServer<ByteBuf, ByteBuf> running = HttpServer.newServer(8080).start(tusHandler);
    running.awaitShutdown();
}
项目:WeatherAlarm
文件:HttpRequestHandler.java
/**
 * Dispatches the request to the handler registered for its URI, or closes
 * the response with a 404 status when no handler is found.
 */
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    final RequestHandler<ByteBuf, ByteBuf> delegate = findRequestHandler(request.getUri());
    if (delegate == null) {
        response.setStatus(HttpResponseStatus.NOT_FOUND);
        return response.close();
    }
    return delegate.handle(request, response);
}
项目:WeatherAlarm
文件:HttpRequestHandler.java
/**
 * Looks up the handler for a URI. A registered key matches when the URI is
 * equal to it or starts with the key followed by {@code PATH_DELIM}; the
 * first matching key in the map's iteration order wins.
 *
 * @param uri the request URI
 * @return the matching handler, or {@code null} when none matches
 */
private RequestHandler<ByteBuf, ByteBuf> findRequestHandler(String uri) {
    for (String registeredUri : uriHandlers.keySet()) {
        final boolean exactMatch = uri.equals(registeredUri);
        if (!exactMatch && !uri.startsWith(registeredUri + PATH_DELIM)) {
            continue;
        }
        return uriHandlers.get(registeredUri);
    }
    return null;
}
项目:mesos-rxjava
文件:MesosClientIntegrationTest.java
/**
 * Starts a stub server that answers 400 with a plain-text error body and
 * checks that opening the client stream fails with a
 * {@code Mesos4xxException} carrying status 400 and that body as message.
 */
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mesos4xxResponse() throws Throwable {
    final String errorMessage = "Error message that should come from the server";
    final RequestHandler<ByteBuf, ByteBuf> badRequest = (req, resp) -> {
        resp.setStatus(HttpResponseStatus.BAD_REQUEST);
        final byte[] body = errorMessage.getBytes(StandardCharsets.UTF_8);
        resp.getHeaders().setHeader("Content-Length", body.length);
        resp.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
        resp.writeBytes(body);
        return resp.close();
    };
    final HttpServer<ByteBuf, ByteBuf> stubServer = RxNetty.createHttpServer(0, badRequest);
    stubServer.start();
    final String endpoint = String.format("http://localhost:%d/api/v1/scheduler", stubServer.getServerPort());
    final MesosClient<String, String> mesosClient = createClient(URI.create(endpoint));
    try {
        mesosClient.openStream().await();
        fail("Expect an exception to be propagated up because subscribe will 400");
    } catch (Mesos4xxException expected) {
        final MesosClientErrorContext errorContext = expected.getContext();
        assertThat(errorContext.getStatusCode()).isEqualTo(400);
        assertThat(errorContext.getMessage()).isEqualTo(errorMessage);
    } finally {
        stubServer.shutdown();
    }
}
项目:mesos-rxjava
文件:MesosClientBackpressureIntegrationTest.java
/**
 * (Currently ignored.) Starts a stub server that writes one subscribed
 * message followed by 20000 length-prefixed heartbeat messages, and expects
 * opening the client stream to fail with a
 * {@code MissingBackpressureException}.
 */
@Test
@Ignore
public void testBurstyObservable_missingBackpressureException() throws Throwable {
    final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
    final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
    final byte[] hmsg = heartbeatMessage.getBytes(StandardCharsets.UTF_8);
    // The length prefix must describe exactly the bytes that are sent, so it is
    // derived from the UTF-8 payload rather than from a platform-charset encoding.
    final byte[] hbytes = String.format("%d\n", hmsg.length).getBytes(StandardCharsets.UTF_8);
    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.OK);
        response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
        writeRecordIOMessage(response, subscribedMessage);
        for (int i = 0; i < 20000; i++) {
            response.writeBytes(hbytes);
            response.writeBytes(hmsg);
        }
        return response.flush();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClientForStreaming(uri).build();
    try {
        client.openStream().await();
        fail("Expect an exception to be propagated up due to backpressure");
    } catch (MissingBackpressureException e) {
        // expected
        e.printStackTrace();
        assertThat(e.getMessage()).isNullOrEmpty();
    } finally {
        server.shutdown();
    }
}
项目:mesos-rxjava
文件:MesosClientBackpressureIntegrationTest.java
/**
 * Starts a stub server that writes one subscribed message followed by
 * {@code numMessages} heartbeats, opens the stream with
 * {@code onBackpressureBuffer()} enabled, and checks that {@code msgNo}
 * ends up counting every message.
 */
@Test
public void testBurstyObservable_unboundedBufferSucceeds() throws Throwable {
    msgNo = 0;
    final int numMessages = 20000;
    final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
    final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.OK);
        response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
        writeRecordIOMessage(response, subscribedMessage);
        for (int i = 0; i < numMessages; i++) {
            writeRecordIOMessage(response, heartbeatMessage);
        }
        return response.close();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClientForStreaming(uri)
        .onBackpressureBuffer()
        .build();
    try {
        client.openStream().await();
        // Assert inside the try block: an assertion in finally would replace any
        // exception thrown by await() and, on failure, skip the shutdown below.
        // 20000 heartbeats PLUS 1 subscribe
        assertEquals("All heartbeats received (plus the subscribed)", 1 + numMessages, msgNo);
    } finally {
        server.shutdown();
    }
}
项目:hello-karyon-rxnetty
文件:IndexResource.java
/**
 * Creates the endpoint and a URI router whose {@code /} route renders an
 * HTML page showing the instance id (obtained by running two metadata
 * commands through {@code execCmd}) and the {@code USERDATA} environment
 * variable. If gathering that data throws, the stack trace is printed and
 * the values not yet assigned stay empty.
 */
public IndexResource() {
    endpoint = new HelloEndpoint();
    delegate = new SimpleUriRouter<>();
    delegate
        .addUri("/", new RequestHandler<ByteBuf, ByteBuf>() {
            @Override
            public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                           final HttpServerResponse<ByteBuf> response) {
                final Func1<String, Observable<Void>> renderPage = new Func1<String, Observable<Void>>() {
                    @Override
                    public Observable<Void> call(String body) {
                        String instanceId = "";
                        String userdata = "";
                        try {
                            // Both commands run in order; their outputs are concatenated.
                            final String fromMetadata = execCmd("curl http://metadata/computeMetadata/v1/instance/id -H Metadata-Flavor:Google");
                            final String fromInstanceData = execCmd("wget -q -O - http://instance-data/latest/meta-data/instance-id");
                            instanceId = fromMetadata + fromInstanceData;
                            userdata = System.getenv("USERDATA");
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        final String page = "<html><head><style>body{text-align:center;font-family:'Lucida Grande'}</style></head><body><img src='http://kenzan.com/wp-content/themes/kenzan/images/logo-reg.png' /><h2>Example Spinnaker Application</h2><h3>Instance Id " + instanceId + "</h3><h3>$USERDATA ENV VAR: " + userdata + "</h3></body></html>";
                        response.writeString(page);
                        return response.close();
                    }
                };
                return endpoint.getHello().flatMap(renderPage);
            }
        });
}
项目:WeatherAlarm
文件:HttpRequestHandler.java
/**
 * Routes the request to the handler registered for its URI; when there is
 * none, the response is closed with a 404 status.
 */
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    final RequestHandler<ByteBuf, ByteBuf> target = findRequestHandler(request.getUri());
    if (target == null) {
        response.setStatus(HttpResponseStatus.NOT_FOUND);
        return response.close();
    }
    return target.handle(request, response);
}
项目:WeatherAlarm
文件:HttpRequestHandler.java
/**
 * Finds the handler whose registered key either equals the URI or is a
 * prefix of it followed by {@code PATH_DELIM}. Keys are tried in the map's
 * iteration order and the first match is returned.
 *
 * @param uri the request URI
 * @return the matching handler, or {@code null} when none matches
 */
private RequestHandler<ByteBuf, ByteBuf> findRequestHandler(String uri) {
    for (String candidate : uriHandlers.keySet()) {
        final boolean matches = uri.equals(candidate) || uri.startsWith(candidate + PATH_DELIM);
        if (matches) {
            return uriHandlers.get(candidate);
        }
    }
    return null;
}
项目:Prana
文件:HostsHandlerTest.java
/**
 * Stubs {@code hostService.getHosts("foo")} to return two instances
 * (host1 and host2, app "foo", VIP "bar") and returns a {@code HostsHandler}
 * backed by that service.
 */
@Override
protected RequestHandler<ByteBuf, ByteBuf> getHandler() {
    final ArrayList<InstanceInfo> fooInstances = new ArrayList<>();
    for (String hostName : new String[] {"host1", "host2"}) {
        fooInstances.add(InstanceInfo.Builder.newBuilder()
                .setAppName("foo")
                .setVIPAddress("bar")
                .setHostName(hostName)
                .build());
    }
    when(hostService.getHosts("foo")).thenReturn(fooInstances);
    return new HostsHandler(hostService, new ObjectMapper());
}
项目:RxNetty
文件:HttpSseServer.java
/**
 * Starts an HTTP server on port 8080, configured with the SSE server
 * pipeline configurator, whose handler returns the observable produced by
 * {@code getIntervalObservable} for each response. Blocks via
 * {@code startAndWait()}.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    final int port = 8080;
    final RequestHandler<ByteBuf, ServerSentEvent> sseHandler =
            new RequestHandler<ByteBuf, ServerSentEvent>() {
                @Override
                public Observable<Void> handle(HttpRequest<ByteBuf> request,
                                               HttpResponse<ServerSentEvent> response) {
                    return getIntervalObservable(response);
                }
            };
    RxNetty.createHttpServer(port, sseHandler, PipelineConfigurators.<ByteBuf>sseServerConfigurator())
            .startAndWait();
}
项目:karyon
文件:KaryonHttpModule.java
/**
 * Passes the module name and I/O types to the superclass, then derives the
 * keys for the router, the interceptor support and the HTTP server via
 * {@code keyFor}, all qualified by {@code nameAnnotation}.
 *
 * @param moduleName name of this module
 * @param iType      request content type
 * @param oType      response content type
 */
protected KaryonHttpModule(String moduleName, Class<I> iType, Class<O> oType) {
    super(moduleName, iType, oType);
    this.routerKey = keyFor(RequestHandler.class, iType, oType, nameAnnotation);
    this.interceptorSupportKey =
            keyFor(GovernatorHttpInterceptorSupport.class, iType, oType, nameAnnotation);
    this.httpServerKey = keyFor(HttpServer.class, iType, oType, nameAnnotation);
}
项目:karyon
文件:HttpRxServerProvider.java
/**
 * Creates the name annotation from {@code name} and derives every injection
 * key this provider looks up later, all qualified by that annotation.
 *
 * @param name  name used to qualify the keys
 * @param iType request content type
 * @param oType response content type
 */
public HttpRxServerProvider(String name, Class<I> iType, Class<O> oType) {
    this.nameAnnotation = Names.named(name);

    // Keys that depend on the I/O types.
    this.routerKey = keyFor(RequestHandler.class, iType, oType, nameAnnotation);
    this.interceptorSupportKey =
            keyFor(GovernatorHttpInterceptorSupport.class, iType, oType, nameAnnotation);

    // Keys qualified by the name only.
    this.pipelineConfiguratorKey = Key.get(PipelineConfigurator.class, nameAnnotation);
    this.metricEventsListenerFactoryKey = Key.get(MetricEventsListenerFactory.class, nameAnnotation);
    this.serverConfigKey = Key.get(ServerConfig.class, nameAnnotation);
}
项目:karyon
文件:HttpRxServerProvider.java
/**
 * Resolves the server configuration, router and interceptor support from the
 * injector, builds the HTTP server (applying the optional thread pool,
 * pipeline configurator and metrics listener factory when present), starts
 * it and logs the port it is bound to.
 *
 * @param injector injector used to look up all collaborators
 */
@SuppressWarnings("rawtypes")
@Inject
public void setInjector(Injector injector) {
    final HttpServerConfig serverConfig = (HttpServerConfig) injector.getInstance(serverConfigKey);
    final RequestHandler router = injector.getInstance(routerKey);
    final GovernatorHttpInterceptorSupport<I, O> interceptors = injector.getInstance(interceptorSupportKey);
    interceptors.finish(injector);

    final HttpRequestHandler<I, O> requestHandler = new HttpRequestHandler<I, O>(router, interceptors);
    final HttpServerBuilder<I, O> serverBuilder =
            KaryonTransport.newHttpServerBuilder(serverConfig.getPort(), requestHandler);

    if (serverConfig.requiresThreadPool()) {
        serverBuilder.withRequestProcessingThreads(serverConfig.getThreadPoolSize());
    }

    // Optional collaborators: only applied when a binding already exists.
    final boolean hasPipelineConfigurator = injector.getExistingBinding(pipelineConfiguratorKey) != null;
    if (hasPipelineConfigurator) {
        serverBuilder.appendPipelineConfigurator(injector.getInstance(pipelineConfiguratorKey));
    }
    final boolean hasMetricsFactory = injector.getExistingBinding(metricEventsListenerFactoryKey) != null;
    if (hasMetricsFactory) {
        serverBuilder.withMetricEventsListenerFactory(injector.getInstance(metricEventsListenerFactoryKey));
    }

    httpServer = serverBuilder.build().start();
    logger.info("Starting server {} on port {}...", nameAnnotation.value(), httpServer.getServerPort());
}
项目:karyon
文件:Karyon.java
/**
 * Creates a new {@link KaryonServer} backed by a single HTTP server instance that forwards every request
 * to the supplied {@link RequestHandler}.
 * The {@link HttpServer} is created using {@link KaryonTransport#newHttpServer(int, HttpRequestHandler)}
 *
 * @param port Port for the server.
 * @param handler Request Handler
 * @param bootstrapModules Additional bootstrapModules if any.
 *
 * @return {@link KaryonServer} which is to be used to start the created server.
 */
public static KaryonServer forRequestHandler(int port, final RequestHandler<ByteBuf, ByteBuf> handler,
                                             BootstrapModule... bootstrapModules) {
    // Thin wrapper that simply forwards to the caller-supplied handler.
    final RequestHandler<ByteBuf, ByteBuf> forwarding = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                       HttpServerResponse<ByteBuf> response) {
            return handler.handle(request, response);
        }
    };
    final HttpServer<ByteBuf, ByteBuf> httpServer = KaryonTransport.newHttpServer(port, forwarding);
    return new RxNettyServerBackedServer(httpServer, bootstrapModules);
}
项目:WeatherAlarm
文件:HttpRequestHandler.java
/**
 * Registers {@code requestHandler} under {@code uri}.
 *
 * @param uri            key the handler is stored under
 * @param requestHandler handler to register
 * @return this instance, to allow chaining
 */
public HttpRequestHandler addUriHandler(String uri, RequestHandler<ByteBuf, ByteBuf> requestHandler) {
    uriHandlers.put(uri, requestHandler);
    return this;
}
项目:WeatherAlarm
文件:HttpRequestHandler.java
/**
 * Returns the registered handlers as a new list, so changes to the returned
 * list do not affect the underlying map.
 *
 * @return a fresh list holding the values of {@code uriHandlers}
 */
public List<RequestHandler<ByteBuf, ByteBuf>> getUriHandlers() {
    final List<RequestHandler<ByteBuf, ByteBuf>> snapshot = new ArrayList<>(uriHandlers.values());
    return snapshot;
}
项目:WeatherAlarm
文件:HttpRequestHandler.java
/**
 * Stores {@code requestHandler} in the handler map under {@code uri}.
 *
 * @param uri            key the handler is stored under
 * @param requestHandler handler to register
 * @return this instance, to allow chaining
 */
public HttpRequestHandler addUriHandler(String uri, RequestHandler<ByteBuf, ByteBuf> requestHandler) {
    uriHandlers.put(uri, requestHandler);
    return this;
}
项目:WeatherAlarm
文件:HttpRequestHandler.java
/**
 * Copies the currently registered handlers into a new list.
 *
 * @return a fresh list holding the values of {@code uriHandlers}
 */
public List<RequestHandler<ByteBuf, ByteBuf>> getUriHandlers() {
    final List<RequestHandler<ByteBuf, ByteBuf>> handlers = new ArrayList<>(uriHandlers.values());
    return handlers;
}
项目:ReactiveLab
文件:HystrixMetricsStreamHandler.java
/**
 * Stores the collaborators and settings of this handler.
 *
 * @param metrics       metrics instance to keep
 * @param hystrixPrefix prefix setting to keep
 * @param interval      interval setting to keep
 * @param appHandler    application request handler to keep
 */
public HystrixMetricsStreamHandler(Metrics metrics, String hystrixPrefix, long interval, RequestHandler<I, O> appHandler) {
    // Collaborators.
    this.appHandler = appHandler;
    this.metrics = metrics;
    // Settings.
    this.hystrixPrefix = hystrixPrefix;
    this.interval = interval;
}
项目:ReactiveLab
文件:HystrixMetricsStreamHandler.java
/**
 * Convenience constructor that delegates to the three-argument constructor
 * with {@code DEFAULT_HYSTRIX_PREFIX} and {@code DEFAULT_INTERVAL}.
 *
 * @param appHandler application request handler
 */
public HystrixMetricsStreamHandler(RequestHandler<I, O> appHandler) {
this(DEFAULT_HYSTRIX_PREFIX, DEFAULT_INTERVAL, appHandler);
}
项目:ReactiveLab
文件:HystrixMetricsStreamHandler.java
/**
 * Stores the settings and the application handler of this handler.
 *
 * @param hystrixPrefix prefix setting to keep
 * @param interval      interval setting to keep
 * @param appHandler    application request handler to keep
 */
public HystrixMetricsStreamHandler(String hystrixPrefix, long interval, RequestHandler<I, O> appHandler) {
    this.appHandler = appHandler;
    this.interval = interval;
    this.hystrixPrefix = hystrixPrefix;
}
项目:Prana
文件:HealthCheckHandlerTest.java
/**
 * Supplies the handler under test: a {@code HealthCheckHandler} built with
 * the test's {@code objectMapper}.
 */
@Override
protected RequestHandler<ByteBuf, ByteBuf> getHandler() {
    final RequestHandler<ByteBuf, ByteBuf> handlerUnderTest = new HealthCheckHandler(objectMapper);
    return handlerUnderTest;
}
项目:Prana
文件:StatusHandlerTest.java
/**
 * Supplies the handler under test: a {@code StatusHandler} built with the
 * test's {@code objectMapper} and {@code applicationInfoManager}.
 */
@Override
protected RequestHandler<ByteBuf, ByteBuf> getHandler() {
    final RequestHandler<ByteBuf, ByteBuf> handlerUnderTest =
            new StatusHandler(objectMapper, applicationInfoManager);
    return handlerUnderTest;
}
项目:Prana
文件:PingHandlerTest.java
/**
 * Supplies the handler under test: a {@code PingHandler} built with the
 * test's {@code objectMapper}.
 */
@Override
protected RequestHandler<ByteBuf, ByteBuf> getHandler() {
    final RequestHandler<ByteBuf, ByteBuf> handlerUnderTest = new PingHandler(objectMapper);
    return handlerUnderTest;
}
项目:Prana
文件:DynamicPropertiesHandlerTest.java
/**
 * Supplies the handler under test: a {@code DynamicPropertiesHandler} built
 * with the test's {@code objectMapper}.
 */
@Override
protected RequestHandler<ByteBuf, ByteBuf> getHandler() {
    final RequestHandler<ByteBuf, ByteBuf> handlerUnderTest = new DynamicPropertiesHandler(objectMapper);
    return handlerUnderTest;
}
项目:RxNetty
文件:RxNetty.java
/**
 * Builds an HTTP server for {@code ByteBuf} content from an
 * {@code HttpServerBuilder} created with the given port and request handler.
 *
 * @param port           port passed to the builder
 * @param requestHandler handler passed to the builder
 * @return the server returned by the builder's {@code build()}
 */
public static HttpServer<ByteBuf, ByteBuf> createHttpServer(int port, RequestHandler<ByteBuf, ByteBuf> requestHandler) {
    final HttpServerBuilder<ByteBuf, ByteBuf> builder =
            new HttpServerBuilder<ByteBuf, ByteBuf>(port, requestHandler);
    return builder.build();
}
项目:RxNetty
文件:RxNetty.java
/**
 * Builds an HTTP server from an {@code HttpServerBuilder} created with the
 * given port and request handler, with the supplied pipeline configurator
 * applied.
 *
 * @param port           port passed to the builder
 * @param requestHandler handler passed to the builder
 * @param configurator   pipeline configurator set on the builder
 * @param <I>            request content type
 * @param <O>            response content type
 * @return the server returned by the builder's {@code build()}
 */
public static <I, O> HttpServer<I, O> createHttpServer(int port,
                                                       RequestHandler<I, O> requestHandler,
                                                       PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>> configurator) {
    final HttpServerBuilder<I, O> builder = new HttpServerBuilder<I, O>(port, requestHandler);
    return builder.pipelineConfigurator(configurator).build();
}
项目:karyon
文件:KaryonHttpModule.java
/**
 * Starts a binding for the router key.
 *
 * @return the binding builder returned by {@code bind(routerKey)}
 */
protected LinkedBindingBuilder<RequestHandler<I, O>> bindRouter() {
    final LinkedBindingBuilder<RequestHandler<I, O>> routerBinding = bind(routerKey);
    return routerBinding;
}
项目:karyon
文件:KaryonTransport.java
/**
 * Wraps the router in an {@code HttpRequestHandler} and hands it, together
 * with {@code RxContexts.DEFAULT_CORRELATOR}, to
 * {@code RxContexts.newHttpServerBuilder}.
 *
 * @param port   port passed to the builder factory
 * @param router router to wrap
 * @param <I>    request content type
 * @param <O>    response content type
 * @return the builder returned by {@code RxContexts.newHttpServerBuilder}
 */
public static <I, O> HttpServerBuilder<I, O> newHttpServerBuilder(int port, RequestHandler<I, O> router) {
    final HttpRequestHandler<I, O> wrappedRouter = new HttpRequestHandler<I, O>(router);
    return RxContexts.newHttpServerBuilder(port, wrappedRouter, RxContexts.DEFAULT_CORRELATOR);
}