共计 1581 个字符,预计需要花费 4 分钟才能阅读完成。
一、flask-celery 安装
pip install celery
pip install redis==2.10.6
pip install flask-celery-helper
二、创建和加载
-
工程目录下创建 run_celery.py 文件
# 定义创建 celery 对象的方法 from celery import Celery def make_celery(app): celery = Celery( app.import_name, backend=app.config['CELERY_RESULT_BACKEND'], broker=app.config['CELERY_BROKER_URL'] ) celery.conf.update(app.config) TaskBase = celery.Task class ContextTask(celery.Task): abstract = True def __call__(self, *args, **kwargs): with app.app_context(): return TaskBase.__call__(self, *args, **kwargs) celery.Task = ContextTask return celery -
manage.py
from flask_script import Manager from flask_migrate import Migrate, MigrateCommand from app import create_app from exts import db from run_celery import make_celery # 将要生成表的模型类导入 from myApp.models import * app = create_app(__name__) celery = make_celery(app) # 创建迁移管理对象 migrate = Migrate(app, db) manager = Manager(app) manager.add_command("db", MigrateCommand) if __name__ == '__main__': manager.run() -
exts/ext_celery
from flask_celery import Celery celery = Celery() from .ext_celery import celery -
关联 manage.py 中的 celery 与 ext_celery 中的 cel
app.py
from exts import celery celery.init_app(app)
三、配置
# celery | |
CELERY_RESULT_BACKEND = "redis://:@127.0.0.1:6379/5" | |
CELERY_BROKER_URL = "redis://:@127.0.0.1:6379/6" | |
# celery 的工人一直启动可能会造成内存泄露,该参数规定每个工人执行了多少个任务后就会被杀死 | |
# CELERY_MAX_TASKS_PER_CHILD = 50 |
四、封装任务
在应用目录下创建名为 tasks 的包目录
# from manage import celery | |
from exts import celery | |
def mail(): | |
print("---- 开始耗时操作 ----") | |
import time | |
time.sleep(5) | |
print("---- 结束耗时操作 ----") |
五、添加到队列
from myApp.tasks import mail | |
def sendMail(): | |
# 将耗时任务添加到队列 | |
mail.delay() | |
return "邮件发送成功" |
六、启动 flask 服务
python manage.py runserver -d -r
七、启动工人
celery worker -A manage.celery --loglevel=info
正文完
星哥玩云-微信公众号
