Java 类io.reactivex.netty.protocol.http.client.HttpResponseHeaders 实例源码
项目:azure-documentdb-rxjava
文件:RxGatewayStoreModel.java
/**
 * Builds a {@link StoreResponse} from the individual parts of an HTTP response.
 *
 * <p>The header entries are copied, in iteration order, into two parallel arrays:
 * one holding the header names and one holding the matching header values.
 *
 * @param httpResponseStatus  status of the HTTP response; its numeric code is passed on
 * @param httpResponseHeaders headers of the HTTP response
 * @param contentInputStream  response body, handed to the {@link StoreResponse} as-is
 * @return the assembled {@link StoreResponse}
 * @throws IOException declared by the signature; presumably raised while the
 *                     {@link StoreResponse} is constructed (confirm against its constructor)
 */
private StoreResponse toStoreResponse(HttpResponseStatus httpResponseStatus,
        HttpResponseHeaders httpResponseHeaders, InputStream contentInputStream) throws IOException {
    final List<Entry<String, String>> entries = httpResponseHeaders.entries();
    final int count = entries.size();
    final String[] names = new String[count];
    final String[] contents = new String[count];

    // Position k in both arrays describes the same header.
    int position = 0;
    for (Entry<String, String> entry : entries) {
        names[position] = entry.getKey();
        contents[position] = entry.getValue();
        position++;
    }

    return new StoreResponse(names, contents, httpResponseStatus.code(), contentInputStream);
}
项目:microservices-dashboard-server
文件:PactsAggregatorTest.java
/**
 * Checks the aggregator's behaviour when the stubbed client answers with
 * {@code 404 Not Found}: no error is raised, no node is emitted and a
 * {@link SystemEvent} is published.
 */
@SuppressWarnings("unchecked")
@Test
public void getPactUrlsNotFound() throws InterruptedException {
    // Headers of the stubbed response: a mock that reports no entries.
    HttpResponseHeaders emptyHeaders = mock(HttpResponseHeaders.class);
    when(emptyHeaders.entries()).thenReturn(newArrayList());

    // The 404 response itself, without any content.
    HttpClientResponse<ByteBuf> notFound = mock(HttpClientResponse.class);
    when(notFound.getContent()).thenReturn(null);
    when(notFound.getStatus()).thenReturn(HttpResponseStatus.NOT_FOUND);
    when(notFound.getHeaders()).thenReturn(emptyHeaders);

    when(rxClient.submit(any(RxClient.ServerInfo.class), any(HttpClientRequest.class)))
            .thenReturn(Observable.just(notFound));

    TestSubscriber<Node> subscriber = new TestSubscriber<>();
    pactsAggregator.aggregateNodes().toBlocking().subscribe(subscriber);

    subscriber.assertNoErrors();
    List<Node> emitted = subscriber.getOnNextEvents();
    assertThat(emitted).isEmpty();
    verify(publisher).publishEvent(any(SystemEvent.class));
}
项目:mesos-rxjava
文件:ResponseUtils.java
/**
 * Tries to read the body of an error response as {@code text/plain;charset=utf-8}.
 * <p>
 * The outcome depends on the response headers:
 * <ul>
 *     <li>no Content-Length, or a Content-Length of zero: an empty string is emitted;</li>
 *     <li>a Content-Type starting with {@code text/plain}: the content is emitted, decoded as UTF-8;</li>
 *     <li>any other (or missing) Content-Type: the content is ignored and a message naming the
 *     Content-Type that was not processed is emitted instead.</li>
 * </ul>
 * <p>
 * <b>NOTE:</b>
 * <i>
 * This method MUST be called from the netty-io thread, otherwise the content of the response will not be
 * available because it will be released automatically as soon as the netty-io thread is left.
 * </i>
 * @param resp The response to attempt to read from
 * @return An {@link Observable} representing the {@code text/plain;charset=utf-8} response content if it existed
 * or an error message indicating the content-type that was not attempted to read.
 */
