我正在尝试使用Webflux将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到错误,则api返回成功,但是DTO在生成文件而不是文件本身时会详细说明错误。这使用的是非常古老且设计不佳的api,因此请原谅post和api设计的使用。
api调用(exchange())的响应是ClientResponse。从这里,我可以使用bodyToMono转换为ByteArrayResource并将其传输到文件中,或者,如果在创建文件时出错,那么我也可以使用bodyToMono转换为DTO。但是,我似乎既不执行任何操作,也不依赖于ClientResponse标头的内容。
在运行时,我收到了一个IllegalStateException引起的
block()/ blockFirst()/ blockLast()正在阻塞,线程反应器-http-client-epoll-12不支持
我认为我的问题是我无法在同一功能链中两次调用block()。
我的代码段如下所示:
webClient.post() .uri(uriBuilder -> uriBuilder.path("/file/") .queryParams(params).build()) .exchange() .doOnSuccess(cr -> { if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) { NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block(); createErrorFile(dto); } else { ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block(); createSpreadsheet(bAr); } } ) .block();
基本上,我想根据标头中定义的MediaType来不同地处理ClientResponse。
这可能吗?
首先,一些事情将帮助您理解解决此用例的代码片段。
subscribe
DataBuffer
void
这是一个可用于执行此操作的代码段:
Mono<Void> fileWritten = WebClient.create().post() .uri(uriBuilder -> uriBuilder.path("/file/").build()) .exchange() .flatMap(response -> { if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) { Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class); return createErrorFile(dto); } else { Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class); return createSpreadsheet(body); } }); // Once you get that Mono, you should give plug it into an existing // reactive pipeline, or call block on it, depending on the situation
如您所见,我们不会在任何地方阻塞,并且处理I / O的方法将返回Mono<Void>,这与done(error)回调的反应性等效,该回调表示何时完成操作以及是否发生错误。
Mono<Void>
done(error)
由于不确定该createErrorFile方法应该做什么,因此提供了一个示例,createSpreadsheet该示例仅将主体字节写入文件中。请注意,由于数据缓冲区可能被回收/池化,因此我们需要在完成后释放它们。
createErrorFile
createSpreadsheet
private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) { try { Path file = //... WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE); return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then(); } catch (IOException exc) { return Mono.error(exc); } }
通过此实现,您的应用程序将DataBuffer在给定的时间在内存中保存几个实例(反应式运算符出于性能原因正在预取值),并将以反应式方式写入字节。