文章目录
python常用库之Celery
什么是Celery
Celery 官网:http://www.celeryproject.org/
gihub:https://github.com/celery
Celery 官网中文版:http://docs.jinkan.org/docs/celery/
celery
美: [ˈsɛləri]
英: ['selərɪ]
n. 芹菜;(调味用) 香芹粉
网络 西芹;旱芹;西洋芹
Celery 是使用 python 编写的分布式任务调度框架。
Celery是一个功能完备即插即用的任务队列。它使得我们不需要考虑复杂的问题,使用非常简单。celery非常易于集成到一些web开发框架中。
- 可以不依赖任何服务器,通过自身命令,启动服务
- celery服务为其他项目服务提供异步解决任务需求
常见的使用场景
有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求。
- 异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
- 延迟执行:解决延迟任务,比如5秒后发送短信等
- 定时执行:解决周期(周期)任务,比如每天数据统计
Celery特点
- 简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的。
- 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务。
- 快速:一个单进程的celery每分钟可处理上百万个任务。
- 灵活: 几乎celery的各个组件都可以被扩展及自定制。
Celery几个主要的概念
-
celery 应用
用户编写的代码脚本,用来定义要执行的任务,然后通过 broker 将任务发送到消息队列中。 -
broker
提交任务到broker,也就是任务中间件上!
消息代理,又称消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper等作为消息代理。
Celery本身并不提供消息服务,使用第三方服务,也就是borker来传递任务,一般使用rabbitMQ或者Redis。
-
backend
数据库,用来存储任务返回的结果。 -
worker
工人,用来执行 broker 分派的任务。
使用 celery 首先需要选择一个消息队列。可以是redis。
提交任务到broker,也就是任务中间件上。
celery 安装
pip install -U Celery
celery 使用
python 异步任务框架 Celery 入门,速看
参考URL: https://zhuanlan.zhihu.com/p/444879714
思路:需要编写一个 celery 应用,它用来创建任务和管理 wokers。
celery第一件要做的最为重要的事情是需要先创建一个Celery实例,我们一般叫做celery应用,或者更简单直接叫做一个app。app应用是我们使用celery所有功能的入口,比如创建任务,管理任务等,在使用celery的时候,app必须能够被其他的模块导入。
创建一个tasks.py 文件:
from celery import Celery
# Celery 应用初始化
celery_app = Celery('proj', , include=['proj.tasks'], broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
- 第一个参数是应用名称
- broker 是消息代理(如 redis url)的 broker URL
- backend 是结果存储 backend 的 URL (如 redis url)
- include 是要包含的任务模块
接着是任务函数文件tasks.py
import time
from proj.app_test import celery_app
@celery_app.task
def add(x, y):
time.sleep(1)
return x + y
tasks.py只有一个任务函数add,让它生效的最直接的方法就是添加app.task这个装饰器。add的功能是先休眠一秒,然后返回两个数的和。
最终调用:
from proj.tasks import add
import time
t1 = time.time()
r1 = add.delay(1, 2)
r2 = add.delay(2, 4)
r3 = add.delay(3, 6)
r4 = add.delay(4, 8)
r5 = add.delay(5, 10)
r_list = [r1, r2, r3, r4, r5]
for r in r_list:
while not r.ready():
pass
print(r.result)
t2 = time.time()
print('共耗时:%s' % str(t2-t1))
在这个程序中,我们调用了add函数五次,delay()用来调用任务。
我们一次性调用了五次add函数,但是运行的总时间才1秒多。这是celery异步运行的结果,如果是同步运行,那么,至少需要5秒多,因为每调用add函数一次,就会休眠一秒。这就是celery的强大之处。
celery_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
worker_max_tasks_per_child=20,
)
这段对 Celery 应用的配置进行了更新,主要包含:
- task_serializer 和 result_serializer:指定任务和结果的数据序列化方式为 JSON
- accept_content:指定只接受 JSON 格式的任务数据
- worker_max_tasks_per_child:指定每个 worker 进程最多执行 20 个任务就重启
定时更新缓存
原文链接:https://www.cnblogs.com/yume-zbh/p/16478697.html
视图类:
class BannerView(GenericViewSet, ListModelMixin):
queryset = models.Banner.objects.all()
serializer_class = serializer.BannerSerializer
def list(self, request, *args, **kwargs):
banner_list = cache.get('banner_list')
if banner_list:
# redis中有值直接返回
return Response(banner_list)
else:
# redis中没有值,获取数据再存入redis
res = super(BannerView, self).list(request, *args, **kwargs)
cache.set('banner_list', res.data)
return res
celery.py
from celery import Celery
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
import django
django.setup()
broker = 'redis://127.0.0.1:6379/2'
backend = 'redis://127.0.0.1:6379/3'
include = [
'celery_task.tasks'
]
app = Celery('main', broker=broker, backend=backend, include=include)
from datetime import timedelta
app.conf.beat_schedule = {
'banner_update': {
'task': 'celery_task.tasks.banner_update', # 任务路径
'schedule': timedelta(seconds=10), # 定时
'args': (), # 任务参数
}
}
任务:
@app.task
def banner_update():
query_set = models.Banner.objects.all()
ser = serializer.BannerSerializer(instance=query_set, many=True)
cache.set('banner_list', ser.data)
return True
Celery worker 可以通过两种方式启动
- 命令行启动
使用 Celery 提供的命令直接启动 worker,例如:
celery worker -A celery_app_name -l info
这是最简单的启动方式,适用于开发环境等。
- 代码中启动
在 Python 代码中调用 celery_app.start() 启动 worker,例如:
from celery_app import celery_app
if __name__ == '__main__':
celery_app.start()
- 命令行:使用 Celery 命令直接启动,简单快速
- 代码启动:在 Python 代码中调用 celery_app.start() 启动,控制更精细
两种方式各有优势,可以根据实际情况选择:
- 开发环境 plus 测试阶段:命令行启动更简单快速
- 生产环境:代码启动可以实现更复杂的启动逻辑和参数控制
在代码中启动Celery work
我已经在一个模块中定义了一个 Celery 应用程序,现在我想从其 __ main__中的同一模块启动worker
from celery import Celery
celery_app = Celery('tasks', broker='redis://')
@celery_app.task
def add(x, y):
return x + y
if __name__ == '__main__':
celery_app.start() # 启动worker
执行
python celery_app.py
会启动 Celery worker。
而当此模块被导入时,例如在另一个模块中:
from celery_app import add
result = add.delay(1, 2) # 调用添加任务
导入模块时 name 不为 main,所以 celery_app.start() 不会执行,worker 不会启动。
main 判断是一种很简单且常用的判断模块执行方式的方式。通过它可以实现在直接执行模块时启动worker,而在导入模块时不启动worker的目的。
在 main 中启动 worker 的主要优点是:
- 无需指定自定义入口文件,使用当前文件(celery_app.py)即可
- worker 的启动逻辑清晰明了,编写在 main 中
- 可以实现更复杂的启动控制逻辑
调用 celery_app.start() 可以传入必要参数:
celery_app.start(
['-A', 'tasks.celery_task.celery_task', 'worker',
'-c', '10', '-l',
'debug', '--logfile', log_file])
主要使用了以下几个参数:
- -A tasks.celery_task.celery_task
指定 Celery 应用名称,worker 将消费此 Celery 应用中定义的任务。 - -worker
指定启动的进程类型为 worker 进程。Celery 还支持 beat、events 等其他类型进程。 - -c 10
指定 worker 的并发数量为 10 ,即同时执行任务的数量为 10 。 - -l debug
指定日志级别为 debug ,会输出很详细的日志信息,方便开发调试。 - –logfile log_file
指定日志文件名称为 log_file ,日志将写入此文件。
这些参数是启动 Celery worker 常用和比较重要的配置项:
- 应用名:worker 消费哪个 Celery 应用
- 并发数:同时执行任务的数量
- 日志级别:日志输出详细级别
- 日志文件:日志写入位置
只有正确配置这些选项,worker 才能正常启动并处理任务。
-A 参数单独说明
-A 或 --app 参数是用于指定 worker 消费哪个 Celery 应用的任务,这是一个非常重要的参数。
当启动 Celery worker 时,如果不指定消费的 Celery 应用,则 worker 不知道需要执行哪些任务,会启动失败。
所以 -A 参数的值就是 worker 需要消费的 Celery 应用名称,用于告知 worker:
你需要执行的所有任务都在这个 Celery 应用中定义,你需要监听这个应用的消息队列获取任务,并将执行结果推送到这个应用对应的结果后端。
比如,如果我们有一个名为 proj_tasks 的 Celery 应用,在它的 celery_app.py 中定义了任务和配置:
# celery_app.py
from celery import Celery
proj_celery = Celery('proj_tasks', broker='redis://')
@proj_celery.task
def add(x, y):
return x + y
- proj_celery 是 Celery 实例名称,是一个变量
- proj_tasks 是该 Celery 应用的名称,会作为 -A 参数的值传入
那么启动 worker 消费这个应用的任务,需要传入:
celery worker -A proj_tasks # `proj_tasks` 即为应用名称
或者在代码中调用:
python
proj_celery.start(['worker', '-A', 'proj_tasks'])
-A proj_tasks 就是告知 worker:
你需要消费的任务全部在 proj_tasks 这个 Celery 应用中定义,你需要连接到它对应的 broker 获取任务,并将执行结果推送到它的 backend 中。
所以,-A 或 --app 参数的值就是 worker 消费的 Celery 应用名称,这是启动 worker 时最为重要的配置之一。
如果还不清楚,可以这么理解:
- 每个 Celery 应用都是一个任务集合加配置
- 启动 Celery worker 来执行这些任务
- 但 worker 需要知道需要消费哪个应用的任务
- 所以传入 -A 应用名称 ,告知 worker 消费这个应用的任务