一尘不染

具有有限队列的线程池

java

我已经看到了线程池执行程序的实现及其所提供的拒绝执行策略。但是,我有一个自定义要求-
我想拥有一个回调机制,在该机制中,当达到队列大小限制时,我会收到通知,并说何时队列大小减少到最大允许队列大小的80%。

public interface ISaturatedPoolObserver {
 void onSaturated(); // called when the blocking queue reaches the size limit
 void onUnsaturated(); // called when blocking queues size goes below the threshold.
}

我觉得可以通过子类化线程池执行程序来实现,但是已经有一个实现的版本吗?我很乐意在需要时提供更多详细信息和我的工作,以便提供清晰的信息。


阅读 272

收藏
2020-12-03

共1个答案

一尘不染

我希望有一个回调机制,当达到队列大小限制时,我会在其中收到通知…

我不会将执行器子类化,但会将执行BlockingQueue器使用的子类化。像下面这样的东西应该起作用。checkUnsaturated()如果删除条目并将某人放回原处,则代码中存在竞争条件。如果这些条件需要完善,则可能必须在队列上进行同步。另外,我也不知道执行器实现使用什么方法,因此您可能不需要覆盖其中的一些方法。

public class ObservableBlockingQueue<E> extends LinkedBlockingQueue<E> {
     private ISaturatedPoolObserver observer;
     private int capacity;
     public ObservableBlockingQueue(ISaturatedPoolObserver observer,
         int capacity) {
         super(capacity);
         this.observer = observer;
         this.capacity = capacity;
    }
    @Override
    public boolean offer(E o) {
        boolean offered = super.offer(o);
        if (!offered) {
            observer.onSaturated();
        }
        return offered;
    }
    @Override
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
        boolean offered = super.offer(o, timeout, unit);
        if (!offered) {
            observer.onSaturated();
        }
        return offered;
    }
    @Override
    public E poll() {
        E e = super.poll();
        if (e != null) {
             checkUnsaturated();
        }
        return e;
    }
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = super.poll(timeout, unit);
        if (e != null) {
             checkUnsaturated();
        }
        return e;
    }
    @Override
    public E take() throws InterruptedException {
        E e = super.take();
        checkUnsaturated();
        return e;
    }
    @Override
    public boolean remove(E e) throws InterruptedException {
        boolean removed = super.remove(e);
        if (removed) {
            checkUnsaturated();
        }
        return removed;
    }
    private void checkUnsaturated() {
        if (super.size() * 100 / capacity < UNSATURATED_PERCENTAGE) {
            observer.onUnsaturated();
        }
    }
}
2020-12-03