版权声明:风火编程, 欢迎指正. https://blog.csdn.net/weixin_42620314/article/details/82819410
celery发布异步任务(redis数据库)
单一文件
1.创建celery应用
from celery import Celery
celery_app = Celery(
"name",
broker = "celery://127.0.0.1:6379/0,
backend="celery://127.0.0.1:6379/1"
)
定义celery任务
@celery_app.task
def celery_job(*args, ***kwargs):
"""定义异步任务"""
return result
# 任务逻辑
3. 发布celery任务
celery_obj = celery_job.delay(*args, ***kwargs)
result = celery_obj.get() #会发生阻塞, 在需要时再获取
开启celery服务
<1>进入项目启动文件目录执行命令:
celery -A celery_app的绝对路径(用.连接) worker -l info
分目录
1.创建celery_task包
1.1 config.py模块
broker_url = “redis://127.0.0.1:6379/3”
result_backend = “redis://127.0.0.1:6379/4”
1.2 main.py模块
celery_app = Celery( “name”)
celery_app.config_from_obj(config) # 导入celery配置
1.3 创建celery任务包
1.3.1 创建tasks模块
import celery_app
@celery_app.task
def celery_job(*args, **kwargs):
“”“定义celery任务”""
# 任务逻辑
return result
1.4 在main模块添加任务自动搜索功能
celery_app.autodiscover_tasks([
“celery任务包的绝对路径(用.连接)”
])
2. 在主程序中发布异步任务
task_obj = celery_job.delay(*args, **kwargs)
result = task_obj.get()
3. 在启动文件路径开启celery服务
celery -A celery_app的绝对路径(用.连接) worker -l info