一尘不染

asyncio:是否可以取消由执行者执行的未来?

python

我想使用asyncio调用loop.run_in_executor在Executor中启动一个阻塞函数,然后在以后取消它,但这似乎对我不起作用。

这是代码:

import asyncio
import time

from concurrent.futures import ThreadPoolExecutor


def blocking_func(seconds_to_block):
    for i in range(seconds_to_block):
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


@asyncio.coroutine
def non_blocking_func(seconds):
    for i in range(seconds):
        print('yielding {}/{}'.format(i, seconds))
        yield from asyncio.sleep(1)

    print('done non blocking {}'.format(seconds))


@asyncio.coroutine
def main():
    non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
    blocking_future = loop.run_in_executor(None, blocking_func, 5)
    print('wait a few seconds!')
    yield from asyncio.sleep(1.5)

    blocking_future.cancel()
    yield from asyncio.wait(non_blocking_futures)



loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()

我希望上面的代码仅允许阻塞函数输出:

blocking 0/5
blocking 1/5

然后查看非阻塞函数的输出。但是,即使我取消了,阻碍性的未来仍在继续。

可能吗?还有其他方法吗?

谢谢


阅读 132

收藏
2020-12-20

共1个答案

一尘不染

在这种情况下,Future一旦它真正开始运行,就无法取消它,因为您依赖的行为concurrent.futures.Future,并且它的文档指出以下内容

cancel()

尝试取消呼叫。 如果该调用当前正在执行并且无法取消,则该方法将返回False,否则,该调用将被取消并且该方法将返回True

因此,唯一成功的取消操作是如果任务仍在中等待执行Executor。现在,您实际上正在使用asyncio.Future包裹concurrent.futures.Future,并且在实践中,即使基础任务实际上已经在运行,如果您在调用后尝试asyncio.Future返回,byloop.run_in_executor()也会引发a
。但是,它 实际上 并不会取消中的任务执行。CancellationError``yield from``cancel() __Executor

如果需要实际取消任务,则需要使用更常规的方法来中断线程中正在运行的任务。具体操作方式取决于用例。对于示例中显示的用例,可以使用threading.Event

def blocking_func(seconds_to_block, event):
    for i in range(seconds_to_block):
        if event.is_set():
            return
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)

blocking_future.cancel()  # Mark Future as cancelled
event.set() # Actually interrupt blocking_func
2020-12-20