1、写在前面
最近的flask项目遇到一些需要并发的情况,开始图省事,直接搞上了多线程,但是多线程的并发量很小,不利于扩展。
所以学习一下Celery的使用
2、基础概念
Celery是一个强大的分布式任务队列,他可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。通常用来实现异步任务和定时任务。异步任务比如发送邮件,文件上传图像处理等;定时任务就是需要在特定时间执行的任务。
- 任务队列
任务队列是一种跨线程,跨机器工作的一种机制,任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获取新的任务并处理。 - 任务模块
包含异步任务和定时任务。异步任务通常在业务逻辑中被触发并发往任务队列;定时任务由Celery Beat进程周期性地将任务发往任务队列。 - 消息中间件Broker
Broker,就是任务调度队列,接收任务生产者发来的消息(任务),将任务存入到队列。Celery本身不提供队列服务,官方推荐使用RabbitMQ和Redis等。 - 任务执行单元Worker
Worker是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。 - 任务结果存储Backend
- Backend用于存储任务的执行结果,以供查询。同消息中间件一样,也可使用RabbitMQ和Redis,MySql等。
3、demo代码
3.1 异步任务
1 # -*- coding: utf-8 -*- 2 3 """ 4 Celery主类 5 启动文件名必须为celery.py!!! 6 """ 7 8 from __future__ import absolute_import # 为兼容Python版本,绝对引入 9 from celery import Celery, platforms 10 11 app = Celery( 12 main='celery_task', # celery启动包名称 13 broker='redis://127.0.0.1:6379/1', 14 backend='redis://127.0.0.1:6379/2', 15 include=['celery_task.tasks', ] # celery所有任务 16 ) 17 18 if __name__ == '__main__': 19 app.start()
1 # -*- coding: utf-8 -*- 2 3 """ 4 定义任务 5 """ 6 7 from __future__ import absolute_import 8 from celery import Celery, group 9 from .celery import app 10 from time import sleep 11 12 13 @app.task 14 def add(x, y): 15 sleep(5) 16 return x + y 17 18 19 @app.task 20 def substract(x, y): 21 sleep(5) 22 return x - y
1 # -*- coding: utf-8 -*- 2 3 from celery_task.tasks import add,substract 4 5 # 立即告知celery去执行test_celery任务,并传入一个参数 6 result1 = add.delay(5,5) 7 print(result1.id) 8 result2 = substract.delay(5,5) 9 print(result2.id)
1 # -*- coding: utf-8 -*- 2 3 from celery.result import AsyncResult 4 from celery_task.celery import app 5 6 ''' 7 获取任务结果,但要想办法获取到task_id 8 ''' 9 10 async = AsyncResult(id='8d04fc47-0f72-49a1-a58a-b74e28eb9a41', app=app) 11 12 if async.successful(): 13 result = async.get() 14 print(result) 15 # result.forget() # 将结果删除,执行完成,结果不会自动删除 16 # async.revoke(terminate=True) # 无论现在是什么时候,都要终止 17 # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。 18 elif async.failed(): 19 print('执行失败') 20 elif async.status == 'PENDING': 21 print('任务等待中被执行') 22 elif async.status == 'RETRY': 23 print('任务异常后正在重试') 24 elif async.status == 'STARTED': 25 print('任务已经开始被执行')
执行步骤
- 在celery_test目录下cmd执行celery worker -A celery_task -l info -P eventlet,开启worker,异步等待任务到来
- 执行send_tasks.py,将发布任务(任务写入borker)
- 等待一段时间后,可以在celery命令行看到执行结果
- 执行get_result.py获取执行结果
3.2 定时任务
1 # -*- coding: utf-8 -*- 2 3 # 拒绝隐式引入,如果celery.py和celery模块名字一样,避免冲突,需要加上这条语句 4 # 该代码中,名字是不一样的,最好也要不一样 5 from __future__ import absolute_import 6 from celery import Celery 7 8 app = Celery('celery_tasks') 9 app.config_from_object('celery_test2.config')
1 # -*- coding: utf-8 -*- 2 3 from __future__ import absolute_import 4 from celery.schedules import crontab 5 from datetime import timedelta 6 7 # 使用redis存储任务队列 8 broker_url = 'redis://127.0.0.1:6379/7' 9 # 使用redis存储结果 10 result_backend = 'redis://127.0.0.1:6379/8' 11 12 task_serializer = 'json' 13 result_serializer = 'json' 14 accept_content = ['json'] 15 # 时区设置 16 timezone = 'Asia/Shanghai' 17 # celery默认开启自己的日志 18 # False表示不关闭 19 worker_hijack_root_logger = False 20 # 存储结果过期时间,过期后自动删除 21 # 单位为秒 22 result_expires = 60 * 60 * 24 23 24 # 导入任务所在文件 25 imports = [ 26 'celery_test2.celery_task2.task1', 27 'celery_test2.celery_task2.task2' 28 ] 29 30 # 需要执行任务的配置 31 beat_schedule = { 32 'task1': { 33 # 具体需要执行的函数 34 # 该函数必须要使用@app.task装饰 35 'task': 'celery_test2.celery_task2.task1.add', 36 # 定时时间 37 # 每分钟执行一次,不能为小数 38 'schedule': crontab(minute='*/1'), 39 # 或者这么写,每小时执行一次 40 # "schedule": crontab(minute=0, hour="*/1") 41 # 执行的函数需要的参数 42 'args': (5,5) 43 }, 44 'task2': { 45 'task': 'celery_test2.celery_task2.task2.substract', 46 # 设置定时的时间,10秒一次 47 'schedule': timedelta(seconds=10), 48 'args': (5,5) 49 } 50 }
1 # -*- coding: utf-8 -*- 2 from .. import app 3 from time import sleep 4 5 @app.task 6 def add(x, y): 7 sleep(5) 8 print('add') 9 return x + y
1 # -*- coding: utf-8 -*- 2 from .. import app 3 from time import sleep 4 5 @app.task 6 def substract(x, y): 7 sleep(5) 8 print(substract) 9 return x - y
执行步骤
- 在celery_test2的同级目录下cmd执行celery -A celery_test2 worker -l info -P eventlet,开启worker,异步等待任务到来
- 在celery_test2的同级目录下cmd执行celery -A celery_test2 beat,发送定时任务,定时像borker推送任务
- 可在backend中查看执行结果,task_id
4、应用代码
4.1 Flask结合Celery,实现进度下载进度条