@NotNull
static Observable<String> attemptToReadErrorResponse(@NotNull final HttpClientResponse<ByteBuf> resp) {
    final HttpResponseHeaders headers = resp.getHeaders();
    final String contentType = headers.get(HttpHeaderNames.CONTENT_TYPE);

    final boolean hasBody = headers.isContentLengthSet() && headers.getContentLength() > 0;
    if (!hasBody) {
        // Nothing to read.
        return Observable.just("");
    }

    final boolean isPlainText = contentType != null && contentType.startsWith("text/plain");
    if (isPlainText) {
        return resp.getContent()
                .map(buf -> buf.toString(StandardCharsets.UTF_8));
    }

    // Content we do not know how to read: discard it and report the Content-Type instead.
    resp.ignoreContent();
    return Observable.just(getErrMsg(contentType));
}
项目:azure-documentdb-rxjava
文件:RxGatewayStoreModel.java
/**
 * Transforms rxNetty's client response Observable into a DocumentServiceResponse Observable.
 *
 * <p>Once the customer code subscribes to the observable returned by the {@link AsyncDocumentClient}
 * CRUD APIs, the subscription goes up until it reaches the source rxNetty observable, and at that
 * point the HTTP invocation will be made.
 *
 * <p>For every client response the status and headers are validated; a validation failure (or any
 * other exception raised while building the store response) is rethrown through
 * {@code Exceptions.propagate}.
 *
 * @param clientResponseObservable the rxNetty client response Observable to transform
 * @param request the request the responses belong to; for delete operations no body is read
 * @return {@link Observable} of {@link DocumentServiceResponse}
 */
private Observable<DocumentServiceResponse> toDocumentServiceResponse(Observable<HttpClientResponse<ByteBuf>> clientResponseObservable,
        RxDocumentServiceRequest request) {
    Observable<StoreResponse> storeResponses = clientResponseObservable.flatMap(clientResponse -> {
        // header key/value pairs and status of this response
        final HttpResponseHeaders headers = clientResponse.getHeaders();
        final HttpResponseStatus status = clientResponse.getStatus();

        // For delete no body is expected, so a single null stream is emitted; otherwise the
        // Observable<ByteBuf> content is turned into an Observable<InputStream>.
        final Observable<InputStream> body = request.getOperationType() == OperationType.Delete
                ? Observable.<InputStream>just(null)
                : toInputStream(clientResponse.getContent());

        return body.map(stream -> {
            try {
                // throws if the response status/headers indicate an error
                validateOrThrow(request, status, headers, stream);
                return toStoreResponse(status, headers, stream);
            } catch (Exception e) {
                throw Exceptions.propagate(e);
            }
        });
    });

    return storeResponses.map(storeResponse -> new DocumentServiceResponse(storeResponse));
}
项目:microservices-dashboard-server
文件:PactsAggregatorTest.java
/**
 * Checks the aggregator's behaviour when the first stubbed client call answers
 * {@code 200 OK} with {@code onePactSource} as content and the second call answers
 * {@code 404 Not Found}: no error is raised, no node is emitted and a
 * {@link SystemEvent} is published.
 */
