最初,我有一个类来存储一些处理后的值,并将其与其他方法重用。
问题是,当我尝试将类方法划分为多个进程以加快速度时,python生成了进程,但它似乎不起作用(正如我在“任务管理器”中看到的那样,只有1个进程在运行)并且结果从未交付。
我进行了几次搜索,发现pathos.multiprocessing可以代替它,但是我想知道标准库是否可以解决这个问题?
from multiprocessing import Pool class A(): def __init__(self, vl): self.vl = vl def cal(self, nb): return nb * self.vl def run(self, dt): t = Pool(processes=4) rs = t.map(self.cal, dt) t.close() return t a = A(2) a.run(list(range(10)))
您的代码失败了,因为它无法pickle执行实例方法(self.cal),这是Python试图通过将它们映射到来生成多个进程时尝试做的事情multiprocessing.Pool(嗯,有一种方法可以做到,但是这种方法太复杂了,没有太大用处)无论如何)-由于没有共享内存访问权限,因此必须对数据进行“打包”并将其发送到生成的进程进行解压缩。如果您尝试使a实例腌制,也会发生同样的情况。
pickle
self.cal
multiprocessing.Pool
a
该multiprocessing软件包中唯一可用的共享内存访问权限鲜为人知,multiprocessing.pool.ThreadPool因此,如果您确实要执行此操作,请执行以下操作:
multiprocessing
multiprocessing.pool.ThreadPool
from multiprocessing.pool import ThreadPool class A(): def __init__(self, vl): self.vl = vl def cal(self, nb): return nb * self.vl def run(self, dt): t = ThreadPool(processes=4) rs = t.map(self.cal, dt) t.close() return rs a = A(2) print(a.run(list(range(10)))) # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
但这并不能为您提供并行化,因为它实际上映射到确实可以访问共享内存的常规线程。您应该传递类/静态方法(如果需要调用它们),并传递希望它们使用的数据(在您的情况下self.vl)。如果您需要跨进程共享数据,则必须使用一些共享内存抽象,例如multiprocessing.Value,当然要使用互斥锁。
self.vl
multiprocessing.Value
更新
我说您可以做到(例如,有一些模块或多或少都在这样做pathos.multiprocessing),但我认为这样做不值得- 当您不得不欺骗系统去做某事时您可能会使用错误的系统,或者应该重新考虑设计。但是,为了了解情况,以下是在多处理设置中执行所需操作的一种方法:
pathos.multiprocessing
import sys from multiprocessing import Pool def parallel_call(params): # a helper for calling 'remote' instances cls = getattr(sys.modules[__name__], params[0]) # get our class type instance = cls.__new__(cls) # create a new instance without invoking __init__ instance.__dict__ = params[1] # apply the passed state to the new instance method = getattr(instance, params[2]) # get the requested method args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]] return method(*args) # expand arguments, call our method and return the result class A(object): def __init__(self, vl): self.vl = vl def cal(self, nb): return nb * self.vl def run(self, dt): t = Pool(processes=4) rs = t.map(parallel_call, self.prepare_call("cal", dt)) t.close() return rs def prepare_call(self, name, args): # creates a 'remote call' package for each argument for arg in args: yield [self.__class__.__name__, self.__dict__, name, arg] if __name__ == "__main__": # important protection for cross-platform use a = A(2) print(a.run(list(range(10)))) # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
我认为这是很容易解释的,但是总之它传递了类的名称,其当前状态(无信号,tho),所需的要调用的方法以及用于调用该parallel_call函数的参数。中的每个过程Pool。Python自动对所有这些数据进行酸洗和剔除,因此parallel_call所需要做的就是重建原始对象,在其中找到所需的方法,然后使用提供的参数对其进行调用。
parallel_call
Pool
这样,我们只传递数据而不尝试传递活动对象,因此Python不会抱怨(嗯,在这种情况下,请尝试在类参数中添加对实例方法的引用,看看会发生什么),并且一切正常。
如果您想沉迷于“魔术”,则可以使其看起来完全像您的代码(创建自己的Pool处理程序,从函数中选取名称并将名称发送到实际进程等),但这应该可以提供足够的功能举个例子
但是,在提高希望之前,请记住,这仅在共享“静态”实例(一旦在多处理上下文中开始调用它的初始状态就不会更改其初始状态)时起作用。如果该A.cal方法要更改vl属性的内部状态,则它将仅影响更改属性的实例(除非在调用两次调用Pool之间的主实例中更改)。如果还想共享状态,则可以在调用后升级parallel_call以instance.__dict__接听,并将其与方法调用结果一起返回,然后在调用方必须更新本地__dict__与返回的数据一起更改原始状态。但这还不够- 您实际上必须创建一个共享的dict并处理所有互斥体,才能使所有进程同时访问它(您可以使用multiprocessing.Manager它)。
A.cal
vl
instance.__dict__
__dict__
multiprocessing.Manager
所以,正如我所说的,麻烦多于其价值。