private StoreResponse toStoreResponse(HttpResponseStatus httpResponseStatus, HttpResponseHeaders httpResponseHeaders, InputStream contentInputStream) throws IOException { List<Entry<String, String>> headerEntries = httpResponseHeaders.entries(); String[] headers = new String[headerEntries.size()]; String[] values = new String[headerEntries.size()]; int i = 0; for(Entry<String, String> headerEntry: headerEntries) { headers[i] = headerEntry.getKey(); values[i] = headerEntry.getValue(); i++; } StoreResponse storeResponse = new StoreResponse( headers, values, httpResponseStatus.code(), contentInputStream); return storeResponse; }
@SuppressWarnings("unchecked") @Test public void getPactUrlsNotFound() throws InterruptedException { HttpClientResponse<ByteBuf> urlsNotFoundResponse = mock(HttpClientResponse.class); when(urlsNotFoundResponse.getContent()).thenReturn(null); when(urlsNotFoundResponse.getStatus()).thenReturn(HttpResponseStatus.NOT_FOUND); HttpResponseHeaders httpResponseHeaders = mock(HttpResponseHeaders.class); when(httpResponseHeaders.entries()).thenReturn(newArrayList()); when(urlsNotFoundResponse.getHeaders()).thenReturn(httpResponseHeaders); when(rxClient.submit(any(RxClient.ServerInfo.class), any(HttpClientRequest.class))) .thenReturn(Observable.just(urlsNotFoundResponse)); TestSubscriber<Node> testSubscriber = new TestSubscriber<>(); pactsAggregator.aggregateNodes().toBlocking().subscribe(testSubscriber); testSubscriber.assertNoErrors(); List<Node> nodes = testSubscriber.getOnNextEvents(); assertThat(nodes).isEmpty(); verify(publisher).publishEvent(any(SystemEvent.class)); }
/** * Attempts to read the content of an error response as {@code text/plain;charset=utf-8}, otherwise the content * will be ignored and a string detailing the Content-Type that was not processed. * <p> * <b>NOTE:</b> * <i> * This method MUST be called from the netty-io thread otherwise the content of the response will not be * available because if will be released automatically as soon as the netty-io thread is left. * </i> * @param resp The response to attempt to read from * @return An {@link Observable} representing the {@code text/plain;charset=utf-8} response content if it existed * or an error message indicating the content-type that was not attempted to read. */ @NotNull static Observable<String> attemptToReadErrorResponse(@NotNull final HttpClientResponse<ByteBuf> resp) { final HttpResponseHeaders headers = resp.getHeaders(); final String contentType = resp.getHeaders().get(HttpHeaderNames.CONTENT_TYPE); if (headers.isContentLengthSet() && headers.getContentLength() > 0 ) { if (contentType != null && contentType.startsWith("text/plain")) { return resp.getContent() .map(r -> r.toString(StandardCharsets.UTF_8)); } else { resp.ignoreContent(); final String errMsg = getErrMsg(contentType); return Observable.just(errMsg); } } else { return Observable.just(""); } }
/** * Transforms the rxNetty's client response Observable to DocumentServiceResponse Observable. * * * Once the the customer code subscribes to the observable returned by the {@link AsyncDocumentClient} CRUD APIs, * the subscription goes up till it reaches the source rxNetty's observable, and at that point the HTTP invocation will be made. * * @param clientResponseObservable * @param request * @return {@link Observable} */ private Observable<DocumentServiceResponse> toDocumentServiceResponse(Observable<HttpClientResponse<ByteBuf>> clientResponseObservable, RxDocumentServiceRequest request) { return clientResponseObservable.flatMap(clientResponse -> { // header key/value pairs HttpResponseHeaders httpResponseHeaders = clientResponse.getHeaders(); HttpResponseStatus httpResponseStatus = clientResponse.getStatus(); Observable<InputStream> inputStreamObservable; if (request.getOperationType() == OperationType.Delete) { // for delete we don't expect any body inputStreamObservable = Observable.just(null); } else { // transforms the observable<ByteBuf> to Observable<InputStream> inputStreamObservable = toInputStream(clientResponse.getContent()); } Observable<StoreResponse> storeResponseObservable = inputStreamObservable .map(contentInputStream -> { try { // If there is any error in the header response this throws exception validateOrThrow(request, httpResponseStatus, httpResponseHeaders, contentInputStream); // transforms to Observable<StoreResponse> return toStoreResponse(httpResponseStatus, httpResponseHeaders, contentInputStream); } catch (Exception e) { throw Exceptions.propagate(e); } }); return storeResponseObservable; }).map(storeResponse -> new DocumentServiceResponse(storeResponse)); }
@SuppressWarnings("unchecked") @Test public void nodeOneNotFound() throws InterruptedException { HttpClientResponse<ByteBuf> urlsResponse = mock(HttpClientResponse.class); ByteBuf byteBuf = (new PooledByteBufAllocator()).directBuffer(); ByteBufUtil.writeUtf8(byteBuf, onePactSource); when(urlsResponse.getContent()).thenReturn(Observable.just(byteBuf)); when(urlsResponse.getStatus()).thenReturn(HttpResponseStatus.OK); HttpClientResponse<ByteBuf> pactNotFoundResponse = mock(HttpClientResponse.class); when(pactNotFoundResponse.getContent()).thenReturn(null); when(pactNotFoundResponse.getStatus()).thenReturn(HttpResponseStatus.NOT_FOUND); HttpResponseHeaders httpResponseHeaders = mock(HttpResponseHeaders.class); when(httpResponseHeaders.entries()).thenReturn(newArrayList()); when(pactNotFoundResponse.getHeaders()).thenReturn(httpResponseHeaders); when(rxClient.submit(any(RxClient.ServerInfo.class), any(HttpClientRequest.class))) .thenReturn(Observable.just(urlsResponse), Observable.just(pactNotFoundResponse)); TestSubscriber<Node> testSubscriber = new TestSubscriber<>(); pactsAggregator.aggregateNodes().toBlocking().subscribe(testSubscriber); testSubscriber.assertNoErrors(); List<Node> nodes = testSubscriber.getOnNextEvents(); assertThat(nodes).isEmpty(); verify(publisher).publishEvent(any(SystemEvent.class)); }
@Override public void onNext(final SinkOperation<Send> op) { try { final Send toSink = op.getThingToSink(); createPost.call(toSink) .flatMap(httpClient::submit) .flatMap(resp -> { final HttpResponseStatus status = resp.getStatus(); final int code = status.code(); if (code == 202) { /* This is success */ return Observable.just(Optional.<MesosException>empty()); } else { final HttpResponseHeaders headers = resp.getHeaders(); return ResponseUtils.attemptToReadErrorResponse(resp) .map(msg -> { final List<Map.Entry<String, String>> entries = headers.entries(); final MesosClientErrorContext context = new MesosClientErrorContext(code, msg, entries); MesosException error; if (400 <= code && code < 500) { // client error error = new Mesos4xxException(toSink, context); } else if (500 <= code && code < 600) { // client error error = new Mesos5xxException(toSink, context); } else { // something else that isn't success but not an error as far as http is concerned error = new MesosException(toSink, context); } return Optional.of(error); }); } }) .observeOn(Rx.compute()) .subscribe(exception -> { if (!exception.isPresent()) { op.onCompleted(); } else { op.onError(exception.get()); } }); } catch (Throwable e) { Exceptions.throwIfFatal(e); op.onError(e); } }