Celery
一、概要
Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务( async task )和定时任务( crontab )。
异步任务:比如发送邮件、手机验证码,或者文件上传, 图像处理等等一些比较耗时的操作 ;
定时任务:需要在特定时间执行的任务。
二、celery的一些优点
简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
快速:一个单进程的celery每分钟可处理上百万个任务
灵活: 几乎celery的各个组件都可以被扩展及自定制
三、整体架构
1、说明
celery采用典型的生产者-消费者模式,主要由三部分组成:broker(消息队列)、workers(消费者:处理任务)、backend(存储结果)。实际应用中,用户从Web前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列broker中,由空闲的worker去处理任务即可,处理的结果会暂存在后台数据库backend中。我们可以在一台机器或多台机器上同时起多个worker进程来实现分布式地并行处理任务。
2、结构图
3、名词解释
- 任务task:要异步执行的任务比如我们发送邮件的代码。
- 队列queue:将需要执行的任务加入到队列中。
- 工人worker:在一个新进程中,负责执行队列中的任务。
- 代理人broker:负责调度,在部署环境中使用redis或者rabbitmq(默认)。
四、工作流程
1、流程图
2、说明
- 产生任务task会放到queue队列中,
- 代理人broker会通知空闲的worker工人队列中有任务,
- worker工人就会去队列中把任务task取出来执行。每一个worker就是一个工作的进程。
五、基本使用
Celery3.1版本需要一个单独的库来与Django一起工作,3.1以后的版本就不在需要了,Django现在支持开箱即用,
需要注意的是Celery 4.0支持Django 1.8及更新版本。请将Celery 3.1用于早于Django 1.8的版本
1、 建立消息队列
-
说明
首先,我们必须拥先创建一个用于发送和接收消息消息中间件。Celery官网给出了多个broker的备选方案:RabbitMQ、Redis(推荐)、Database(不推荐)以及其他的消息中间件。本案例使用redis作为broker
-
安装
# 目前必须使用当前版本的版本,redis 3.x以上的版本不支持,官方目前还没有修复该bug pip install redis==2.10.6 # https://github.com/celery/celery/issues/5175
2、安装celery
-
说明
本案例使用的最新的4.x版本,因为兼容性存在很大的差异,所有选择版本的时候一定要注意
-
安装
pip install celery==4.2.1
3、创建celery.py实例文件
-
说明
在django项目的配置文件同级的目录下创建一个 celery.py模块来定义Celery实例
本案例使用的是django==1.11.12
-
示例
import os from celery import Celery # 设置项目的配置文件,项目名称.settings os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dj_celery.settings') # 实例化celery 一个工程中可以实例化多个 但是django中是没有必要实例化多个celery对象 app = Celery('dj_celery') # 加载celery配置文件 app.config_from_object('django.conf:settings', namespace='CELERY') # 自动注册app中的tasks文件 app.autodiscover_tasks() # 开启debug模式 @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
在配置文件中的init文件中注册
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ('celery_app',)
4、在setting文件中配置broker
-
说明
-
示例代码
# 传递消息时使用的redis 的ip 端口 数据库名 BROKER_URL = 'redis://127.0.0.1:6379/2'
5、在app中定义tasks.py文件
-
说明
不要修改文件的名称
-
示例代码
import time from celery import shared_task @shared_task def send_mail(): print('开始发送邮件') time.sleep(2) print('发送完成') return '1'
6、启动worker
-
启动django项目
-
启动redis
sudo service redis start
-
启动worker
celery worker -A dj_celery -l info
-
说明
- -A 代表的是Application的首字母,就是在应用里面定义的。
- worker 就是我们的工人了,他们会努力完成我们的工作的。
- -loglevel=info 指明了我们的工作后台执行情况,虽然工人们已经向你保证过一定努力完成任务。但是谨慎的你还是希望看看工作进展情况。
7 win10
ValueError: not enough values to unpack (expected 3, got 0)
-
安装
pip install eventlet
-
启动worker
celery -A <mymodule> worker -l info -P eventlet
六、扩展
1、缓存结果
2、定时任务
七、其它
1、查看异步任务情况
Celery提供了一个工具flower,将各个任务的执行情况、各个worker的健康状态进行监控并以可视化的方式展现,
-
安装flower:
pip install flower
-
启动flower(默认会启动一个webserver,端口为5555):
python manage.py celery flower
-
即可查看
2、内存泄漏
-
说明
长时间运行Celery有可能发生内存泄露,可以像下面这样设置
-
示例代码
CELERYD_MAX_TASKS_PER_CHILD = 40 # 每个worker执行了多少任务就会死掉