Java 类io.reactivex.netty.channel.StringTransformer 实例源码
项目:Artemis
文件:KaryonPluginModule.java
/**
 * Builds the HTTP server exposed by this module.
 *
 * <p>A {@code SimpleUriRouter} is configured with a single {@code /foo} route whose handler
 * replies with the text "Hello World" and status {@code OK}. The router is then handed to
 * {@code KaryonTransport.newHttpServer} together with the value 8888 (presumably the
 * listening port -- confirm against the Karyon API).
 *
 * @return the server created by {@code KaryonTransport.newHttpServer}
 */
@Provides
public HttpServer providesKaryonTransport() {
    SimpleUriRouter simpleUriRouter = new SimpleUriRouter();
    simpleUriRouter.addUri("/foo", new RequestHandler() {
        @Override
        public Observable<Void> handle(HttpServerRequest request, HttpServerResponse response) {
            // The status must be set before anything is written: once content has been
            // written and flushed, the response head is presumably already on the wire and
            // a later setStatus() call cannot change what the client received.
            response.setStatus(HttpResponseStatus.OK);
            response.writeAndFlush("Hello World", StringTransformer.DEFAULT_INSTANCE);
            return Observable.empty();
        }

        @Override
        public Observable<Void> handle(Object input, Object output) {
            // Untyped variant of the handler: completes immediately without touching
            // either argument.
            return Observable.empty();
        }
    });
    return KaryonTransport.newHttpServer(8888, simpleUriRouter);
}
项目:ribbon
文件:RxMovieTransportExample.java
/**
 * Posts the id of {@code movie} to the recommendations resource of {@code user}.
 *
 * @param user  value substituted into the {@code /users/%s/recommendations} path
 * @param movie movie whose {@code getId()} value is sent as the request content
 * @return an observable that completes without items when the response status code is in
 *         the 2xx range, and otherwise signals a {@code RuntimeException} carrying the status
 */
private Observable<Void> updateRecommendation(String user, Movie movie) {
    final String recommendationsPath = format("/users/%s/recommendations", user);

    HttpClientRequest<ByteBuf> httpRequest = HttpClientRequest.createPost(recommendationsPath)
            .withHeader("X-Platform-Version", "xyz")
            .withHeader("X-Auth-Token", "abc")
            .withRawContentSource(Observable.just(movie.getId()), new StringTransformer());

    // Maps the HTTP response onto success (empty completion) or failure (error signal).
    Func1<HttpClientResponse<ByteBuf>, Observable<Void>> verifyStatus =
            new Func1<HttpClientResponse<ByteBuf>, Observable<Void>>() {
                @Override
                public Observable<Void> call(HttpClientResponse<ByteBuf> httpClientResponse) {
                    boolean succeeded = httpClientResponse.getStatus().code() / 100 == 2;
                    if (succeeded) {
                        return Observable.empty();
                    }
                    return Observable.error(new RuntimeException(
                            format("HTTP request failed (status code=%s)", httpClientResponse.getStatus())));
                }
            };

    return client.submit(httpRequest).flatMap(verifyStatus);
}
项目:karyon
文件:HelloWorldEndpoint.java
/**
 * Writes a JSON greeting for the user named in the request path.
 *
 * <p>Everything after the first {@code "/hello/to".length()} characters of the path is taken
 * as {@code /{username}}. When that remainder is empty or a single character, the response
 * status is set to {@code BAD_REQUEST} and the JSON body carries an "Error" entry; otherwise
 * it carries a "Message" entry greeting the user.
 *
 * @param request  request whose path is inspected
 * @param response response that receives the JSON text and is then closed
 * @return the result of {@code response.close()}, or an error observable if building the
 *         JSON content throws a {@code JSONException}
 */
