/**
 * Provides the HTTP server for this module: a URI router with a single
 * {@code /foo} route, bound to port 8888.
 *
 * @return the server created by {@code KaryonTransport.newHttpServer} for the router
 */
@Provides
public HttpServer providesKaryonTransport() {
    SimpleUriRouter simpleUriRouter = new SimpleUriRouter();
    simpleUriRouter.addUri("/foo", new RequestHandler() {
        /**
         * Answers every {@code /foo} request with status OK and the body "Hello World".
         */
        @Override
        public Observable<Void> handle(HttpServerRequest request, HttpServerResponse response) {
            // The status has to be chosen before the first flush: once the response
            // head has been written, a later setStatus can no longer affect it.
            response.setStatus(HttpResponseStatus.OK);
            // Hand the write result back so its completion or failure reaches the
            // server instead of being dropped.
            return response.writeAndFlush("Hello World", StringTransformer.DEFAULT_INSTANCE);
        }

        /**
         * Untyped overload that does nothing and completes immediately.
         * NOTE(review): presumably only present because the handler is declared
         * with raw types - confirm before removing.
         */
        @Override
        public Observable<Void> handle(Object input, Object output) {
            return Observable.empty();
        }
    });
    return KaryonTransport.newHttpServer(8888, simpleUriRouter);
}
/**
 * Posts the id of {@code movie} to {@code /users/{user}/recommendations}.
 *
 * @param user  user whose recommendations resource is addressed
 * @param movie movie whose id is sent as the request content
 * @return an observable that completes without items when the response status
 *         is in the 2xx range, and fails with a {@link RuntimeException} otherwise
 */
private Observable<Void> updateRecommendation(String user, Movie movie) {
    String recommendationsUri = format("/users/%s/recommendations", user);
    HttpClientRequest<ByteBuf> httpRequest = HttpClientRequest.createPost(recommendationsUri)
            .withHeader("X-Platform-Version", "xyz")
            .withHeader("X-Auth-Token", "abc")
            .withRawContentSource(Observable.just(movie.getId()), new StringTransformer());

    return client.submit(httpRequest).flatMap(new Func1<HttpClientResponse<ByteBuf>, Observable<Void>>() {
        @Override
        public Observable<Void> call(HttpClientResponse<ByteBuf> serverReply) {
            // Integer division maps 200..299 to 2, i.e. the success class.
            int statusClass = serverReply.getStatus().code() / 100;
            if (statusClass == 2) {
                return Observable.empty();
            }
            return Observable.error(new RuntimeException(
                    format("HTTP request failed (status code=%s)", serverReply.getStatus())));
        }
    });
}
/**
 * Writes a JSON greeting for the user named in the request path
 * ({@code /hello/to/{username}}) and closes the response.
 *
 * <p>When the path carries no name, the status is set to BAD_REQUEST and the
 * JSON body holds an "Error" entry instead of a "Message" entry.
 *
 * @param request  request whose path is inspected for the user name
 * @param response response that receives the JSON body
 * @return the result of closing the response, or an error observable if the
 *         JSON body could not be built
 */
public Observable<Void> sayHelloToUser(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    JSONObject body = new JSONObject();
    // Whatever follows the "/hello/to" prefix; "/{username}" when a name was given.
    String nameSegment = request.getPath().substring("/hello/to".length());
    // Empty, or a single character (the uri is /hello/to/ but no name follows).
    boolean nameMissing = nameSegment.length() <= 1;

    try {
        if (!nameMissing) {
            // Drop the leading "/" before greeting.
            String name = nameSegment.substring(1);
            body.put("Message", "Hello " + name + " from Netflix OSS");
        } else {
            response.setStatus(HttpResponseStatus.BAD_REQUEST);
            body.put("Error", "Please provide a username to say hello. The URI should be /hello/to/{username}");
        }
    } catch (JSONException e) {
        logger.error("Error creating json response.", e);
        return Observable.error(e);
    }

    response.write(body.toString(), StringTransformer.DEFAULT_INSTANCE);
    return response.close();
}
/**
 * Submits {@code message} as the content of a POST to {@code /v2/apps} with a
 * JSON content type.
 *
 * <p>A new HTTP client is built on every call, targeting
 * {@code networkAddress.getIpAddress()} and {@code port}, with wire logging
 * enabled at ERROR level.
 *
 * @param message request content, sent as a string
 * @return the observable returned by submitting the request to the client
 */
