我是multiprocessing模块的新手。我只是尝试创建以下内容:我有一个工作是从RabbitMQ获取消息并将其传递到内部队列(multiprocessing.Queue)。然后,我想做的是:在收到新消息时生成一个进程。它可以工作,但是在完成工作后,它留下了一个僵尸进程,不会被其父进程终止。这是我的代码:
multiprocessing
multiprocessing.Queue
主要过程:
#!/usr/bin/env python import multiprocessing import logging import consumer import producer import worker import time import base conf = base.get_settings() logger = base.logger(identity='launcher') request_order_q = multiprocessing.Queue() result_order_q = multiprocessing.Queue() request_status_q = multiprocessing.Queue() result_status_q = multiprocessing.Queue() CONSUMER_KEYS = [{'queue':'product.order', 'routing_key':'product.order', 'internal_q':request_order_q}] # {'queue':'product.status', # 'routing_key':'product.status', # 'internal_q':request_status_q}] def main(): # Launch consumers for key in CONSUMER_KEYS: cons = consumer.RabbitConsumer(rabbit_q=key['queue'], routing_key=key['routing_key'], internal_q=key['internal_q']) cons.start() # Check reques_order_q if not empty spaw a process and process message while True: time.sleep(0.5) if not request_order_q.empty(): handler = worker.Worker(request_order_q.get()) logger.info('Launching Worker') handler.start() if __name__ == "__main__": main()
这是我的工人:
import multiprocessing import sys import time import base conf = base.get_settings() logger = base.logger(identity='worker') class Worker(multiprocessing.Process): def __init__(self, msg): super(Worker, self).__init__() self.msg = msg self.daemon = True def run(self): logger.info('%s' % self.msg) time.sleep(10) sys.exit(1)
因此,在处理完所有消息之后,我可以看到带有ps aux命令的进程。但是我真的希望它们一旦完成就可以终止。谢谢。
ps aux
有两件事:
确保父母joins的孩子,避免僵尸。请参阅Python多处理终止进程
joins
您可以使用is_alive()成员函数检查孩子是否仍在运行。请参阅http://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process
is_alive()