celery简介
celery是基于python开发的分布式异步消息任务队列,说它是队列,其实并不是我们通常理解的那种存放数据或者任务的一个管道,可以看做是一个软件或者说是一个功能组件,其主要构成部分如下:
- user:用户程序,用于告知celery去执行一个任务。
- broker: 存放任务(依赖RabbitMQ或Redis,进行存储)
- worker:执行任务
除了上述三个组件,还有一个可选的backend, 用于存放任务执行结果
安装
安装celery
pip3 install celery
安装redis
- 官方站点:
http://download.redis.io/releases/
, 下载最新版或者最新stable版,建议stable版本 - 解压源码并进入目录
- 不用configure
- 直接make
make PREFIX=/usr/local/redis install
- 在解压的redis目录把redis.conf 复制到 /usr/local/redis/bin 目录下
- 修改redis.conf的
daemonize yes
bind 0.0.0.0
- 使用
./redis-server redis.conf
,redis-cli
pip install -U celery[redis]
使用
基本使用
t1.py 定义了Celery对象,并用该对象装饰我们自定义的函数
import time
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379')
@app.task
def xxxooo(x, y):
time.sleep(10)
return x + y
然后启动worker
前台启动 celery worker -A t1 -l info
后台启动 celery multi start w1 -A proj -l info
把celery可以识别的任务传给celery这个组件
from t1 import xxxooo
result = xxxooo.delay(4, 4)
print(result.id)
from celery.result import AsyncResult
from s1 import app
async = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)
if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
生产上使用
目录结果如下:
pro_cel
├── celery_tasks# celery相关文件夹
│ ├── celery.py # celery连接和配置相关文件
│ └── tasks.py # 所有任务函数
├── check_result.py # 检查结果
└── send_task.py # 触发任务
celery.py
文件名必须是celery.py
from celery import Celery
celery = Celery('xxxxxx',
broker='redis://localhost:6379',
backend='redis://localhost:6379',
include=['celery_tasks.tasks'])
celery.conf.timezone = 'Asia/Shanghai'
celery.conf.enable_utc = False
tasks.py
import time
from .celery import celery
@celery.task
def xxxxx(*args, **kwargs):
time.sleep(5)
return "任务结果"
@celery.task
def hhhhhh(*args, **kwargs):
time.sleep(5)
return "任务结果"
check_result
from celery.result import AsyncResult
from celery_tasks.celery import celery
async = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)
if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
send_task
import celery_tasks.tasks
# 立即告知celery去执行xxxxxx任务,并传入两个参数
result = celery_tasks.tasks.xxxxx.delay(4, 4)
print(result.id)
使用方式:
- 启动worker 进入pro_cel目录下执行
celery worker -A celery_tasks -l info
- 执行
python3 send_tasks.py
定时任务
设定时间让celery到了时间帮我们执行一次任务
import datetime from celery_tasks.tasks import xxxxx ctime = datetime.datetime.now() utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp()) s10 = datetime.timedelta(seconds=10) ctime_x = utc_ctime + s10 result = xxxxx.apply_async(args=[1, 3], eta=ctime_x) print(result.id)
类似crontab的定时任务
from celery import Celery from celery.schedules import crontab app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['proj.s1', ]) app.conf.timezone = 'Asia/Shanghai' app.conf.enable_utc = False app.conf.beat_schedule = { 'add-every-10-seconds': { 'task': 'celery_tasks.tasks.xxxxx', 'schedule': 10.0, 'args': (16, 16) }, 'add-every-12-seconds': { 'task': 'celery_tasks.tasks.xxxxx', 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), 'args': (16, 16) }, }
使用:
celery beat -A celery_tasks
celery worker -A proj -l celery_tasks