public Observable<HttpClientResponse<ByteBuf>> postMessage(String message) {
    PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>> httpPipeline =
            PipelineConfigurators.httpClientConfigurator();

    HttpClient<ByteBuf, ByteBuf> httpClient =
            RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(networkAddress.getIpAddress(), port)
                    .pipelineConfigurator(httpPipeline)
                    .enableWireLogging(LogLevel.ERROR)
                    .build();

    HttpClientRequest<ByteBuf> postRequest = HttpClientRequest.createPost("/v2/apps")
            .withRawContentSource(Observable.just(message), StringTransformer.DEFAULT_INSTANCE)
            .withHeader("Content-Type", "application/json");

    return httpClient.submit(postRequest);
}
/**
 * Builds the two recommendation-update requests for {@code TEST_USER}: one
 * carrying the id of ORANGE_IS_THE_NEW_BLACK and one carrying the id of
 * BREAKING_BAD.
 *
 * @return a two-element array holding the observables of those requests, in that order
 */
@SuppressWarnings("unchecked")
@Override
protected Observable<ByteBuf>[] triggerRecommendationsUpdate() {
    Observable<ByteBuf> orangeIsTheNewBlackUpdate = updateRecommendationTemplate.requestBuilder()
            .withRawContentSource(Observable.just(Movie.ORANGE_IS_THE_NEW_BLACK.getId()), new StringTransformer())
            .withRequestProperty("userId", TEST_USER)
            .build()
            .toObservable();

    Observable<ByteBuf> breakingBadUpdate = updateRecommendationTemplate.requestBuilder()
            .withRawContentSource(Observable.just(Movie.BREAKING_BAD.getId()), new StringTransformer())
            .withRequestProperty("userId", TEST_USER)
            .build()
            .toObservable();

    // Generic arrays cannot be created directly, hence the raw array and the suppression.
    return new Observable[]{orangeIsTheNewBlackUpdate, breakingBadUpdate};
}
/**
 * Posts the string form of ORANGE_IS_THE_NEW_BLACK to {@code /movies} and
 * checks that the reply status is CREATED and that the movie server now holds
 * the movie under its id.
 */
@Test
public void testMovieRegistration() {
    // Reduces a response to just its status.
    Func1<HttpClientResponse<ByteBuf>, Observable<HttpResponseStatus>> toStatus =
            new Func1<HttpClientResponse<ByteBuf>, Observable<HttpResponseStatus>>() {
                @Override
                public Observable<HttpResponseStatus> call(HttpClientResponse<ByteBuf> reply) {
                    return Observable.just(reply.getStatus());
                }
            };

    String movieFormatted = ORANGE_IS_THE_NEW_BLACK.toString();
    Observable<HttpClientResponse<ByteBuf>> registration =
            RxNetty.createHttpPost(baseURL + "/movies", Observable.just(movieFormatted), new StringTransformer());

    // Block until the first (and only expected) status arrives.
    HttpResponseStatus statusCode = registration.flatMap(toStatus).toBlocking().first();

    assertEquals(HttpResponseStatus.CREATED, statusCode);
    assertEquals(ORANGE_IS_THE_NEW_BLACK, movieServer.movies.get(ORANGE_IS_THE_NEW_BLACK.getId()));
}
/**
 * Seeds the movie server with ORANGE_IS_THE_NEW_BLACK, posts its id to the
 * test user's recommendations resource, and checks that the reply status is OK
 * and that the id shows up in that user's recommendations.
 */
@Test
public void testUpateRecommendations() {
    movieServer.movies.put(ORANGE_IS_THE_NEW_BLACK.getId(), ORANGE_IS_THE_NEW_BLACK);

    // Reduces a response to just its status.
    Func1<HttpClientResponse<ByteBuf>, Observable<HttpResponseStatus>> toStatus =
            new Func1<HttpClientResponse<ByteBuf>, Observable<HttpResponseStatus>>() {
                @Override
                public Observable<HttpResponseStatus> call(HttpClientResponse<ByteBuf> reply) {
                    return Observable.just(reply.getStatus());
                }
            };

    Observable<HttpClientResponse<ByteBuf>> update = RxNetty.createHttpPost(
            baseURL + "/users/" + TEST_USER_ID + "/recommendations",
            Observable.just(ORANGE_IS_THE_NEW_BLACK.getId()),
            new StringTransformer());

    // Block until the first (and only expected) status arrives.
    HttpResponseStatus statusCode = update.flatMap(toStatus).toBlocking().first();

    assertEquals(HttpResponseStatus.OK, statusCode);
    assertTrue(movieServer.userRecommendations.get(TEST_USER_ID).contains(ORANGE_IS_THE_NEW_BLACK.getId()));
}
/**
 * Writes a fixed JSON greeting to the response and closes it.
 *
 * @param response response that receives the JSON body
 * @return the result of closing the response, or an error observable if the
 *         JSON body could not be built
 */
public Observable<Void> sayHello(HttpServerResponse<ByteBuf> response) {
    String greetingJson;
    try {
        JSONObject greeting = new JSONObject();
        greeting.put("Message", "Hello from Netflix OSS");
        greetingJson = greeting.toString();
    } catch (JSONException e) {
        logger.error("Error creating json response.", e);
        return Observable.error(e);
    }

    response.write(greetingJson, StringTransformer.DEFAULT_INSTANCE);
    return response.close();
}