public Observable<Void> sayHelloToUser(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    // Path remainder after the "/hello/to" prefix, e.g. "/bob", "/" or "".
    final String pathRemainder = request.getPath().substring("/hello/to".length());
    final boolean userNameMissing = pathRemainder.length() <= 1;

    final JSONObject content = new JSONObject();
    try {
        if (userNameMissing) {
            response.setStatus(HttpResponseStatus.BAD_REQUEST);
            content.put("Error", "Please provide a username to say hello. The URI should be /hello/to/{username}");
        } else {
            // Drop the leading '/' to obtain the bare user name.
            final String bareName = pathRemainder.substring(1);
            content.put("Message", "Hello " + bareName + " from Netflix OSS");
        }
    } catch (JSONException e) {
        logger.error("Error creating json response.", e);
        return Observable.error(e);
    }

    final String json = content.toString();
    response.write(json, StringTransformer.DEFAULT_INSTANCE);
    return response.close();
}
项目:triathlon
文件:MarathonClient.java
/**
 * Submits {@code message} as the content of a POST to {@code /v2/apps}.
 *
 * <p>A new HTTP client is built on every call from {@code networkAddress.getIpAddress()} and
 * {@code port}, with wire logging enabled at {@code LogLevel.ERROR}. The request is sent with
 * a {@code Content-Type: application/json} header.
 *
 * @param message text sent as the request content
 * @return the observable returned by {@code client.submit}
 */
public Observable<HttpClientResponse<ByteBuf>> postMessage(String message) {
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost("/v2/apps")
            .withRawContentSource(Observable.just(message), StringTransformer.DEFAULT_INSTANCE)
            .withHeader("Content-Type", "application/json");

    PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>> httpConfigurator
            = PipelineConfigurators.httpClientConfigurator();

    HttpClient<ByteBuf, ByteBuf> client = RxNetty
            .<ByteBuf, ByteBuf>newHttpClientBuilder(networkAddress.getIpAddress(), port)
            .pipelineConfigurator(httpConfigurator)
            .enableWireLogging(LogLevel.ERROR)
            .build();

    return client.submit(request);
}
项目:ribbon
文件:RibbonModuleTest.java
/**
 * Builds the two recommendation-update requests for {@code TEST_USER}: one carrying the id of
 * {@code Movie.ORANGE_IS_THE_NEW_BLACK}, the other the id of {@code Movie.BREAKING_BAD}.
 *
 * @return a two-element array holding the observables of both requests, in that order
 */
@SuppressWarnings("unchecked")
@Override
protected Observable<ByteBuf>[] triggerRecommendationsUpdate() {
    // Generic arrays cannot be created directly, hence the raw array and the suppression above.
    Observable[] updates = new Observable[2];

    updates[0] = updateRecommendationTemplate.requestBuilder()
            .withRawContentSource(Observable.just(Movie.ORANGE_IS_THE_NEW_BLACK.getId()), new StringTransformer())
            .withRequestProperty("userId", TEST_USER)
            .build()
            .toObservable();

    updates[1] = updateRecommendationTemplate.requestBuilder()
            .withRawContentSource(Observable.just(Movie.BREAKING_BAD.getId()), new StringTransformer())
            .withRequestProperty("userId", TEST_USER)
            .build()
            .toObservable();

    return updates;
}
项目:ribbon
文件:RxMovieTemplateExample.java
/**
 * Creates one recommendation-update request per movie for {@code TEST_USER}, first for
 * {@code Movie.ORANGE_IS_THE_NEW_BLACK} and then for {@code Movie.BREAKING_BAD}, each sending
 * the movie id as raw request content.
 *
 * @return an array with the two request observables, in creation order
 */
@SuppressWarnings("unchecked")
@Override
protected Observable<ByteBuf>[] triggerRecommendationsUpdate() {
    // A raw array is used because Java does not allow creating arrays of a generic type.
    Observable[] pendingUpdates = new Observable[2];

    pendingUpdates[0] = updateRecommendationTemplate.requestBuilder()
            .withRawContentSource(Observable.just(Movie.ORANGE_IS_THE_NEW_BLACK.getId()), new StringTransformer())
            .withRequestProperty("userId", TEST_USER)
            .build()
            .toObservable();

    pendingUpdates[1] = updateRecommendationTemplate.requestBuilder()
            .withRawContentSource(Observable.just(Movie.BREAKING_BAD.getId()), new StringTransformer())
            .withRequestProperty("userId", TEST_USER)
            .build()
            .toObservable();

    return pendingUpdates;
}
项目:ribbon
文件:RxMovieServerTest.java
/**
 * Posts the string form of {@code ORANGE_IS_THE_NEW_BLACK} to {@code /movies} and checks that
 * the server answers {@code CREATED} and stores the movie under its id.
 */
