I. Celery async tasks
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'zufang.settings')
app = celery.Celery(
    'zufang',
    broker='redis://120.77.222.217:6379/1',
    backend='redis://120.77.222.217:6379/2'
)

Storing task results in the database (django-celery-results):
1. pip install django-celery-results
2. Add it to INSTALLED_APPS: INSTALLED_APPS = [..., 'django_celery_results']
3. python manage.py migrate django_celery_results
   If the migration fails:
   1. Drop the django_celery_results tables and their rows in django_migrations
   2. Turn off safe-update mode in MySQL (set sql_safe_updates to OFF):
      show variables like '%safe%';
      set session sql_safe_updates = off;
      set global sql_safe_updates = off;
   3. Run the migration again: python manage.py migrate django_celery_results
4. Auto-discover tasks (async tasks / periodic tasks) in the given apps:
   app.autodiscover_tasks(('appname', ))   # tuple of Django app labels
QINIU_ACCESS_KEY = 'KarvlHfUdoG1mZNSfDVS5Vh3nae2jUZumTBHK-PR'
QINIU_SECRET_KEY = 'SFPFkAn5NENhdCMqMe9wd_lxGHAeFR5caXxPTtt7'
AUTH = qiniu.Auth(QINIU_ACCESS_KEY, QINIU_SECRET_KEY)
@app.task
def upload_file_to_qiniu(file_path, filename):
    token = AUTH.upload_token(QINIU_BUCKET_NAME, filename)
    return qiniu.put_file(token, filename, file_path)
Three ways to invoke the task:
1. upload_file_to_qiniu.delay(file_path, filename)
2. Build a signature first, attaching execution options with .set():
   task = upload_file_to_qiniu.s().set(
       countdown=...,   # seconds to wait before the task runs
       expires=...)     # discard the task if not started within this time
   task.delay(file_path, filename)
3. upload_file_to_qiniu.apply_async(
       (file_path, filename),
       queue='queue1', countdown=10,
       retry_policy={'max_retries': 3},
       expires=60, compression='zlib')
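To make the countdown/expires semantics above concrete without a running broker, here is a plain-Python toy model (the simulate_apply_async helper is hypothetical, not part of Celery; real Celery enforces these options on the worker side):

```python
import time

def simulate_apply_async(func, args=(), countdown=0, expires=None):
    """Toy model of apply_async's countdown/expires options (not real Celery).

    countdown: seconds to wait before running the task.
    expires:   if set and the task cannot start within this many seconds,
               it is discarded instead of executed.
    """
    if expires is not None and countdown >= expires:
        return None  # task expired before a worker could pick it up
    time.sleep(countdown)
    return func(*args)

# Runs after a short delay:
print(simulate_apply_async(lambda a, b: a + b, (2, 3), countdown=0.1))           # 5
# Discarded: it would only start after its expiry deadline:
print(simulate_apply_async(lambda a, b: a + b, (2, 3), countdown=2, expires=1))  # None
```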
II. Celery periodic (scheduled) tasks
from celery.schedules import crontab

app.conf.update(
    timezone=settings.TIME_ZONE,
    enable_utc=True,
    beat_schedule={
        'task': {
            'task': 'common.tasks.display_info',
            # field order: minute, hour, day_of_week, day_of_month, month_of_year
            'schedule': crontab('*', '*', '*', '*', '*'),
            'args': ('hello, world', )
        },
        'task2': {},
    })
@app.task
def display_info(content):
    print(content)
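The five crontab fields above are, in order, minute, hour, day_of_week, day_of_month and month_of_year. As a rough illustration of how such a schedule is matched (a simplified sketch, not Celery's implementation; real crontab fields also support ranges, steps and lists):

```python
def field_matches(pattern, value):
    # '*' matches anything; otherwise compare against a comma-separated list
    if pattern == '*':
        return True
    return value in {int(p) for p in pattern.split(',')}

def cron_due(minute, hour, day_of_week, day_of_month, month_of_year, now):
    """now is a (minute, hour, dow, dom, month) tuple of ints."""
    patterns = (minute, hour, day_of_week, day_of_month, month_of_year)
    return all(field_matches(p, v) for p, v in zip(patterns, now))

# crontab('*', '*', '*', '*', '*') fires every minute:
print(cron_due('*', '*', '*', '*', '*', (30, 14, 2, 15, 6)))   # True
# A schedule of minute='0', hour='8' only fires at 08:00:
print(cron_due('0', '8', '*', '*', '*', (30, 14, 2, 15, 6)))   # False
```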
Monitoring the Celery message queue with Flower:
pip install flower
celery flower --broker=amqp://luohao:password@host:port/vhost   (points Flower at the broker; no spaces around =)
Celery primitives (canvas):
1. group - run a set of tasks in parallel and collect the results as a list
   from celery import group
   task_group = group(task1.s(), task2.s(), ...)   # pass signatures, not task1()
   result = task_group()   # result.get() returns a list of results
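Celery's group dispatches the signatures to workers; the fan-out/collect pattern it implements can be sketched with the standard library alone (an illustration of the semantics only, not how Celery actually schedules work):

```python
from concurrent.futures import ThreadPoolExecutor

def run_group(tasks):
    """Run (func, args) pairs concurrently and return their results in order,
    mirroring the list that group(...)().get() gives back."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(func, *args) for func, args in tasks]
        return [f.result() for f in futures]

results = run_group([(pow, (2, 10)), (len, ('hello',)), (sum, ([1, 2, 3],))])
print(results)  # [1024, 5, 6]
```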
2. chain - run tasks in order, passing each task's result to the next task
   from celery import chain
   task = chain(task1.s() | task2.s() | ...)   # or chain(task1.s(), task2.s(), ...)
   result = task()
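The result-passing behaviour of chain can likewise be sketched in plain Python: each later task receives the previous task's return value as its argument (a simplified model of the semantics, not Celery's implementation):

```python
from functools import reduce

def run_chain(first_task, *rest):
    """first_task is a (func, args) pair; each later func is called with the
    previous result, mimicking how chain feeds results forward."""
    func, args = first_task
    return reduce(lambda acc, f: f(acc), rest, func(*args))

# (2 + 3) -> multiply by 10 -> convert to string
result = run_chain((lambda a, b: a + b, (2, 3)),
                   lambda x: x * 10,
                   str)
print(result)  # '50'
```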