一尘不染

具有有限CPU /端口的Python多线程处理

python

我有一个要 并行 处理的文件夹名称字典。在每个文件夹,里面是文件名的数组,我想在加工 系列

folder_file_dict = {
         folder_name : {
                         file_names_key : [file_names_array]
                       }
        }

最终,我将创建一个名为folder_name的文件夹,其中包含名称为的文件len(folder_file_dict[folder_name][file_names_key])。我有这样的方法:

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
         time_consuming_method(file_name, udp_port)
         # create "file_name"

udp_ports = [123, 456, 789]

请注意time_consuming_method()上面的内容,由于通过UDP端口进行的调用会花费很长时间。我也仅限于在上面的阵列中使用UDP端口。因此,我必须等待time_consuming_methodUDP端口完成操作,然后才能再次使用该UDP端口。这意味着我一次只能len(udp_ports)运行线程。

因此,我最终将len(folder_file_dict.keys())通过len(folder_file_dict.keys())调用来创建线程process_files_in_series。我也有MAX_THREAD个计数。我正在尝试使用QueueThreading模块,但是我不确定我需要哪种设计。如何使用队列和线程以及可能的条件来做到这一点?使用线程池的解决方案也可能会有所帮助。

注意

我没有试图提高读取/写入速度。我正在尝试并行调用time_consuming_methodunder
process_files_in_series。创建这些文件只是过程的一部分,而不是速率限制步骤。

另外,我要寻找一个解决方案,使用QueueThreading以及可能的Condition模块或相关于这些模块什么。线程池解决方案也可能会有所帮助。我不能使用进程,只能使用线程。

我也在寻找Python 2.7中的解决方案。


阅读 319

收藏
2021-01-20

共1个答案

一尘不染

使用线程池:

#!/usr/bin/env python2
from multiprocessing.dummy import Pool, Queue # thread pool

folder_file_dict = {
    folder_name: {
        file_names_key: file_names_array
    }
}

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
         time_consuming_method(file_name, udp_port)
         # create "file_name"
         ...

def mp_process(filenames):
    udp_port = free_udp_ports.get() # block until a free udp port is available
    args = filenames, udp_port
    try:
        return args, process_files_in_series(*args), None
    except Exception as e:
        return args, None, str(e)
    finally:
        free_udp_ports.put_nowait(udp_port)

free_udp_ports = Queue() # in general, use initializer to pass it to children
for port in udp_ports:
    free_udp_ports.put_nowait(port)
pool = Pool(number_of_concurrent_jobs) #
for args, result, error in pool.imap_unordered(mp_process, get_files_arrays()):
    if error is not None:
       print args, error

我认为如果不同文件名数组的处理时间可能不同,则不需要将线程数绑定到udp端口数。

如果我folder_file_dict正确理解了结构,则生成文件名数组:

def get_files_arrays(folder_file_dict=folder_file_dict):
    for folder_name_dict in folder_file_dict.itervalues():
        for filenames_array in folder_name_dict.itervalues():
            yield filenames_array
2021-01-20