1.项目结构
TestProject
├── CeleryTest
│ ├── __init__.py
│ ├── apps.py
│ ├── migrations
│ ├── models.py
│ ├── tasks.py
│ └── views.py
├── manage.py
├── TestProject
│ ├── __init__.py
│ ├── settings.py
│ ├── celery.py
│ ├── urls.py
│ └── wsgi.py
2.新建celery.py(与settings.py同级)
TestProject/TestProject/celery.py
# absolute_import:优先绝对导入,凡导包时第一层不加点号的则视为绝对导包,会导入系统库里面的包
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# 设置celery的配置文件为django项目的配置文件
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'TestProject.settings')
# 创建celery的实例
app = Celery('mycelery')
# 读取配置文件中以CELERY_为前缀的配置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动读取每个注册app下的task.py
app.autodiscover_tasks()
3.初始化celery实例
TestProject/TestProject/init.py
from __future__ import absolute_import, unicode_literals
# django项目启动时将自动创建celery的实例
from .celery import app as celery_app
__all__ = ('celery_app',)
4.celery相关配置
TestProject/TestProject/settings.py
# 使用Redis作为消息中间件和存储引擎
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
5.注册任务
TestProject/CeleryTest/tasks.py
from __future__ import absolute_import, unicode_literals
import time
from celery import shared_task
@shared_task
def add(x, y):
time.sleep(10)
print('sleep 10秒:%s' % time.strftime('%H:%M:%S', time.localtime()))
return x + y
@shared_task
def mul(x, y):
time.sleep(5)
print('sleep 5秒:%s' % time.strftime('%H:%M:%S', time.localtime()))
return x * y
6.视图中调用任务
from django.http import JsonResponse
from .tasks import *
def demo(request, x, y):
res1 = add.delay(x, y)
res2 = mul.delay(x, y)
return JsonResponse({'task_id': res1.task_id, 'time': time.strftime('%H:%M:%S', time.localtime())})
7.启动worker
celery worker -A TestProject -l INFO -Ofair
参考:https://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
https://www.cnblogs.com/wdliu/p/9530219.html