我正在使用celery在Hadoop上运行长时间运行的任务。每个任务都会在Hadoop上执行Pig脚本,该脚本运行大约30分钟-2小时。
我当前的Hadoop设置有4个队列a,b,c和默认队列。当前,所有任务都由一个工人执行,该工人将作业提交到单个队列中。
我想再添加3个将作业提交到其他队列的工作程序,每个队列一个工作程序。
问题是队列当前是硬编码的,我希望为每个工作人员设置此变量。
我进行了很多搜索,但无法找到一种方法来传递每个Celery Workers不同的队列值并在任务中访问它。
我像这样开始我的Celery Workers。
celery -A app.celery worker
我希望在命令行本身中传递一些其他参数,并在我的任务中访问它,但是celery抱怨它不理解我的自定义参数。
我计划通过设置–concurrency=3参数在同一主机上运行所有工作线程。有什么解决办法吗?
concurrency=3
当前场景是这样的。我每次尝试执行任务print_something时都说tasks.print_something.delay()只打印队列C。
print_something
tasks.print_something.delay()
@celery.task() def print_something(): print "C"
我需要让工人根据我在启动时传递给他们的值来打印可变字母。
@celery.task() def print_something(): print "<Variable Value Per Worker Here>"
第一步涉及在celery中添加对自定义参数的支持。如果不这样做,celery将抱怨它不理解该参数。
由于我用Flask运行celery,所以我像这样初始化celery。
def configure_celery(): app.config.update( CELERY_BROKER_URL='amqp://:@localhost:5672', RESULT_BACKEND='db+mysql://root:@localhost:3306/<database_name>' ) celery = Celery(app.import_name, backend=app.config['RESULT_BACKEND'], broker=app.config['CELERY_BROKER_URL']) celery.conf.update(app.config) TaskBase = celery.Task class ContextTask(TaskBase): abstract = True def __call__(self, *args, **kwargs): with app.app_context(): return TaskBase.__call__(self, *args, **kwargs) celery.Task = ContextTask return celery
我调用此函数来初始化celery并将其存储在一个名为celery的变量中。
celery = configure_celery()
要添加自定义参数,你需要执行以下操作。
def add_hadoop_queue_argument_to_worker(parser): parser.add_argument( '--hadoop-queue', help='Hadoop queue to be used by the worker' )
下面使用的celery是我们从上述步骤中获得的celery。
celery.user_options['worker'].add(add_hadoop_queue_argument_to_worker)
下一步将是使该参数在工作程序中可访问。为此,请按照下列步骤操作。
class HadoopCustomWorkerStep(bootsteps.StartStopStep): def __init__(self, worker, **kwargs): worker.app.hadoop_queue = kwargs['hadoop_queue']
通知celery使用此类来创建工人。
celery.steps['worker'].add(HadoopCustomWorkerStep)
现在,任务应该能够访问变量了。
@app.task(bind=True) def print_hadoop_queue_from_config(self): print self.app.hadoop_queue
通过在命令行上运行worker进行验证。
celery -A app.celery worker --concurrency=1 --hadoop-queue=A -n aworker@%h celery -A app.celery worker --concurrency=1 --hadoop-queue=B -n bworker@%h celery -A app.celery worker --concurrency=1 --hadoop-queue=C -n cworker@%h celery -A app.celery worker --concurrency=1 --hadoop-queue=default -n defaultworker@%h
我通常要做的是,在另一个脚本(例如manage.py)中启动工作程序(未执行任务)后,我添加带有参数的命令以启动特定任务或具有不同参数的任务。
在manager.py中:
from tasks import some_task @click.command def run_task(params): some_task.apply_async(params)
这将根据需要启动任务。