(这个问题是关于如何使multiprocessing.Pool()运行代码更快。我终于解决了它,最终的解决方案可以在文章的底部找到。)
原始问题:
我正在尝试使用Python将一个单词与列表中的其他单词进行比较,并检索最相似的列表。为此,我使用了difflib.get_close_matches函数。我正在使用Python 2.6.5的相对较新且功能强大的Windows 7便携式计算机。
我想要的是加快比较过程,因为我的单词比较列表很长,而且我不得不重复几次比较过程。当我听到多处理模块的消息时,如果将比较分解成多个工作任务并同时运行(从而利用机器动力来换取更快的速度),我的比较任务将更快地完成,这似乎是合乎逻辑的。
但是,即使尝试了许多不同的方法,并使用了文档中显示并在论坛帖子中建议的方法,Pool方法似乎仍然非常慢,比仅在整个列表中运行原始get_close_matches函数要慢得多。一旦。我想帮助您了解为什么Pool()这么慢以及我是否正确使用它。我仅以该字符串比较方案为例,因为这是我可以想到的最近的示例,在该示例中我无法理解或无法使多处理工作而不是不利于我。以下是difflib场景中的示例代码,显示了普通方法和Pooled方法之间的时间差:
from multiprocessing import Pool import random, time, difflib # constants wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(1000000)] mainword = "hello" # comparison function def findclosematch(subwordlist): matches = difflib.get_close_matches(mainword,subwordlist,len(subwordlist),0.7) if matches <> []: return matches # pool print "pool method" if __name__ == '__main__': pool = Pool(processes=3) t=time.time() result = pool.map_async(findclosematch, wordlist, chunksize=100) #do something with result for r in result.get(): pass print time.time()-t # normal print "normal method" t=time.time() # run function result = findclosematch(wordlist) # do something with results for r in result: pass print time.time()-t
要找到的单词是“ hello”,要查找紧密匹配的单词的列表是一百万个长列表,其中包含5个随机连接的字符(仅用于说明目的)。我使用3个处理器内核和一个mapmap函数,其chunksize为100(我认为每个工人要处理的项目?)(我也尝试了1000和10000的chunksize,但没有真正的区别)。请注意,在这两种方法中,我都在调用函数之前立即启动计时器,并在遍历结果之后立即终止计时器。正如您在下面看到的那样,计时结果显然支持原始的非Pool方法:
>>> pool method 37.1690001488 seconds normal method 10.5329999924 seconds >>>
The Pool method is almost 4 times slower than the original method. Is there something I am missing here, or maybe misunderstanding about how the Pooling/multiprocessing works? I do suspect that part of the problem here could be that the map function returns None and so adds thousands of unneccessary items to the resultslist even though I only want actual matches to be returned to the results and have written it as such in the function. From what I understand that is just how map works. I have heard about some other functions like filter that only collects non-False results, but I dont think that multiprocessing/Pool supports the filter method. Are there any other functions besides map/imap in the multiprocessing module that could help me out in only returning what my function returns? Apply function is more for giving multiple arguments as I understand it.
我知道还有我尝试过的imap函数,但没有任何时间改进。原因与我在了解itertools模块的所谓“快如闪电”功能时遇到的问题的原因相同,我注意到这对于调用该函数是正确的,但是根据我的经验以及我所读到的内容,因为调用该函数实际上并不会执行任何计算,所以当需要遍历结果以进行收集和分析时(如果没有这些结果,则没有必要调用命令),它花费的时间会比计算时间长或有时长只是使用正常的功能组合。但我想这是另一篇文章。
无论如何,很高兴看到有人可以在这里向正确的方向推动我,并非常感谢在此方面的任何帮助。我对总体理解多处理感兴趣,而不是使该示例正常工作,尽管它与一些示例解决方案代码建议有助于我的理解很有用。
答案:
似乎减速与其他进程的启动时间慢有关。我无法让.Pool()函数足够快。我要使其更快的最终解决方案是手动拆分工作负载列表,使用多个.Process()而不是.Pool(),然后在队列中返回解决方案。但我想知道,最关键的变化是否可能是根据要查找的主词而不是要比较的词来划分工作量,也许是因为difflib搜索功能已经如此之快。这是同时运行5个进程的新代码,与运行简单代码(6秒vs 55秒)相比,新代码的运行速度提高了大约10倍。除了difflib已经有多快之外,它对于快速模糊查找非常有用。
from multiprocessing import Process, Queue import difflib, random, time def f2(wordlist, mainwordlist, q): for mainword in mainwordlist: matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7) q.put(matches) if __name__ == '__main__': # constants (for 50 input words, find closest match in list of 100 000 comparison words) q = Queue() wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(100000)] mainword = "hello" mainwordlist = [mainword for each in xrange(50)] # normal approach t = time.time() for mainword in mainwordlist: matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7) q.put(matches) print time.time()-t # split work into 5 or 10 processes processes = 5 def splitlist(inlist, chunksize): return [inlist[x:x+chunksize] for x in xrange(0, len(inlist), chunksize)] print len(mainwordlist)/processes mainwordlistsplitted = splitlist(mainwordlist, len(mainwordlist)/processes) print "list ready" t = time.time() for submainwordlist in mainwordlistsplitted: print "sub" p = Process(target=f2, args=(wordlist,submainwordlist,q,)) p.Daemon = True p.start() for submainwordlist in mainwordlistsplitted: p.join() print time.time()-t while True: print q.get()
我最好的猜测是进程间通信(IPC)开销。在单进程实例中,单进程具有单词列表。当委派给其他各种流程时,主要流程需要不断地将列表的各个部分穿梭到其他流程中。
因此,随之而来的是,更好的方法可能是剥离 n个 进程,每个进程负责加载/生成列表的 1 / n 段并检查单词是否在列表的该部分中。
不过,我不确定如何使用Python的多处理库来做到这一点。