一尘不染

如何将python asyncio与线程结合在一起?

python

我已经使用Python
asyncio和aiohttp成功构建了一个RESTful微服务,该服务可侦听POST事件以收集来自各种供料器的实时事件。

然后,它构建一个内存结构,以将事件的最后24小时缓存在嵌套的defaultdict / deque结构中。

现在,我想定期检查该结构到磁盘的位置,最好使用pickle。

由于内存结构可以大于100MB,因此我希望避免在检查点结构所需的时间上占用传入事件处理时间。

我宁愿为该结构创建快照副本(例如,Deepcopy),然后花点时间将其写入磁盘并按预设的时间间隔重复执行。

我一直在寻找有关如何组合线程的示例(并且线程是否是为此的最佳解决方案?)和异步用于此目的,但是找不到对我有用的东西。

非常感谢任何入门的指点!


阅读 176

收藏
2020-12-20

共1个答案

一尘不染

使用以下方法将方法委派给线程或子流程非常简单BaseEventLoop.run_in_executor

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_operation(x):
    time.sleep(x) # This is some operation that is CPU-bound

@asyncio.coroutine
def main():
    # Run cpu_bound_operation in the ProcessPoolExecutor
    # This will make your coroutine block, but won't block
    # the event loop; other coroutines can run in meantime.
    yield from loop.run_in_executor(p, cpu_bound_operation, 5)


loop = asyncio.get_event_loop()
p = ProcessPoolExecutor(2) # Create a ProcessPool with 2 processes
loop.run_until_complete(main())

至于使用aProcessPoolExecutor还是ThreadPoolExecutor,这很难说。腌制一个大物体肯定会消耗一些CPU周期,最初您会认为这ProcessPoolExecutor是可行的方法。但是,将100MB对象传递到Process池中的a将需要在主进程中腌制该实例,通过IPC将字节发送到子进程,在子进程中将其取消腌制,然后
再次
进行腌制,以便可以将其写入磁盘。鉴于此,我的猜测是,酸洗/去酸洗的开销将足够大ThreadPoolExecutor,即使使用GIL会对性能造成负面影响,您也最好使用。

也就是说,两种方法的测试都非常简单,并且可以确定找出来,所以您也可以这样做。

2020-12-20