我想使用asyncio调用loop.run_in_executor在Executor中启动一个阻塞函数,然后在以后取消它,但这似乎对我不起作用。
这是代码:
import asyncio import time from concurrent.futures import ThreadPoolExecutor def blocking_func(seconds_to_block): for i in range(seconds_to_block): print('blocking {}/{}'.format(i, seconds_to_block)) time.sleep(1) print('done blocking {}'.format(seconds_to_block)) @asyncio.coroutine def non_blocking_func(seconds): for i in range(seconds): print('yielding {}/{}'.format(i, seconds)) yield from asyncio.sleep(1) print('done non blocking {}'.format(seconds)) @asyncio.coroutine def main(): non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)] blocking_future = loop.run_in_executor(None, blocking_func, 5) print('wait a few seconds!') yield from asyncio.sleep(1.5) blocking_future.cancel() yield from asyncio.wait(non_blocking_futures) loop = asyncio.get_event_loop() executor = ThreadPoolExecutor(max_workers=1) loop.set_default_executor(executor) asyncio.async(main()) loop.run_forever()
我希望上面的代码仅允许阻塞函数输出:
blocking 0/5 blocking 1/5
然后查看非阻塞函数的输出。但是,即使我取消了,阻碍性的未来仍在继续。
可能吗?还有其他方法吗?
谢谢
在这种情况下,Future一旦它真正开始运行,就无法取消它,因为您依赖的行为concurrent.futures.Future,并且它的文档指出以下内容:
Future
concurrent.futures.Future
cancel() 尝试取消呼叫。 如果该调用当前正在执行并且无法取消,则该方法将返回False,否则,该调用将被取消并且该方法将返回True。
cancel()
尝试取消呼叫。 如果该调用当前正在执行并且无法取消,则该方法将返回False,否则,该调用将被取消并且该方法将返回True。
False
True
因此,唯一成功的取消操作是如果任务仍在中等待执行Executor。现在,您实际上正在使用asyncio.Future包裹concurrent.futures.Future,并且在实践中,即使基础任务实际上已经在运行,如果您在调用后尝试asyncio.Future返回,byloop.run_in_executor()也会引发a 。但是,它 实际上 并不会取消中的任务执行。CancellationError``yield from``cancel() __Executor
Executor
asyncio.Future
loop.run_in_executor()
CancellationError``yield from``cancel()
如果需要实际取消任务,则需要使用更常规的方法来中断线程中正在运行的任务。具体操作方式取决于用例。对于示例中显示的用例,可以使用threading.Event:
threading.Event
def blocking_func(seconds_to_block, event): for i in range(seconds_to_block): if event.is_set(): return print('blocking {}/{}'.format(i, seconds_to_block)) time.sleep(1) print('done blocking {}'.format(seconds_to_block)) ... event = threading.Event() blocking_future = loop.run_in_executor(None, blocking_func, 5, event) print('wait a few seconds!') yield from asyncio.sleep(1.5) blocking_future.cancel() # Mark Future as cancelled event.set() # Actually interrupt blocking_func