@Override protected Mono<Void> renderInternal(Map<String, Object> model, MediaType contentType, ServerWebExchange exchange) { Resource resource = resolveResource(); if (resource == null) { return Mono.error(new IllegalStateException( "Could not find Mustache template with URL [" + getUrl() + "]")); } DataBuffer dataBuffer = exchange.getResponse().bufferFactory().allocateBuffer(); try (Reader reader = getReader(resource)) { Template template = this.compiler.compile(reader); Charset charset = getCharset(contentType).orElse(getDefaultCharset()); try (Writer writer = new OutputStreamWriter(dataBuffer.asOutputStream(), charset)) { template.execute(model, writer); writer.flush(); } } catch (Exception ex) { DataBufferUtils.release(dataBuffer); return Mono.error(ex); } return exchange.getResponse().writeWith(Flux.just(dataBuffer)); }
private BodyInserter<?, ? super ClientHttpRequest> buildBody(Object[] args) { if (bodyIndex == null) { return BodyInserters.empty(); } Object body = args[bodyIndex]; if (isDataBufferPublisher(requestBodyType)) { return BodyInserters.fromDataBuffers((Publisher<DataBuffer>) body); } else if (isPublisher(requestBodyType)) { return BodyInserters.fromPublisher((Publisher) body, requestBodyType.getGeneric(0).getRawClass()); } else if (isResource(requestBodyType)) { return BodyInserters.fromResource((Resource) body); } else if (isFormData(requestBodyType)) { return BodyInserters.fromFormData((MultiValueMap<String, String>) body); } else { return BodyInserters.fromObject(body); } }
@GetMapping("/exchange") public Mono<Void> exchange(ServerWebExchange webExchange) { ServerHttpResponse response = webExchange.getResponse(); response.setStatusCode(HttpStatus.OK); response.getHeaders().setContentType(MediaType.TEXT_PLAIN); DataBuffer dataBuffer = factory.allocateBuffer().write("Using exchange!".getBytes(StandardCharsets.UTF_8)); return response.writeWith(Mono.just(dataBuffer)); }
@PostMapping public Mono<ResponseEntity<String>> post(@PathVariable String applicationId, ServerHttpRequest request) { return configure(this.webClient.post(), applicationId, request) // .flatMap(spec -> spec .header(CONTENT_TYPE, request.getHeaders().getFirst(CONTENT_TYPE)) // .body(request.getBody(), DataBuffer.class) // .exchange() // .flatMap(res -> res.bodyToMono(String.class) // .map(b -> ResponseEntity.status(res.statusCode()).body(b)) // .switchIfEmpty(emptyResponse(res)))); }
public static String readToString(DataBuffer dataBuffer) { try { return FileCopyUtils.copyToString(new InputStreamReader(dataBuffer.asInputStream())); } catch (IOException e) { return e.getMessage(); } }
@Test public void readMono(){ HttpStatus status = HttpStatus.BAD_REQUEST; ClientHttpResponse httpResponse = request(status, "Exception Mono error message"); RuntimeException exception = new HttpServerException(status); when(errorDecoder.decode(same(status), any(DataBuffer.class))).thenReturn(exception); StepVerifier.create(errorReader.readMono(httpResponse)) .verifyError(HttpServerException.class); verify(errorDecoder).decode(same(status), any(DataBuffer.class)); verifyNoMoreInteractions(errorDecoder); }
@Test public void read(){ HttpStatus status = HttpStatus.BAD_REQUEST; ClientHttpResponse httpResponse = request(status, "Exception Flux error message"); RuntimeException exception = new HttpServerException(status); when(errorDecoder.decode(same(status), any(DataBuffer.class))).thenReturn(exception); StepVerifier.create(errorReader.read(httpResponse)) .verifyError(HttpServerException.class); verify(errorDecoder).decode(same(status), any(DataBuffer.class)); verifyNoMoreInteractions(errorDecoder); }
private ConvertedBodyResponse(ClientResponse response, Function<Flux<DataBuffer>, Flux<DataBuffer>> converter, MediaType contentType) { this.response = response; this.converter = converter; this.headers = new Headers() { @Override public OptionalLong contentLength() { return response.headers().contentLength(); } @Override public Optional<MediaType> contentType() { return Optional.ofNullable(contentType); } @Override public List<String> header(String headerName) { if (headerName.equals(HttpHeaders.CONTENT_TYPE)) { return singletonList(contentType.toString()); } return response.headers().header(headerName); } @Override public HttpHeaders asHttpHeaders() { HttpHeaders newHeaders = new HttpHeaders(); newHeaders.putAll(response.headers().asHttpHeaders()); newHeaders.replace(HttpHeaders.CONTENT_TYPE, singletonList(contentType.toString())); return HttpHeaders.readOnlyHttpHeaders(newHeaders); } }; }
@Override public <T> T body(BodyExtractor<T, ? super ClientHttpResponse> extractor) { return response.body((inputMessage, context) -> { ClientHttpResponse convertedMessage = new ClientHttpResponseDecorator(inputMessage) { @Override public Flux<DataBuffer> getBody() { return super.getBody().transform(ConvertedBodyResponse.this.converter); } }; return extractor.extract(convertedMessage, context); }); }
@SuppressWarnings("unchecked") private static <S, T> Function<Flux<DataBuffer>, Flux<DataBuffer>> convertUsing(ParameterizedTypeReference<S> sourceType, ParameterizedTypeReference<T> targetType, Function<S, T> converterFn) { return input -> DECODER.decodeToMono(input, ResolvableType.forType(sourceType), null, null) .map(body -> converterFn.apply((S) body)) .flatMapMany(output -> ENCODER.encode(Mono.just(output), new DefaultDataBufferFactory(), ResolvableType.forType(targetType), null, null)); }
@Test public void should_convert_health() { LegacyEndpointConverter converter = LegacyEndpointConverters.health(); assertThat(converter.canConvert("health")).isTrue(); assertThat(converter.canConvert("foo")).isFalse(); Flux<DataBuffer> legacyInput = this.read("health-legacy.json"); Flux<Object> converted = converter.convert(legacyInput).transform(this::unmarshal); Flux<Object> expected = this.read("health-expected.json").transform(this::unmarshal); StepVerifier.create(Flux.zip(converted, expected)) .assertNext(t -> assertThat(t.getT1()).isEqualTo(t.getT2())) .verifyComplete(); }
@Test public void should_convert_env() { LegacyEndpointConverter converter = LegacyEndpointConverters.env(); assertThat(converter.canConvert("env")).isTrue(); assertThat(converter.canConvert("foo")).isFalse(); Flux<DataBuffer> legacyInput = this.read("env-legacy.json"); Flux<Object> converted = converter.convert(legacyInput).transform(this::unmarshal); Flux<Object> expected = this.read("env-expected.json").transform(this::unmarshal); StepVerifier.create(Flux.zip(converted, expected)) .assertNext(t -> assertThat(t.getT1()).isEqualTo(t.getT2())) .verifyComplete(); }
@Test public void should_convert_trace() { LegacyEndpointConverter converter = LegacyEndpointConverters.httptrace(); assertThat(converter.canConvert("httptrace")).isTrue(); assertThat(converter.canConvert("foo")).isFalse(); Flux<DataBuffer> legacyInput = this.read("httptrace-legacy.json"); Flux<Object> converted = converter.convert(legacyInput).transform(this::unmarshal); Flux<Object> expected = this.read("httptrace-expected.json").transform(this::unmarshal); StepVerifier.create(Flux.zip(converted, expected)) .assertNext(t -> assertThat(t.getT1()).isEqualTo(t.getT2())) .verifyComplete(); }
@Test public void should_convert_threaddump() { LegacyEndpointConverter converter = LegacyEndpointConverters.threaddump(); assertThat(converter.canConvert("threaddump")).isTrue(); assertThat(converter.canConvert("foo")).isFalse(); Flux<DataBuffer> legacyInput = this.read("threaddump-legacy.json"); Flux<Object> converted = converter.convert(legacyInput).transform(this::unmarshal); Flux<Object> expected = this.read("threaddump-expected.json").transform(this::unmarshal); StepVerifier.create(Flux.zip(converted, expected)) .assertNext(t -> assertThat(t.getT1()).isEqualTo(t.getT2())) .verifyComplete(); }
@Test public void should_convert_liquibase() { LegacyEndpointConverter converter = LegacyEndpointConverters.liquibase(); assertThat(converter.canConvert("liquibase")).isTrue(); assertThat(converter.canConvert("foo")).isFalse(); Flux<DataBuffer> legacyInput = this.read("liquibase-legacy.json"); Flux<Object> converted = converter.convert(legacyInput).transform(this::unmarshal); Flux<Object> expected = this.read("liquibase-expected.json").transform(this::unmarshal); StepVerifier.create(Flux.zip(converted, expected)) .assertNext(t -> assertThat(t.getT1()).isEqualTo(t.getT2())) .verifyComplete(); }
@Test public void should_convert_flyway() { LegacyEndpointConverter converter = LegacyEndpointConverters.flyway(); assertThat(converter.canConvert("flyway")).isTrue(); assertThat(converter.canConvert("foo")).isFalse(); Flux<DataBuffer> legacyInput = this.read("flyway-legacy.json"); Flux<Object> converted = converter.convert(legacyInput).transform(this::unmarshal); Flux<Object> expected = this.read("flyway-expected.json").transform(this::unmarshal); StepVerifier.create(Flux.zip(converted, expected)) .assertNext(t -> assertThat(t.getT1()).isEqualTo(t.getT2())) .verifyComplete(); }
@Override public HttpClientException decode(HttpStatus httpStatus, DataBuffer inputMessage) { return new HttpClientException(httpStatus, DataBuffers.readToString(inputMessage)); }
@Override public HttpServerException decode(HttpStatus httpStatus, DataBuffer inputMessage) { return new HttpServerException(httpStatus, DataBuffers.readToString(inputMessage)); }
private DataBuffer create(String value) { return new DefaultDataBufferFactory().wrap(value.getBytes()); }
@Override public Flux<DataBuffer> encode(Publisher publisher, DataBufferFactory dataBufferFactory, ResolvableType resolvableType, @Nullable MimeType mimeType, @Nullable Map map) { return null; }
@Override public RuntimeException decode(HttpStatus httpStatus, DataBuffer inputStream) { return null; }
private static Function<ClientResponse, Mono<ClientResponse>> convertClientResponse(Function<Flux<DataBuffer>, Flux<DataBuffer>> bodConverter, MediaType contentType) { return response -> Mono.just(new ConvertedBodyResponse(response, bodConverter, contentType)); }
protected LegacyEndpointConverter(String endpointId, Function<Flux<DataBuffer>, Flux<DataBuffer>> converterFn) { this.endpointId = endpointId; this.converterFn = converterFn; }
public Flux<DataBuffer> convert(Flux<DataBuffer> body) { return converterFn.apply(body); }
@SuppressWarnings("unchecked") private Flux<Object> unmarshal(Flux<DataBuffer> buffer) { return decoder.decode(buffer, type, null, null); }
private Flux<DataBuffer> read(String resourceName) { return DataBufferUtils.readInputStream( () -> LegacyEndpointConvertersTest.class.getResourceAsStream(resourceName), bufferFactory, 10); }
/** * @param httpStatus the status code received by the client * @param inputStream the {@code InputStream} input stream to decode * @return the decoded exception */ T decode(HttpStatus httpStatus, DataBuffer inputStream);