一尘不染

在终端和Django或Flask的代码模块中使用python multiprocessing pool

django

在Python中使用以下代码在使用multiprocessing.Pool时,会有一些奇怪的行为。

from multiprocessing import Pool
p = Pool(3)
def f(x): return x
threads = [p.apply_async(f, [i]) for i in range(20)]
for t in threads:
    try: print(t.get(timeout=1))
    except Exception: pass

我收到以下错误三遍(池中的每个线程一个),并且打印出“ 3”到“ 19”:

AttributeError: 'module' object has no attribute 'f'

前三个apply_async调用永不返回。

同时,如果我尝试:

from multiprocessing import Pool
p = Pool(3)
def f(x): print(x)
p.map(f, range(20))

我得到AttributeError 3次,外壳打印“ 6”到“ 19”,然后挂起,无法被[Ctrl] + [C]杀死

多处理文档具有以下说法:

此软件包中的功能要求子模块可以导入主模块。

这是什么意思?

为了澄清起见,我正在终端中运行代码以测试功能,但是最终我希望能够将其放入Web服务器的模块中。您如何在python终端和代码模块中正确使用multiprocessing.Pool?


阅读 1256

收藏
2020-03-31

共1个答案

一尘不染

这意味着必须在要在池上运行的函数定义之后初始化池。if __name__ == "__main__":如果要编写独立脚本,则在块内使用池是可行的,但是在较大的代码库或服务器代码(例如Django或Flask项目)中都无法实现。因此,如果要在其中一种中使用池,请确保遵循以下准则,以下几节对此进行了说明:

在模块底部或内部函数中初始化池。
不要在模块的全局范围内调用Pool的方法。
另外,如果只需要更好的I / O并行性(例如数据库访问或网络调用),则可以避免所有麻烦,并使用线程池而不是进程池。这涉及完全未记录的内容:

from multiprocessing.pool import ThreadPool

它的接口与Pool的接口完全相同,但是由于它使用线程而不是进程,因此它没有使用进程池的警告,唯一的缺点是你没有真正的代码执行并行性,只是阻止I / O的并行性。

必须在要在其上运行的功能定义之后初始化池
python docs中难以理解的文本意味着在定义池时,池中的线程会导入周围的模块。对于python终端,这意味着到目前为止已经运行过的所有代码。

因此,必须在初始化池之前定义要在池中使用的所有功能。模块中的代码和终端中的代码都是如此。对问题中的代码进行以下修改将可以正常工作:

from multiprocessing import Pool
def f(x): return x  # FIRST
p = Pool(3) # SECOND
threads = [p.apply_async(f, [i]) for i in range(20)]
for t in threads:
    try: print(t.get(timeout=1))
    except Exception: pass

要么

from multiprocessing import Pool
def f(x): print(x)  # FIRST
p = Pool(3) # SECOND
p.map(f, range(20))

很好,我的意思是在Unix上很好。Windows有其自身的问题,我不在这里讨论。

注意在模块中使用池
但是,等等,还有更多(要在要导入其他位置的模块中使用池)!

如果在函数内部定义池,则不会有问题。 但是,如果要将Pool对象用作模块中的全局变量,则必须在页面底部而不是top中定义它。尽管这与大多数良好的代码风格背道而驰,但是功能性还是必需的。使用在页面顶部声明的池的方法是仅将其与从其他模块导入的函数一起使用,如下所示:

from multiprocessing import Pool
from other_module import f
p = Pool(3)
p.map(f, range(20))

从另一个模块导入预先配置的池非常恐怖,因为导入必须在你要在其上运行的任何内容之后进行,例如:

### module.py ###
from multiprocessing import Pool
POOL = Pool(5)

### module2.py ###
def f(x):
    # Some function
from module import POOL
POOL.map(f, range(10))

其次,如果你在要导入的模块的全局范围内的池中运行任何内容,则系统将挂起。即这不起作用:

### module.py ###
from multiprocessing import Pool
def f(x): return x
p = Pool(1)
print(p.map(f, range(5)))

### module2.py ###
import module

然而,这确实工作,只要没有进口模块2:

### module.py ###
from multiprocessing import Pool

def f(x): return x
p = Pool(1)
def run_pool(): print(p.map(f, range(5)))

### module2.py ###
import module
module.run_pool()

现在,其背后的原因更加奇怪,并且可能与问题中的代码每次仅抛出一次属性错误以及之后似乎正确执行代码的原因有关。似乎池线程(至少具有某种可靠性)在执行后会重新加载模块中的代码。

2020-03-31