将任务分配给其他的进程去运行,django的主进程只负责发起任务,而执行任务的不在使用django的主进程。Python有一个很棒的异步任务框架,叫做celery。
Django为了让开发者开发更加方便,集成了celery,形成了django-celery插件
1.安装django-celery
#Redis模块的兼容不稳定,必须安装2.10.6
pip install django-celery
pip install django-redis
pip install redis==2.10.6
2.安装redis ,解压到指定目录
Redis-x64-3.2.100.zip 提取码: jmbf
#启动redis
redis-server.exe redis.windows.conf
#关闭redis
redis-cli.exe
shutdown
django-celery只是将任务发布出去,让长时间的任务:爬取一个网站,发送一个验证码这样的工作,不再阻塞主线程,web服务器只负责发起任务和接受任务的结果,中间执行的部分交给其他线程、进程、服务器去做。
Celery异步任务
1、在settings当中配置django-celery
下面也是settings.py 配置
# celery 配置
import djcelery
djcelery.setup_loader()# 模块加载
BROKER_URL = 'redis://127.0.0.1:6379/1' # 任务容器地址,redis数据库地址
CELERY_IMPORTS = ('CeleryTask.tasks') # 具体任务文件
CELERY_TIMEZONE = 'Asia/Shanghai' # celery 时区
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # celey处理器,固定
2.创建使用celery的app
python manage.py startapp CeleryTask
在项目的主目录下,编写celery的控制文件,(控制文件的名字最好是celery)
celery.py
import os
from celery import Celery
from django.conf import settings
# 设置celery的环境变量和django-celery的工作目录
os.environ.setdefault("DJANGO_SETTINGS_MODULE","CeleryTask.settings")
# 实例化celery应用,传入服务器名称
app = Celery("art_project")
# 加载celery配置
app.config_from_object("django.conf:settings")
# 如果在项目中,创建了task.py,那么celery就会沿着app去查找task.py来生成任务
app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)
3.新建tasks.py在celeryTask app下
4.编写tasks.py文件
from __future__ import absolute_import
from Qshop.celery import app
@app.task
def add(x,y):
return x+y
5.然后为了djcelery进行数据库同步
python manage.py check
python manage.py makemigrations
python manage.py migrate
6.编写视图触发异步任务
ursl.py
urlpatterns = [
path('mtv/', middle_test_view),
]
views.py
from CeleryTask.tasks import add
def get_task(request):
num1 = request.GET.get("num1",1)
num2 = request.GET.get("num2",2)
add.delay(int(num1),int(num2))
return JsonResponse({"data":"success"})
7.启动celery worker
--loglevel=info 指定日志级别
python manage.py celery worker --loglevel=info
访问路由查看效果。
Celery 定时任务
在settings .py配置
from celery.schedules import crontab
from celery.schedules import timedelta
CELERYBEAT_SCHEDULE = {
u"测试任务":{
"task":"CeleryTask.tasks.sendDing",
"schedule":timedelta(seconds=10)
}
}
tasks.py
@app.task
def sendDing(content="定时任务执行",to="15037609692"):
headers = {
"Content-Type": "application/json",
"Charset": "utf-8"
}
requests_data = {
"msgtype": "text",
"text": {
"content": content
},
"at": {
"atMobiles": [
],
"isAtAll": True
}
}
if to:
requests_data["at"]["atMobiles"].append(to)
requests_data["at"]["isAtAll"] = False
else:
requests_data["at"]["atMobiles"].clear()
requests_data["at"]["isAtAll"] = True
sendData = json.dumps(requests_data)
response = requests.post(url=DING_URL, headers=headers, data=sendData)
content = response.json()
return content
启动worker
python manage.py celery worker --loglevel=info
启动定时任务
python manage.py celerybeat --loglevel=info
注意:需要启动的有,django项目,redis数据库,worker,定时任务