一尘不染

等到未来 已经完成了

java

我运行的异步任务很少,我需要等待至少其中之一完成(将来,我可能需要等待N个任务中的util M完成)。目前,它们以“未来”的形式呈现,所以我需要类似

/**
 * Blocks current thread until one of specified futures is done and returns it. 
 */
public static <T> Future<T> waitForAny(Collection<Future<T>> futures) 
        throws AllFuturesFailedException

像这样吗
或类似的东西,对于Future来说不是必需的。目前,我循环浏览期货,检查一项是否完成,然后入睡一段时间,然后再次检查。这似乎不是最佳解决方案,因为如果我长时间睡眠,则会增加不必要的延迟;如果我短期睡眠,则可能会影响性能。

我可以尝试使用

new CountDownLatch(1)

并在任务完成时减少倒计时

countdown.await()

,但我发现只有在我控制Future创建时才有可能。这是可能的,但需要重新设计系统,因为当前任务创建的逻辑(将Callable发送给ExecutorService)与决定等待哪个Future分开。我也可以覆盖

<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)

并创建RunnableFuture的自定义实现,并能够将任务完成时通知的侦听器附加到侦听器,然后将侦听器附加到所需的任务并使用CountDownLatch,但这意味着我必须为我使用的每个ExecutorService覆盖newTaskFor-
可能会有实现不会扩展AbstractExecutorService。我也可以尝试出于相同的目的包装给定的ExecutorService,但是随后我必须装饰产生Futures的所有方法。

所有这些解决方案可能都有效,但看起来很不自然。好像我缺少一些简单的东西,例如

WaitHandle.WaitAny(WaitHandle[] waitHandles)

在C#中。是否有针对此类问题的知名解决方案?

更新:

最初,我根本无法使用Future创作,因此没有优雅的解决方案。重新设计系统后,我可以访问Future创建并能够将countDownLatch.countdown()添加到执行过程中,然后我可以countDownLatch.await()正常运行。感谢您提供其他答案,我对ExecutorCompletionService并不了解,它确实可以对类似的任务有所帮助,但是在这种特殊情况下,由于某些Futures是在没有任何执行者的情况下创建的,因此无法使用它-
实际任务通过网络发送到另一台服务器,远程完成并收到完成通知。


阅读 192

收藏
2020-12-03

共1个答案

一尘不染

据我所知,Java没有与该WaitHandle.WaitAny方法类似的结构。

在我看来,这可以通过“ WaitableFuture”装饰器来实现:

public WaitableFuture<T>
    extends Future<T>
{
    private CountDownLatch countDownLatch;

    WaitableFuture(CountDownLatch countDownLatch)
    {
        super();

        this.countDownLatch = countDownLatch;
    }

    void doTask()
    {
        super.doTask();

        this.countDownLatch.countDown();
    }
}

尽管只有将其插入执行代码之前它才有效,否则执行代码将不会具有new
doTask()方法。但是,如果您不能以某种方式在执行之前获得对Future对象的控制,我真的看不到没有轮询就无法执行此操作的方法。

或者,如果将来总是在自己的线程中运行,那么您可以通过某种方式获得该线程。然后,您可以产生一个新线程来互相连接线程,然后在连接返回后处理等待机制……这确实很丑陋,但是会引起很多开销。而且,如果某些Future对象没有完成,则根据死线程,您可能会有很多阻塞线程。如果不小心,可能会泄漏内存和系统资源。

/**
 * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
 */
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
    CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);

    foreach(Thread thread in threads)
    {
        (new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
    }

    countDownLatch.await();
}

class JoinThreadHelper
    implements Runnable
{
    Thread thread;
    CountDownLatch countDownLatch;

    JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
    {
        this.thread = thread;
        this.countDownLatch = countDownLatch;
    }

    void run()
    {
        this.thread.join();
        this.countDownLatch.countDown();
    }
}
2020-12-03