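I'm building a website with Python Flask. Everything has been going well, and now I'm trying to add Celery.
That also went fine until I tried to send an email with Flask-Mail from a Celery task. Now I get a "working outside of application context" error.
The full traceback is: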
    Traceback (most recent call last):
      File "/usr/lib/python2.7/site-packages/celery/task/trace.py", line 228, in trace_task
        R = retval = fun(*args, **kwargs)
      File "/usr/lib/python2.7/site-packages/celery/task/trace.py", line 415, in __protected_call__
        return self.run(*args, **kwargs)
      File "/home/ryan/www/CG-Website/src/util/mail.py", line 28, in send_forgot_email
        msg = Message("Recover your Crusade Gaming Account")
      File "/usr/lib/python2.7/site-packages/flask_mail.py", line 178, in __init__
        sender = current_app.config.get("DEFAULT_MAIL_SENDER")
      File "/usr/lib/python2.7/site-packages/werkzeug/local.py", line 336, in __getattr__
        return getattr(self._get_current_object(), name)
      File "/usr/lib/python2.7/site-packages/werkzeug/local.py", line 295, in _get_current_object
        return self.__local()
      File "/usr/lib/python2.7/site-packages/flask/globals.py", line 26, in _find_app
        raise RuntimeError('working outside of application context')
    RuntimeError: working outside of application context
Here is my mail function:
    @celery.task
    def send_forgot_email(email, ref):
        global mail
        msg = Message("Recover your Crusade Gaming Account")
        msg.recipients = [email]
        msg.sender = "Crusade Gaming stuff@cg.com"
        msg.html = \
            """
            Hello Person,<br/>

            You have requested your password be reset. <a href="{0}" >Click here recover your account</a> or copy and paste this link in to your browser: {0} <br />

            If you did not request that your password be reset, please ignore this.
            """.format(url_for('account.forgot', ref=ref, _external=True))
        mail.send(msg)
Here is my Celery file:
    from __future__ import absolute_import

    from celery import Celery

    celery = Celery('src.tasks',
                    broker='amqp://',
                    include=['src.util.mail'])

    if __name__ == "__main__":
        celery.start()
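Here is a solution that works with the Flask application factory pattern and creates Celery tasks that already have a context, so you don't need app.app_context() inside each task. Getting hold of the app while avoiding circular imports is tricky, but this approach handles it. It targets Celery 4.2, the latest version at the time of writing.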
Structure:
    repo_name/
        manage.py
        base/
        base/__init__.py
        base/app.py
        base/runcelery.py
        base/celeryconfig.py
        base/utility/celery_util.py
        base/tasks/workers.py
So base is the main application package in this example. In base/__init__.py we create the Celery instance as follows:
base/__init__.py
    from celery import Celery

    celery = Celery('base', config_source='base.celeryconfig')
The base/app.py file contains the Flask application factory create_app. Note the init_celery(app, celery) call inside it:
    from flask import Flask

    from base import celery
    from base.utility.celery_util import init_celery

    # The register_* helpers are defined elsewhere in the project (not shown here).

    def create_app(config_obj):
        """An application factory, as explained here:
        http://flask.pocoo.org/docs/patterns/appfactories/.

        :param config_obj: The configuration object to use.
        """
        app = Flask('base')
        app.config.from_object(config_obj)
        # Bind the Flask app context to every Celery task.
        init_celery(app, celery=celery)
        register_extensions(app)
        register_blueprints(app)
        register_errorhandlers(app)
        register_app_context_processors(app)
        return app
Moving on to the contents of base/runcelery.py:
    from flask.helpers import get_debug_flag

    from base.settings import DevConfig, ProdConfig
    from base import celery
    from base.app import create_app
    from base.utility.celery_util import init_celery

    CONFIG = DevConfig if get_debug_flag() else ProdConfig

    app = create_app(CONFIG)
    init_celery(app, celery)
Next, the base/celeryconfig.py file (as an example):
base/celeryconfig.py
    # -*- coding: utf-8 -*-
    """
    Configure Celery. See the configuration guide at ->
    http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration
    """

    ## Broker settings.
    broker_url = 'pyamqp://guest:guest@localhost:5672//'
    broker_heartbeat=0

    # List of modules to import when the Celery worker starts.
    imports = ('base.tasks.workers',)

    ## Using the database to store task state and results.
    result_backend = 'rpc'
    #result_persistent = False

    accept_content = ['json', 'application/text']

    result_serializer = 'json'
    timezone = "UTC"

    # define periodic tasks / cron here
    # beat_schedule = {
    #    'add-every-10-seconds': {
    #        'task': 'workers.add_together',
    #        'schedule': 10.0,
    #        'args': (16, 16)
    #    },
    # }
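The beat_schedule entry in the example config is commented out, so celery beat has no periodic tasks of yours to run until one is defined. A minimal sketch of an enabled entry might look like this. The task name base.tasks.workers.purge_stale_tokens is hypothetical (no such task appears in this answer) and assumes Celery's default task naming of module path plus function name.

```python
# Sketch of a fragment for base/celeryconfig.py.
# ASSUMPTION: 'base.tasks.workers.purge_stale_tokens' is a hypothetical task;
# replace it with the registered name of a task that exists in your project.
beat_schedule = {
    "purge-stale-tokens-every-hour": {
        "task": "base.tasks.workers.purge_stale_tokens",
        "schedule": 3600.0,  # seconds between runs
        "args": (),          # positional arguments passed to the task
    },
}
```

A plain number of seconds keeps the fragment free of imports; whether an interval or a cron-style schedule fits better depends on the task.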
Now define init_celery in the base/utility/celery_util.py file:
    # -*- coding: utf-8 -*-

    def init_celery(app, celery):
        """Add flask app context to celery.Task"""
        TaskBase = celery.Task

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                with app.app_context():
                    return TaskBase.__call__(self, *args, **kwargs)

        celery.Task = ContextTask
For the workers, in base/tasks/workers.py:
base/tasks/workers.py:
    from base import celery as celery_app
    from flask_security.utils import config_value, send_mail
    from base.bp.users.models.user_models import User
    from base.extensions import mail # this is the flask-mail

    @celery_app.task
    def send_async_email(msg):
        """Background task to send an email with Flask-mail."""
        #with app.app_context():
        mail.send(msg)

    @celery_app.task
    def send_welcome_email(email, user_id, confirmation_link):
        """Background task to send a welcome email with flask-security's mail.
        You don't need to use with app.app_context() here. Task has context.
        """
        user = User.query.filter_by(id=user_id).first()
        print(f'sending user {user} a welcome email')
        send_mail(config_value('EMAIL_SUBJECT_REGISTER'),
                  email,
                  'welcome',
                  user=user,
                  confirmation_link=confirmation_link)
Then start celery beat and the celery worker from inside the repo_name folder, each in its own command prompt:

    celery -A base.runcelery:celery beat
    celery -A base.runcelery:celery worker
Then run your task that needs the Flask context. It should work.
Flask-Mail needs the Flask application context to work correctly. Instantiate the app object on the Celery side and use app.app_context, like this:
    with app.app_context():
        celery.start()
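To see why the context matters, the following sketch reproduces the error from the question without Celery or Flask-Mail. It assumes only that Flask is installed; the app object, the config key and the helper names are hypothetical. Reading current_app outside a context raises RuntimeError, and the same read succeeds inside app.app_context().

```python
from flask import Flask, current_app

app = Flask("demo")  # hypothetical app for this sketch
app.config["MAIL_DEFAULT_SENDER"] = "noreply@example.com"


def read_sender():
    # Goes through the current_app proxy, as flask_mail does in the traceback.
    return current_app.config["MAIL_DEFAULT_SENDER"]


def outside_context_fails():
    """Return True if read_sender() raises without an application context."""
    try:
        read_sender()
    except RuntimeError:
        return True
    return False


def inside_context():
    """Read the same setting with an application context pushed."""
    with app.app_context():
        return read_sender()


if __name__ == "__main__":
    print(outside_context_fails())
    print(inside_context())
```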