我正在使用我的应用程序中的单个传输客户端实例在Elasticsearch中查询多个并行请求。
对于并行执行,我得到了以下异常。如何克服这个问题。
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 1000) on org.elasticsearch.search.action.SearchServiceTransportAction$23@5f804c60 at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372) at org.elasticsearch.search.action.SearchServiceTransportAction.execute(SearchServiceTransportAction.java:509) at org.elasticsearch.search.action.SearchServiceTransportAction.sendExecuteScan(SearchServiceTransportAction.java:441) at org.elasticsearch.action.search.type.TransportSearchScanAction$AsyncAction.sendExecuteFirstPhase(TransportSearchScanAction.java:68) at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.performFirstPhase(TransportSearchTypeAction.java:171) at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.start(TransportSearchTypeAction.java:153) at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:52) at org.elasticsearch.action.search.type.TransportSearchScanAction.doExecute(TransportSearchScanAction.java:42) at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63) at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:107) at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:43) at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:63) at org.elasticsearch.action.search.TransportSearchAction$TransportHandler.messageReceived(TransportSearchAction.java:124) at org.elasticsearch.action.search.TransportSearchAction$TransportHandler.messageReceived(TransportSearchAction.java:113) at org.elasticsearch.transport.netty.MessageChannelHandler.handleRequest(MessageChannelHandler.java:212) at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:109) at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:74) at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
Elasticsearch有一个线程池和一个用于每个节点搜索的队列。线程池将具有N个准备就绪的工作者来处理请求。当请求到来且工作人员空闲时,由工作人员处理。现在默认情况下,工作程序数量等于该CPU上的内核数量。当工作人员忙碌并且有更多搜索请求时,该请求将进入队列。队列的大小也受到限制。如果默认大小为100,并且如果发生的并行请求多于此,那么这些请求将被拒绝,正如您在错误日志中看到的那样。
解决方案:
对此的直接解决方案是增加搜索队列的大小。我们还可以增加线程池的大小,但这可能会严重影响单个查询的性能。因此,增加队列可能是一个好主意。但是请记住,此队列位于内存中,过多增加队列大小可能会导致内存不足问题。(更多信息)
增加节点和副本的数量-请记住,每个节点都有自己的搜索线程池/队列。此外,搜索可以在主分片或副本上进行。