首先明确一点,celery4.1+的官方文档已经详细说明,该版本之后不需要引入依赖 django-celery 这个库了,直接用 celery 本身就可以了,就在去年年初的一篇文章python3.7.2+Django2.0.4 使用django-celery遇到的那些坑,中提到的一些bug,在今年早已不复存在,所以技术更新频率越来越快,本文详细阐述用新版Celery(4.4.2)来实现。
关于celery的底层原理这里就不赘述了,简单的流程图就可以一图以蔽之
安装对应的库
pip3 install celery==4.4.2
pip3 install eventlet==0.25.2
pip3 install Django==2.0.4
注意版本要一致
配置settings.py文件:
'''
配置163
'''
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.163.com'
EMAIL_PORT = 25
#发送邮件的邮箱
EMAIL_HOST_USER = '[email protected]'
#在邮箱中设置的客户端授权密码
EMAIL_HOST_PASSWORD = 'COXWLCRFNUAHCYBD'
#收件人看到的发件人
EMAIL_FROM = '测试<[email protected]>'
‘’‘
添加celery
’‘’
CELERY_BROKER_URL = 'redis://localhost:6379/'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/'
CELERY_RESULT_SERIALIZER = 'json'
这里broker配置redis,同时backend也就是任务结果也存到redis中,格式为json,方便读写。
在settings.py同级目录创建celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
# 注册Celery的APP
app = Celery('mydjango')
# 绑定配置文件
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现各个app下的tasks.py文件
app.autodiscover_tasks()
注意mydjango为你当前的django项目名称
修改settings.py同级目录的init.py文件
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
#导包
import pymysql
#初始化
pymysql.install_as_MySQLdb()
__all__ = ['celery_app']
在应用中创建tasks.py文件
from celery.task import task
from django.core.mail import send_mail
# from celery import task
from time import sleep
from project.settings import EMAIL_FROM
# 自定义要执行的task任务
@task
def print_test(email):
subject = '哈哈哈哈' # 主题
message = '我是内容' # 内容
# sender = settings.EMAIL_FROM # 发送邮箱,已经在settings.py设置,直接导入
receiver = [email] # 目标邮箱 切记此处只能是列表或元祖
html_message = '<h1>%s</h1>' % message # 发送html格式
try:
send_mail(subject, message, EMAIL_FROM, receiver, html_message=html_message)
except:
pass
return '200'
如果需要,也可以在settings.py里将该任务配置为定时任务(周期任务)
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
# 周期性任务
'task-one': {
'task': 'myapp.tasks.print_test',
'schedule': 5.0, # 每5秒执行一次
# 'args': ()
}
}
同时异步任务也可以通过django的视图进行在线调用
from myapp import tasks
def ctest(request,*args,**kwargs):
res=tasks.print_test.delay('[email protected]') #这个位置就是类似传参
#任务逻辑
return JsonResponse({'status':'successful','task_id':res.task_id})
这里的delay方法就是异步方式请求,而非django默认的同步执行步骤
在manage.py的目录下启动celery服务
celery worker -A project -l info -P eventlet
在浏览器中调用异步服务接口
最后,如果需要启动定时任务,就需要在manage.py所在的文件夹内单独启动beat服务
celery -A project beat -l info
可以看到任务队列会每隔五秒执行一次定时任务