Java 类io.reactivex.netty.protocol.http.AbstractHttpContentHolder 实例源码
项目:microservices-dashboard-server
文件:NettyServiceCaller.java
/**
* Calls the remote service using the provided request, applies error handling and
* converts the response into a {@link Map}. The entire request and response are
* executed and handled in a hot observable.
*
* @param serviceId the id of the service for which the request is made
* @param request the request which has to be executed using RxNetty
* @return an {@link Observable} emitting the JSON response as a Map with String keys
* and Object values.
*/
public Observable<Map<String, Object>> retrieveJsonFromRequest(String serviceId, HttpClientRequest<ByteBuf> request) {
RxClient.ServerInfo serverInfo = getServerInfoFromRequestOrClient(request, rxClient);
return rxClient.submit(serverInfo, request)
.publish().autoConnect()
.doOnError(el -> errorHandler.handleNodeError(serviceId, format("Error retrieving node(s) for url {0} with headers {1}: {2}",
request.getUri(), request.getHeaders().entries(), el), el))
.filter(r -> {
if (r.getStatus().code() < 400) {
return true;
} else {
errorHandler.handleNodeWarning(serviceId, "Exception " + r.getStatus() + " for url " + request.getUri() + " with headers " + r.getHeaders().entries());
return false;
}
})
.flatMap(AbstractHttpContentHolder::getContent)
.map(data -> data.toString(Charset.defaultCharset()))
.map(response -> {
JacksonJsonParser jsonParser = new JacksonJsonParser();
return jsonParser.parseMap(response);
})
.doOnNext(r -> logger.info("Json retrieved from call: {}", r))
.onErrorResumeNext(Observable.empty());
}
项目:mesos-rxjava
文件:TcpSocketProxyTest.java
@Test
public void testConnectionTerminatedOnClose() throws Exception {
final TcpSocketProxy proxy = new TcpSocketProxy(
new InetSocketAddress("localhost", 0),
new InetSocketAddress("localhost", server.getServerPort())
);
proxy.start();
final int listenPort = proxy.getListenPort();
final HttpClient<ByteBuf, ByteBuf> client = RxNetty.createHttpClient("localhost", listenPort);
final String first = client.submit(HttpClientRequest.createGet("/"))
.flatMap(AbstractHttpContentHolder::getContent)
.map(bb -> bb.toString(StandardCharsets.UTF_8))
.toBlocking()
.first();
assertThat(first).isEqualTo("Hello World");
LOGGER.info("first request done");
proxy.shutdown();
if (proxy.isShutdown()) {
proxy.close();
} else {
fail("proxy should have been shutdown");
}
try {
final URI uri = URI.create(String.format("http://localhost:%d/", listenPort));
uri.toURL().getContent();
fail("Shouldn't have been able to get content");
} catch (IOException e) {
// expected
}
}