@Test
public void testMovieRegistration() {
    String movieFormatted = ORANGE_IS_THE_NEW_BLACK.toString();
    String moviesUrl = baseURL + "/movies";

    // Reduces the HTTP response to just its status.
    Func1<HttpClientResponse<ByteBuf>, Observable<HttpResponseStatus>> toStatus =
            new Func1<HttpClientResponse<ByteBuf>, Observable<HttpResponseStatus>>() {
                @Override
                public Observable<HttpResponseStatus> call(HttpClientResponse<ByteBuf> httpClientResponse) {
                    return Observable.just(httpClientResponse.getStatus());
                }
            };

    HttpResponseStatus statusCode = RxNetty
            .createHttpPost(moviesUrl, Observable.just(movieFormatted), new StringTransformer())
            .flatMap(toStatus)
            .toBlocking()
            .first();

    assertEquals(HttpResponseStatus.CREATED, statusCode);
    assertEquals(ORANGE_IS_THE_NEW_BLACK, movieServer.movies.get(ORANGE_IS_THE_NEW_BLACK.getId()));
}
项目:ribbon
文件:RxMovieServerTest.java
/**
 * Pre-registers {@code ORANGE_IS_THE_NEW_BLACK} on the server, posts its id to the
 * recommendations resource of {@code TEST_USER_ID}, and checks that the server answers
 * {@code OK} and records the id among that user's recommendations.
 */
@Test
public void testUpateRecommendations() {
    movieServer.movies.put(ORANGE_IS_THE_NEW_BLACK.getId(), ORANGE_IS_THE_NEW_BLACK);

    String recommendationsUrl = baseURL + "/users/" + TEST_USER_ID + "/recommendations";

    // Reduces the HTTP response to just its status.
    Func1<HttpClientResponse<ByteBuf>, Observable<HttpResponseStatus>> toStatus =
            new Func1<HttpClientResponse<ByteBuf>, Observable<HttpResponseStatus>>() {
                @Override
                public Observable<HttpResponseStatus> call(HttpClientResponse<ByteBuf> httpClientResponse) {
                    return Observable.just(httpClientResponse.getStatus());
                }
            };

    HttpResponseStatus statusCode = RxNetty
            .createHttpPost(recommendationsUrl, Observable.just(ORANGE_IS_THE_NEW_BLACK.getId()), new StringTransformer())
            .flatMap(toStatus)
            .toBlocking()
            .first();

    assertEquals(HttpResponseStatus.OK, statusCode);
    assertTrue(movieServer.userRecommendations.get(TEST_USER_ID).contains(ORANGE_IS_THE_NEW_BLACK.getId()));
}
项目:karyon
文件:HelloWorldEndpoint.java
/**
 * Writes a fixed JSON greeting ({@code "Message": "Hello from Netflix OSS"}) to the response
 * and closes it.
 *
 * @param response response that receives the JSON text
 * @return the result of {@code response.close()}, or an error observable if building the
 *         JSON content throws a {@code JSONException}
 */
public Observable<Void> sayHello(HttpServerResponse<ByteBuf> response) {
    final JSONObject greeting = new JSONObject();
    try {
        greeting.put("Message", "Hello from Netflix OSS");
    } catch (JSONException e) {
        logger.error("Error creating json response.", e);
        return Observable.error(e);
    }

    final String json = greeting.toString();
    response.write(json, StringTransformer.DEFAULT_INSTANCE);
    return response.close();
}