文章目录
Celery是由Python开发、简单、灵活、可靠的分布式
任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。Celery侧重于实时操作,但对调度支持也很好,其每天可以处理数以百万计的任务。特点:
简单:熟悉celery的工作流程后,配置使用简单
高可用:当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务
快速:一个单进程的celery每分钟可处理上百万个任务
灵活:几乎celery的各个组件都可以被扩展及自定制
- Celery的应用场景:
- 当前执行任务,执行时间非常长,不需要实时返回结果
- 定时任务,每年生日、每个月帐单
为什么使用celery而非crontab => 10台机器上设置一些不同的定时任务,使用celery管理更方便
原理图
Celery的组成:
- 消息代理:Broker => 接收任务生产者发送过来的任务消息,存进队列,由消费者读取运行 => 消息队列
- 任务调试器:Beat => 读取配置文件,周期性地将配置中到期需要执行的任务发送到Broker中 => 生产者
- 任务执行: Worker => 执行任务的消息者,会读取Broker中的任务,并进行执行 => 消费者
- 生产者:celery提供的API => 在代码中发起任务 => 生产
- 结果存储: Result Backend => 任务处理完后,保存状态信息和结果,以供查询
生产者消费者模型
部署
消息代理和结果存储 => redis
#linux安装运行redis
# 安装
yum install redis
# 配置:修改监听端口 =》
vim /etc/redis.conf
bind 0.0.0.0
# 启动:
redis-server /etc/redis.conf &
# 重启:
kilall redis-server
# 如果没有killall命令,请这样安装
yum install psmisc -y
redis-server /etc/redis.conf &
=>port: 6379
Flask代码处理=> flask+celery
1. 安装依赖
pip install redis
pip install celery
2. 创建celery实例: celery_app/__init__.py
from celery import Celery
celery = Celery("celery_app")
# 从配置文件中读取并配置celery
celery.config_from_object('config.celery_config')
3. 创建配置文件 config/celery_config.py
# 配置消息中间件的地址
BROKER_URL = "redis://192.168.189.200:6379/1"
# 结果存放地址
CELERY_RESULT_BACKEND = "redis://192.168.189.200:6379/2"
# 启动Celery时,导入任务
CELERY_IMPORTS = (
'celery_app.tasks',
)
# 配置定时任务
4. 创建任务:celery_app/tasks.py
from . import celery
import time
import random
@celery.task
def celery_task(sth1):
"""输出start, end, time.sleep"""
print("celery_app.task start")
delay_time = random.randint(5, 10)
time.sleep(delay_time)
print("celery_app.task end")
return sth1
# 调用任务
@view01_bp.route('/index/')
def index():
# 调用一个异步任务
from celery_app.tasks import celery_task
# 立即发送任务,立刻执行任务
celery_task.delay('hello world!')
# celery_task.apply_async() => 指定运行倒计时,发送到哪个队列....
return "This is index"
"""
目标:
当触发该任务时,web页面能实时返回return的结果,而不是在等待
在worker看到任务被接收和处理
"""
5. 将celery配置启动入口:manage.py
from celery_app import celery
以便通过 => manage.celery 启动celery项目
6. 启动:
1. flask-web => python server.py
2. worker => 1个或n个 => celery worker -A manage.celery --loglevel=info -n nodename
nodename指定的workd的名字(自己取)
3. beat => 如果有定时任务
celery4 windows不支持结果处理
pip install eventlet
然后启动worker的时候加一个参数,如下:
celery -A <mymodule> worker -l info -P eventlet
也可从官方直接下载安装包:https://pypi.python.org/pypi/celery/
tar xvfz celery-0.0.0.tar.gz
cd celery-0.0.0
python setup.py build
python setup.py install
如何把代码同步到linux上来
1.在pycharm:Tool->Deployment-> + ->
设置一个名字, SFTP
配置Connection
配置Mappings
2. 创建虚拟环境
cd /opt/flaskproj/
python3 -m venv linuxvenv
source linuxvenv/bin/activate
pip install -r requirements.txt
3. 运行
cd /opt/flaskproj/ && source linuxvenv/bin/activate && cd operbench
# 运行flask
python server.py
# 运行celery worker
celery worker -A manage.celery --loglevel=info -n node1
### web方式查看celery的工作情况
cd /opt/flaskproj/ && source linuxvenv/bin/activate && cd operbench
pip install flower
flower -A manage.celery --loglevel=info --address=192.168.189.200 --port=5555
实例
- 编写task.py
# tasks.py
import time
from celery import Celery
celery = Celery('tasks', broker='redis://localhost:6379/0')
@celery.task
def sendmail(mail):
print('sending mail to %s...' % mail['to'])
time.sleep(2.0)
print('mail sent.')
- 启动Celery处理任务
$ celery -A tasks worker --loglevel=info
上面的命令行实际上启动的是Worker,如果要放到后台运行,可以扔给supervisor。
如何发送任务?
>>> from tasks import sendmail
>>> sendmail.delay(dict(to='[email protected]'))
<AsyncResult: 1a0a9262-7858-4192-9981-b7bf0ea7483b>