一尘不染

在Java中使用wait()和notify()的简单场景

java

我可以得到一个完整的简单方案,即建议如何使用它的教程,特别是在队列中吗?


阅读 1704

收藏
2020-03-06

共1个答案

一尘不染

wait()notify()方法被设计为提供一种机制,以允许一个线程块,直到一个特定的条件被满足。为此,我假设你要编写一个阻塞队列实现,其中具有一些固定大小的元素后备存储。

你要做的第一件事是确定你希望方法等待的条件。在这种情况下,你将希望该put()方法阻塞直到存储空间可用,并且你将希望该take()方法阻塞直到返回某些元素。

public class BlockingQueue<T> {

    private Queue<T> queue = new LinkedList<T>();
    private int capacity;

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void put(T element) throws InterruptedException {
        while(queue.size() == capacity) {
            wait();
        }

        queue.add(element);
        notify(); // notifyAll() for multiple producer/consumer threads
    }

    public synchronized T take() throws InterruptedException {
        while(queue.isEmpty()) {
            wait();
        }

        T item = queue.remove();
        notify(); // notifyAll() for multiple producer/consumer threads
        return item;
    }
}

关于必须使用等待和通知机制的方式,需要注意一些事项。

首先,你需要确保对代码的任何调用wait()notify()在代码的同步区域内(并且wait()notify()调用在同一对象上同步)。造成这种情况的原因(除了标准线程安全问题之外)是由于某种原因导致的信号丢失。

这样的一个示例是,put()当队列碰巧已满时,线程可能会调用,然后它检查条件,发现队列已满,但是在它可以阻止另一个线程调度之前。然后,第二个线程take()是队列中的一个元素,并通知等待线程该队列不再满。但是,由于第一个线程已经检查了条件,因此wait()即使可以进行进度,它也将在重新调度后简单地进行调用。

通过在共享库上同步,可以确保不会发生此问题,因为在第take()一个线程实际被阻塞之前,第二个线程的调用将无法进行。

其次,由于称为虚假唤醒的问题,你需要将要检查的条件放入while循环中,而不是if语句中。在这里有时可以在不notify()调用等待线程的情况下重新激活它。将此检查置于while循环中将确保如果发生虚假唤醒,将重新检查条件,并且线程将wait()再次调用。

就像其他答案中提到的那样,Java 1.5引入了一个新的并发库(在java.util.concurrent包中),该库旨在在等待/通知机制上提供更高级别的抽象。使用这些新功能,你可以像这样重写原始示例:

public class BlockingQueue<T> {

    private Queue<T> queue = new LinkedList<T>();
    private int capacity;
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public void put(T element) throws InterruptedException {
        lock.lock();
        try {
            while(queue.size() == capacity) {
                notFull.await();
            }

            queue.add(element);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while(queue.isEmpty()) {
                notEmpty.await();
            }

            T item = queue.remove();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }
}

当然,如果你实际上需要阻塞队列,则应该使用BlockingQueue接口的实现 。

另外,对于这种事情,我强烈建议在实践中使用Java Concurrency,因为它涵盖了你可能希望了解的与并发相关的问题和解决方案的所有内容。

2020-03-06