/**
 * Submits the request to the remote service and converts each successful response
 * body into a {@link Map}.
 *
 * <p>The submitted request is shared through {@code publish().autoConnect()}, so the
 * underlying call is started when the first subscriber subscribes.
 *
 * <p>Behaviour of the pipeline:
 * <ul>
 *   <li>An error signalled by the submitted request is reported through
 *       {@code errorHandler.handleNodeError}.</li>
 *   <li>A response with a status code of 400 or higher is reported through
 *       {@code errorHandler.handleNodeWarning} and dropped.</li>
 *   <li>Every emitted content buffer is decoded as UTF-8 text and parsed as a JSON
 *       object.</li>
 *   <li>Any error, including one raised while decoding or parsing, is finally replaced
 *       by an empty observable, so subscribers never receive {@code onError}.</li>
 * </ul>
 *
 * @param serviceId the id of the service for which the request is made
 * @param request the request which has to be executed using RxNetty
 * @return an {@link Observable} emitting the JSON response as a Map with String keys
 *         and Object values; empty when the call fails or is answered with an error status
 */
public Observable<Map<String, Object>> retrieveJsonFromRequest(String serviceId, HttpClientRequest<ByteBuf> request) {
    RxClient.ServerInfo serverInfo = getServerInfoFromRequestOrClient(request, rxClient);

    return rxClient.submit(serverInfo, request)
            .publish().autoConnect()
            .doOnError(el -> errorHandler.handleNodeError(serviceId,
                    format("Error retrieving node(s) for url {0} with headers {1}: {2}",
                            request.getUri(), request.getHeaders().entries(), el),
                    el))
            .filter(r -> {
                if (r.getStatus().code() < 400) {
                    return true;
                } else {
                    // Note: the headers reported here are the response headers.
                    errorHandler.handleNodeWarning(serviceId,
                            "Exception " + r.getStatus() + " for url " + request.getUri()
                                    + " with headers " + r.getHeaders().entries());
                    return false;
                }
            })
            .flatMap(AbstractHttpContentHolder::getContent)
            // JSON is exchanged as UTF-8; decoding with the platform default charset would
            // corrupt non-ASCII characters on JVMs whose default is something else.
            // Fully qualified so that no additional import is required.
            .map(data -> data.toString(java.nio.charset.StandardCharsets.UTF_8))
            .map(response -> {
                JacksonJsonParser jsonParser = new JacksonJsonParser();
                return jsonParser.parseMap(response);
            })
            .doOnNext(r -> logger.info("Json retrieved from call: {}", r))
            // Best effort: a failing service must not break the caller, so errors end the
            // stream silently. NOTE(review): parse failures occur after doOnError above and
            // are therefore not reported to errorHandler - confirm this is intended.
            .onErrorResumeNext(Observable.empty());
}
@Test public void testConnectionTerminatedOnClose() throws Exception { final TcpSocketProxy proxy = new TcpSocketProxy( new InetSocketAddress("localhost", 0), new InetSocketAddress("localhost", server.getServerPort()) ); proxy.start(); final int listenPort = proxy.getListenPort(); final HttpClient<ByteBuf, ByteBuf> client = RxNetty.createHttpClient("localhost", listenPort); final String first = client.submit(HttpClientRequest.createGet("/")) .flatMap(AbstractHttpContentHolder::getContent) .map(bb -> bb.toString(StandardCharsets.UTF_8)) .toBlocking() .first(); assertThat(first).isEqualTo("Hello World"); LOGGER.info("first request done"); proxy.shutdown(); if (proxy.isShutdown()) { proxy.close(); } else { fail("proxy should have been shutdown"); } try { final URI uri = URI.create(String.format("http://localhost:%d/", listenPort)); uri.toURL().getContent(); fail("Shouldn't have been able to get content"); } catch (IOException e) { // expected } }