共计 1581 个字符,预计需要花费 4 分钟才能阅读完成。
一、flask-celery 安装
pip install celery
pip install redis==2.10.6
pip install flask-celery-helper
二、创建和加载
-
工程目录下创建 run_celery.py 文件
# 定义创建 celery 对象的方法 from celery import Celery def make_celery(app): celery = Celery( app.import_name, backend=app.config['CELERY_RESULT_BACKEND'], broker=app.config['CELERY_BROKER_URL'] ) celery.conf.update(app.config) TaskBase = celery.Task class ContextTask(celery.Task): abstract = True def __call__(self, *args, **kwargs): with app.app_context(): return TaskBase.__call__(self, *args, **kwargs) celery.Task = ContextTask return celery
-
manage.py
from flask_script import Manager from flask_migrate import Migrate, MigrateCommand from app import create_app from exts import db from run_celery import make_celery # 将要生成表的模型类导入 from myApp.models import * app = create_app(__name__) celery = make_celery(app) # 创建迁移管理对象 migrate = Migrate(app, db) manager = Manager(app) manager.add_command("db", MigrateCommand) if __name__ == '__main__': manager.run()
-
exts/ext_celery
from flask_celery import Celery celery = Celery() from .ext_celery import celery
-
关联 manage.py 中的 celery 与 ext_celery 中的 cel
app.py
from exts import celery celery.init_app(app)
三、配置
# celery
CELERY_RESULT_BACKEND = "redis://:@127.0.0.1:6379/5"
CELERY_BROKER_URL = "redis://:@127.0.0.1:6379/6"
# celery 的工人一直启动可能会造成内存泄露,该参数规定每个工人执行了多少个任务后就会被杀死
# CELERY_MAX_TASKS_PER_CHILD = 50
四、封装任务
在应用目录下创建名为 tasks 的包目录
# from manage import celery
from exts import celery
@celery.task()
def mail():
print("---- 开始耗时操作 ----")
import time
time.sleep(5)
print("---- 结束耗时操作 ----")
五、添加到队列
from myApp.tasks import mail
@myApp.route("/sendMail/")
def sendMail():
# 将耗时任务添加到队列
mail.delay()
return "邮件发送成功"
六、启动 flask 服务
python manage.py runserver -d -r
七、启动工人
celery worker -A manage.celery --loglevel=info
正文完
星哥说事-微信公众号