目录
Celery概述
Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具。
Celery 专注于实时任务处理,支持任务调度。所谓任务就是消息,消息中的有效载荷包含要执行任务需要的全部数据。
我们可以把Celery看作是一个分布式队列的管理工具,使用 Celery可以快速实现并管理一个分布式的任务队列。
Celery架构
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache
Celery应用场景
1. Web应用。当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给Celery去异步执行,执行完再返回给用户。这段时间用户不需要等待,提高了网站的整体吞吐量和响应时间。
2. 定时任务。生产环境经常会跑一些定时任务。假如你有上千台的服务器、上千种任务,定时任务的管理很困难,Celery可以帮助我们快速在不同的机器设定不同种任务。
3. 同步完成的附加工作都可以异步完成。比如发送短信/邮件、推送消息、清理/设置缓存等。
Celery特性
1. 方便地查看定时任务的执行情况,比如执行是否成功、当前状态、执行任务花费的时间等。
2. 可以使用功能齐备的管理后台或者命令行添加、更新、删除任务。
3. 方便把任务和配置管理相关联。
4. 可选多进程、Eventlet和Gevent三种模式并发执行。
5. 提供错误处理机制。
- 提供多种任务原语,方便实现任务分组、拆分和调用链。
- 支持多种消息代理和存储后端。
消息代理
Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper等作为消息代理,但适用于生产环境的只有RabbitMQ和Redis,至于其他的方式,一是支持有限,二是可能得不到更好的技术支持。
Celery官方推荐的是RabbitMQ,Celery的作者Ask Solem Hoel最初在VMware就是为RabbitMQ工作的,Celery最初的设计就是基于RabbitMQ,所以使用RabbitMQ会非常稳定,成功案例很多。如果使用Redis,则需要能接受发生突然断电之类的问题造成Redis突然终止后的数据丢失等后果。
Celery序列化
在客户端和消费者之间传输数据需要序列化和反序列化,Celery支持如表9.2所示的序列化方案:
为了提供更高的性能,我们选择如下方案:
1. 选择RabbitMQ作为消息代理。
2. RabbitMQ的Python客户端选择librabbitmq这个C库。
3. 选择Msgpack做序列化。
4. 选择Redis做结果存储。
从一个简单的例子开始
安装 Celery 和 redis 以及 python 的 redis 支持:
1 2 3 |
sudo apt-get install redis-server pip install redis pip install celery |
这里需要注意如果你的 celery 是 4.0 及以上版本请确保 python 的 redis 库版本在 2.10.4 及以上,否则会出现 redis 连接 timeout 的错误,具体参考
然后,我们需要写一个task:
1 2 3 4 5 6 7 8 |
#tasks.py from celery import Celery app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0') #配置好celery的backend和broker @app.task #普通函数装饰为 celery task def add(x, y): return x + y |
OK,到这里,broker 我们有了,backend 我们有了,task 我们也有了,现在就该运行 worker 进行工作了,在 tasks.py 所在目录下运行:
1 |
celery -A tasks worker --loglevel=info |
意思就是运行 tasks 这个任务集合的 worker 进行工作(当然此时broker中还没有任务,worker此时相当于待命状态)
最后一步,就是触发任务啦,最简单方式就是再写一个脚本然后调用那个被装饰成 task 的函数:
1 2 3 4 5 6 |
#trigger.py from tasks import add result = add.delay(4, 4) #不要直接 add(4, 4),这里需要用 celery 提供的接口 delay 进行调用 while not result.ready(): time.sleep(1) print 'task done: {0}'.format(result.get()) |
运行此脚本
delay 返回的是一个 AsyncResult 对象,里面存的就是一个异步的结果,当任务完成时result.ready()
为 true,然后用 result.get()
取结果即可。
到此,一个简单的 celery 应用就完成啦。
进阶用法
经过快速入门的学习后,我们已经能够使用 Celery 管理普通任务,但对于实际使用场景来说这是远远不够的,所以我们需要更深入的去了解 Celery 更多的使用方式。
首先来看之前的task:
1 2 3 |
@app.task #普通函数装饰为 celery task def add(x, y): return x + y |
这里的装饰器app.task
实际上是将一个正常的函数修饰成了一个 celery task 对象,所以这里我们可以给修饰器加上参数来决定修饰后的 task 对象的一些属性。
首先,我们可以让被修饰的函数成为 task 对象的绑定方法,这样就相当于被修饰的函数 add 成了 task 的实例方法,可以调用 self 获取当前 task 实例的很多状态及属性。
其次,我们也可以自己复写 task 类然后让这个自定义 task 修饰函数 add ,来做一些自定义操作。
根据任务状态执行不同操作
任务执行后,根据任务状态执行不同操作需要我们复写 task 的 on_failure、on_success
等方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
# tasks.py class MyTask(Task): def on_success(self, retval, task_id, args, kwargs): print 'task done: {0}'.format(retval) return super(MyTask, self).on_success(retval, task_id, args, kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo): print 'task fail, reason: {0}'.format(exc) return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo) @app.task(base=MyTask) def add(x, y): return x + y |
嗯, 然后继续运行 worker:
1 |
celery -A tasks worker --loglevel=info |
运行脚本,得到:
再修改下tasks:
1 2 3 4 |
@app.task #普通函数装饰为 celery task def add(x, y): raise KeyError return x + y |
重新运行 worker,再运行 trigger.py:
可以看到,任务执行成功或失败后分别执行了我们自定义的 on_failure、on_success
绑定任务为实例方法
1 2 3 4 5 6 7 8 |
# tasks.py from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @app.task(bind=True) def add(self, x, y): logger.info(self.request.__dict__) return x + y |
然后重新运行:
执行中的任务获取到了自己执行任务的各种信息,可以根据这些信息做很多其他操作,例如判断链式任务是否到结尾等等。
关于 celery.task.request 对象的详细数据可以看这里
任务状态回调
实际场景中得知任务状态是很常见的需求,对于 Celery 其内建任务状态有如下几种:
参数 | 说明 |
---|---|
PENDING | 任务等待中 |
STARTED | 任务已开始 |
SUCCESS | 任务执行成功 |
FAILURE | 任务执行失败 |
RETRY | 任务将被重试 |
REVOKED | 任务取消 |
当我们有个耗时时间较长的任务进行时一般我们想得知它的实时进度,这里就需要我们自定义一个任务状态用来说明进度并手动更新状态,从而告诉回调当前任务的进度,具体实现:
1 2 3 4 5 6 7 8 9 10 |
# tasks.py from celery import Celery import time @app.task(bind=True) def test_mes(self): for i in xrange(1, 11): time.sleep(0.1) self.update_state(state="PROGRESS", meta={'p': i*10}) return 'finish' |
然后在 trigger.py 中增加:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# trigger.py from task import add,test_mes import sys def pm(body): res = body.get('result') if body.get('status') == 'PROGRESS': sys.stdout.write('\r任务进度: {0}%'.format(res.get('p'))) sys.stdout.flush() else: print '\r' print res r = test_mes.delay() print r.get(on_message=pm, propagate=False) |
然后运行任务:
定时/周期任务
Celery 进行周期任务也很简单,只需要在配置中配置好周期任务,然后在运行一个周期任务触发器( beat )即可:
新建 Celery 配置文件 celery_config.py:
1 2 3 4 5 6 7 8 9 10 11 12 |
# celery_config.py from datetime import timedelta from celery.schedules import crontab CELERYBEAT_SCHEDULE = { 'ptask': { 'task': 'tasks.period_task', 'schedule': timedelta(seconds=5), }, } CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' |
配置中 schedule
就是间隔执行的时间,这里可以用 datetime.timedelta 或者 crontab 甚至太阳系经纬度坐标进行间隔时间配置,具体可以参考这里
如果定时任务涉及到 datetime 需要在配置中加入时区信息,否则默认是以 utc 为准。例如中国可以加上:
1 |
CELERY_TIMEZONE = 'Asia/Shanghai' |
然后在 tasks.py 中增加要被周期执行的任务:
1 2 3 4 5 6 7 |
# tasks.py app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0') app.config_from_object('celery_config') @app.task(bind=True) def period_task(self): print 'period task done: {0}'.format(self.request.id) |
然后重新运行 worker,接着再运行 beat:
1 |
celery -A task beat |
可以看到周期任务运行正常~
链式任务
有些任务可能需由几个子任务组成,此时调用各个子任务的方式就变的很重要,尽量不要以同步阻塞的方式调用子任务,而是用异步回调的方式进行链式任务的调用:
错误示范
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
@app.task def update_page_info(url): page = fetch_page.delay(url).get() info = parse_page.delay(url, page).get() store_page_info.delay(url, info) @app.task def fetch_page(url): return myhttplib.get(url) @app.task def parse_page(url, page): return myparser.parse_document(page) @app.task def store_page_info(url, info): return PageInfo.objects.create(url, info) |
正确示范1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
def update_page_info(url): # fetch_page -> parse_page -> store_page chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url) chain() @app.task() def fetch_page(url): return myhttplib.get(url) @app.task() def parse_page(page): return myparser.parse_document(page) @app.task(ignore_result=True) def store_page_info(info, url): PageInfo.objects.create(url=url, info=info) |
正确示范2
1 |
fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)]) |
链式任务中前一个任务的返回值默认是下一个任务的输入值之一 ( 不想让返回值做默认参数可以用 si() 或者 s(immutable=True) 的方式调用 )。
这里的 s()
是方法 celery.signature()
的快捷调用方式,signature 具体作用就是生成一个包含调用任务及其调用参数与其他信息的对象,个人感觉有点类似偏函数的概念:先不执行任务,而是把任务与任务参数存起来以供其他地方调用。
调用任务
前面讲了调用任务不能直接使用普通的调用方式,而是要用类似 add.delay(2, 2)
的方式调用,而链式任务中又用到了 apply_async
方法进行调用,实际上 delay
只是 apply_async
的快捷方式,二者作用相同,只是 apply_async
可以进行更多的任务属性设置,比如 callbacks/errbacks 正常回调与错误回调、执行超时、重试、重试时间等等,具体参数可以参考这里
关于 AsyncResult
AsyncResult 主要用来储存任务执行信息与执行结果,有点类似 tornado 中的 Future 对象,都有储存异步结果与任务执行状态的功能,对于写 js 的朋友,它有点类似 Promise 对象,当然在 Celery 4.0 中已经支持了 promise 协议,只需要配合 gevent 一起使用就可以像写 js promise 一样写回调:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
import gevent.monkey monkey.patch_all() import time from celery import Celery app = Celery(broker='amqp://', backend='rpc') @app.task def add(x, y): return x + y def on_result_ready(result): print('Received result for id %r: %r' % (result.id, result.result,)) add.delay(2, 2).then(on_result_ready) |
要注意的是这种 promise 写法现在只能用在 backend 是 RPC (amqp) 或 Redis 时。 并且独立使用时需要引入 gevent 的猴子补丁,可能会影响其他代码。 官方文档给的建议是这个特性结合异步框架使用更合适,例如 tornado、 twisted 等。
delay
与 apply_async
生成的都是 AsyncResult 对象,此外我们还可以根据 task id 直接获取相关 task 的 AsyncResult: AsyncResult(task_id=xxx)
关于 AsyncResult 更详细的内容,可以参考这里
利用 Celery 进行分布式队列管理、开发将会大幅提升开发效率,关于 Celery 更详细的使用大家可以去参考详细的官方文档