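I'm having a really hard time finding the right setup for Flask, SQLAlchemy, and Celery. I have searched extensively and tried different approaches, but nothing seems to work. Either I'm missing the application context, or I can't run the workers, or something else goes wrong. The structure is deliberately generic so that I can grow it into a bigger application.
I'm using Flask 0.10.1, SQLAlchemy 1.0, and Celery 3.1.13. My current setup is the following: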
app/__init__.py

    # Empty

app/config.py

    import os
    basedir = os.path.abspath(os.path.dirname(__file__))

    class Config:
        @staticmethod
        def init_app(app):
            pass

    class LocalConfig(Config):
        DEBUG = True
        SQLALCHEMY_DATABASE_URI = r"sqlite:///" + os.path.join(basedir, "data-dev.sqlite")
        CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'

    config = {
        "local": LocalConfig}

app/extensions.py

    from flask.ext.sqlalchemy import SQLAlchemy
    from celery import Celery

    db = SQLAlchemy()
    celery = Celery()
app/factory.py

    from extensions import db, celery
    from flask import Flask
    from flask import g
    from config import config

    def create_before_request(app):
        def before_request():
            g.db = db
        return before_request

    def create_app(config_name):
        app = Flask(__name__)
        app.config.from_object(config[config_name])
        db.init_app(app)
        celery.config_from_object(config)

        # Register the blueprints

        # Add the before request handler
        app.before_request(create_before_request(app))
        return app
app/manage.py

    from factory import create_app

    app = create_app("local")

    from flask import render_template
    from flask import request

    @app.route('/test', methods=['POST'])
    def task_simple():
        import tasks
        tasks.do_some_stuff.delay()
        return ""

    if __name__ == "__main__":
        app.run()
app/models.py

    from extensions import db

    class User(db.Model):
        __tablename__ = "user"
        id = db.Column(db.Integer, primary_key=True)
        username = db.Column(db.String(128), unique=True, nullable=False)
app/tasks.py

    from extensions import celery
    from celery.signals import task_prerun
    from flask import g, current_app

    @task_prerun.connect
    def close_session(*args, **kwargs):
        with current_app.app_context():
            # use g.db
            print(g)

    @celery.task()
    def do_some_stuff():
        with current_app.app_context():
            # use g.db
            print(g)
In the folder app, I run:

    python.exe manage.py
    celery.exe worker -A tasks
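I get import errors that don't make any sense to me. Should I structure the app differently? In the end, I think I just want a fairly basic setup: Flask with the factory pattern, the Flask-SQLAlchemy extension, and a worker that needs to access the database.
Thanks a lot for your help.
Traceback when starting the celery worker: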

    Traceback (most recent call last):
      File "[PATH]\scripts\celery-script.py", line 9, in <module>
        load_entry_point('celery==3.1.13', 'console_scripts', 'celery')()
      File "[PATH]\lib\site-packages\celery\__main__.py", line 30, in main
        main()
      File "[PATH]\lib\site-packages\celery\bin\celery.py", line 81, in main
        cmd.execute_from_commandline(argv)
      File "[PATH]\lib\site-packages\celery\bin\celery.py", line 769, in execute_from_commandline
        super(CeleryCommand, self).execute_from_commandline(argv)))
      File "[PATH]\lib\site-packages\celery\bin\base.py", line 305, in execute_from_commandline
        argv = self.setup_app_from_commandline(argv)
      File "[PATH]\lib\site-packages\celery\bin\base.py", line 473, in setup_app_from_commandline
        user_preload = tuple(self.app.user_options['preload'] or ())
    AttributeError: 'Flask' object has no attribute 'user_options'

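Update: I changed the code according to the suggestion in the comments. The worker now starts, but when I test it with a GET request to http://127.0.0.1:5000/test, I get the following traceback: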

    Traceback (most recent call last):
      File "[PATH]\lib\site-packages\celery\app\trace.py", line 230, in trace_task
        args=args, kwargs=kwargs)
      File "[PATH]\lib\site-packages\celery\utils\dispatch\signal.py", line 166, in send
        response = receiver(signal=self, sender=sender, **named)
      File "[PATH]\app\stackoverflow\tasks.py", line 7, in close_session
        with current_app.app_context():
      File "[PATH]\lib\site-packages\werkzeug\local.py", line 338, in __getattr__
        return getattr(self._get_current_object(), name)
      File "[PATH]\lib\site-packages\werkzeug\local.py", line 297, in _get_current_object
        return self.__local()
      File "[PATH]\lib\site-packages\flask\globals.py", line 34, in _find_app
        raise RuntimeError('working outside of application context')
    RuntimeError: working outside of application context
    exc, exc_info.traceback)))

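The current_app suggestion was off the mark.
Your celery object needs access to the application context. I found some information online about creating the Celery object with a factory function. The example below was tested without a message broker.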
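
    # factory.py (create_app from the question lives in this same module)
    from celery import Celery
    from config import config

    def create_celery_app(app=None):
        # create_app expects a config name such as "local", not the config dict
        app = app or create_app("local")
        celery = Celery(__name__, broker=app.config['CELERY_BROKER_URL'])
        celery.conf.update(app.config)
        TaskBase = celery.Task

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                # Run every task body inside the Flask application context
                with app.app_context():
                    return TaskBase.__call__(self, *args, **kwargs)

        celery.Task = ContextTask
        # tasks.py reaches the Flask app through celery.app, so attach it here
        celery.app = app
        return celery
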
In tasks.py:

    # tasks.py
    from factory import create_celery_app
    from celery.signals import task_prerun
    from flask import g

    # Assumes create_celery_app exposes the Flask app as celery.app
    celery = create_celery_app()

    @task_prerun.connect
    def celery_prerun(*args, **kwargs):
        with celery.app.app_context():
            # use g.db
            print(g)

    @celery.task()
    def do_some_stuff():
        with celery.app.app_context():
            # use g.db
            g.user = "test"
            print(g.user)
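
To see why the ContextTask wrapper in factory.py helps, here is a minimal pure-Python sketch of the same mechanism. It does not use Flask or Celery: `FakeApp`, `BaseTask`, `make_context_task`, and `DoSomeStuff` are hypothetical stand-ins invented for illustration, and the real classes do considerably more. The only point is that overriding `__call__` on the task base class puts every task body inside `app.app_context()`.

```python
import contextlib


class FakeApp(object):
    """Hypothetical stand-in for a Flask app; counts active contexts."""

    def __init__(self):
        self.context_depth = 0

    @contextlib.contextmanager
    def app_context(self):
        self.context_depth += 1
        try:
            yield self
        finally:
            self.context_depth -= 1


class BaseTask(object):
    """Hypothetical stand-in for celery.Task: calling a task runs run()."""

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)

    def run(self, *args, **kwargs):
        raise NotImplementedError


def make_context_task(app, task_base=BaseTask):
    """Return a task base class whose calls run inside app.app_context()."""

    class ContextTask(task_base):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return task_base.__call__(self, *args, **kwargs)

    return ContextTask


app = FakeApp()
ContextTask = make_context_task(app)


class DoSomeStuff(ContextTask):
    def run(self):
        # Report whether a context was active while the task body ran.
        return app.context_depth > 0


inside = DoSomeStuff()()        # called through the wrapper
after = app.context_depth       # the context is popped once the call returns
direct = DoSomeStuff().run()    # bypasses __call__, so no context is pushed
```

In this sketch a task invoked through `__call__` sees an active context, while calling `run()` directly does not. By analogy, code that runs outside the task call, such as a signal handler, would still need to push its own context, which is what `celery_prerun` does in tasks.py.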