一尘不染

“关闭”阻塞队列

java

我在一个非常简单的生产者-消费者场景中使用 java.util.concurrent.BlockingQueue 。例如,此伪代码描述了使用者部分:

class QueueConsumer implements Runnable {

    @Override
    public void run() {
        while(true)
        {
            try {
                ComplexObject complexObject = myBlockingQueue.take();
                //do something with the complex object
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

到目前为止,一切都很好。在阻塞队列的javadoc中,我读到:

BlockingQueue本质上不支持任何类型的“关闭”或“关闭”操作,以指示将不再添加任何项目。这些功能的需求和使用往往取决于实现。例如,一种常见的策略是让生产者插入特殊的流尾对象或有毒对象,这些特殊对象在被消费者拿走时会被相应地解释。

不幸的是,由于使用了泛型以及ComplexObject的性质,将“毒物”推入队列并不是一件容易的事。因此,在我的情况下,这种“通用策略”并不是很方便。

我的问题是:我还可以使用哪些其他好的策略/模式来“关闭”队列?

谢谢!


阅读 233

收藏
2020-12-03

共1个答案

一尘不染

如果您拥有使用者线程的句柄,则可以中断它。使用您提供的代码,将杀死消费者。我不希望制片人有这个。它可能必须以某种方式回调到程序控制器,以使其知道已完成。然后,控制器将中断使用者线程。

您总是可以在服从中断之前完成工作。例如:

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().isInterrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                this.process(complexObject);

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }

        // Thread is getting ready to die, but first,
        // drain remaining elements on the queue and process them.
        final LinkedList<ComplexObject> remainingObjects;
        myBlockingQueue.drainTo(remainingObjects);
        for(ComplexObject complexObject : remainingObjects) {
            this.process(complexObject);
        }
    }

    private void process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

我实际上更愿意以某种方式使队列中毒。如果要杀死线程,请让线程杀死自己。

(很高兴看到有人处理InterruptedException正确。)


这里似乎对中断的处理有些争议。首先,我希望每个人都阅读这篇文章:http
:
//www.ibm.com/developerworks/java/library/j-jtp05236.html

现在,在了解到没人真正读懂这一点的情况下,这就是交易。如果线程InterruptedException在中断时当前正在阻塞,则该线程仅会接收到。在这种情况下,Thread.interrupted()将返回false。如果没有阻塞,它将不会收到此异常,而是Thread.interrupted()将返回true。因此,无论如何,循环防护绝对应检查Thread.interrupted()或以其他方式冒失线程中断的风险。

因此,由于Thread.interrupted()无论如何都在进行检查,并且被迫捕获InterruptedException(即使没有被强制处理,也应该对其进行处理),因此现在有两个处理同一事件的代码区域,即线程中断。解决此问题的一种方法是将它们规范化为一种条件,这意味着布尔状态检查可以引发异常,或者异常可以设置布尔状态。我选择稍后。


编辑: 请注意,静态Thread#interrupted方法将 清除 当前线程的中断状态。

2020-12-03