@SuppressWarnings("unchecked")
@Test
public void nodeOneNotFound() throws InterruptedException {
    // First response: 200 OK carrying the pact source as UTF-8 content.
    ByteBuf pactSourceBuffer = (new PooledByteBufAllocator()).directBuffer();
    ByteBufUtil.writeUtf8(pactSourceBuffer, onePactSource);
    HttpClientResponse<ByteBuf> okResponse = mock(HttpClientResponse.class);
    when(okResponse.getContent()).thenReturn(Observable.just(pactSourceBuffer));
    when(okResponse.getStatus()).thenReturn(HttpResponseStatus.OK);

    // Second response: 404 without content and with headers that report no entries.
    HttpResponseHeaders emptyHeaders = mock(HttpResponseHeaders.class);
    when(emptyHeaders.entries()).thenReturn(newArrayList());
    HttpClientResponse<ByteBuf> notFoundResponse = mock(HttpClientResponse.class);
    when(notFoundResponse.getContent()).thenReturn(null);
    when(notFoundResponse.getStatus()).thenReturn(HttpResponseStatus.NOT_FOUND);
    when(notFoundResponse.getHeaders()).thenReturn(emptyHeaders);

    // Consecutive submit calls return the two responses in order.
    when(rxClient.submit(any(RxClient.ServerInfo.class), any(HttpClientRequest.class)))
            .thenReturn(Observable.just(okResponse), Observable.just(notFoundResponse));

    TestSubscriber<Node> subscriber = new TestSubscriber<>();
    pactsAggregator.aggregateNodes().toBlocking().subscribe(subscriber);

    subscriber.assertNoErrors();
    List<Node> emitted = subscriber.getOnNextEvents();
    assertThat(emitted).isEmpty();
    verify(publisher).publishEvent(any(SystemEvent.class));
}
项目:mesos-rxjava
文件:SinkSubscriber.java
/**
 * Sends the payload carried by {@code op} and reports the outcome back to {@code op}.
 * <p>
 * A request is created from the payload via {@code createPost} and submitted through
 * {@code httpClient}. The response is then interpreted as follows:
 * <ul>
 *     <li>status {@code 202}: success, {@code op.onCompleted()} is invoked;</li>
 *     <li>status {@code 4xx}: {@code op.onError} receives a {@link Mesos4xxException};</li>
 *     <li>status {@code 5xx}: {@code op.onError} receives a {@link Mesos5xxException};</li>
 *     <li>any other status: {@code op.onError} receives a plain {@link MesosException}.</li>
 * </ul>
 * For non-202 responses the error context contains the status code, the message obtained from
 * {@link ResponseUtils#attemptToReadErrorResponse} and the response header entries.
 * <p>
 * Failures of the pipeline itself (for example the request could not be submitted) are forwarded
 * to {@code op.onError} as well, as are exceptions thrown synchronously while setting the
 * pipeline up.
 *
 * @param op the sink operation holding the payload to send and receiving the result callbacks
 */
@Override
public void onNext(final SinkOperation<Send> op) {
    try {
        final Send toSink = op.getThingToSink();
        createPost.call(toSink)
            .flatMap(httpClient::submit)
            .flatMap(resp -> {
                final HttpResponseStatus status = resp.getStatus();
                final int code = status.code();

                if (code == 202) {
                    /* This is success */
                    return Observable.just(Optional.<MesosException>empty());
                } else {
                    final HttpResponseHeaders headers = resp.getHeaders();
                    return ResponseUtils.attemptToReadErrorResponse(resp)
                        .map(msg -> {
                            final List<Map.Entry<String, String>> entries = headers.entries();
                            final MesosClientErrorContext context = new MesosClientErrorContext(code, msg, entries);
                            MesosException error;
                            if (400 <= code && code < 500) {
                                // client error
                                error = new Mesos4xxException(toSink, context);
                            } else if (500 <= code && code < 600) {
                                // server error
                                error = new Mesos5xxException(toSink, context);
                            } else {
                                // something else that isn't success but not an error as far as http is concerned
                                error = new MesosException(toSink, context);
                            }
                            return Optional.of(error);
                        });
                }
            })
            .observeOn(Rx.compute())
            .subscribe(
                exception -> {
                    if (!exception.isPresent()) {
                        op.onCompleted();
                    } else {
                        op.onError(exception.get());
                    }
                },
                // Errors emitted by the pipeline arrive asynchronously, so the surrounding
                // try/catch never sees them. Without this handler they would surface as an
                // OnErrorNotImplementedException and the operation would never be notified.
                error -> op.onError(error)
            );
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        op.onError(e);
    }
}