一、celery的安装使用
输入python,然后tab回车,只看到了python,python2,python2.7,没有python3
1、yum安装python3,pip3和最新版的Django https://yq.aliyun.com/articles/640213
2、安装celery
pip3.6 install celery
3、安装redis
pip3.6 install redis
4、使用celery
结合官方文档:http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html
和使用手册:https://zhuanlan.zhihu.com/p/43588348
二、celery使用实例详解
1、实例1
//1、创建异步任务:
from celery import Celery
app = Celery('tasks', broker='redis://127.0.0.1', backend='redis://127.0.0.1')
@app.task
def add(x, y):
return x + y
//2、方法调用(生产者生产消息):
from tasks import add #导入我们的任务函数add
import time
result = add.delay(12,12) #异步调用,这一步不会阻塞,程序会立即往下运行
while not result.ready():# 循环检查任务是否执行完毕
print(time.strftime("%H:%M:%S"))
time.sleep(1)
print(result.get()) #获取任务的返回结果
print(result.successful()) #判断任务是否成功执行
//3、远程调用(生产者生产消息):
from tasks import app
print( app.send_task("tasks.add",args=(11,25)) )
//4、查看任务执行结果(消费者):
from tasks import add
taskid= 'd471f106-c8c9-4770-b0c7-64a1f3194e18'
add.AsyncResult(taskid).get()
//5、问题来了,在一个文件中,先生产后消费OK不?
from tasks import app
from tasks import add
taskid = app.send_task("tasks.add",args=(11,25))
print( add.AsyncResult(taskid).get() )
//答:不可以。生产者与消费者。生产者生产1个返回taskid就返回了,消费者持续异步地消费这个任务。
celery -A tasks worker -l info
这里,-A 表示我们的程序的模块名称,worker 表示启动一个执行单元,-l 是批 -level,表示打印的日志级别。
查看celery命令:celery –help
如查看celery worker命令:celery worker –help
2、实例2:多任务的celery项目
目录如下:
async_myCeleryProj_result.py文件
call_myCeleryProj_tasks.py文件
myCeleryProj文件夹
__init__.py文件
app.py文件
settings.py文件
tasks.py文件
1、__init__.py文件是个空文件,告诉Python myCeleryProj是一个可导入的包
2、app.py文件
from celery import Celery
app = Celery("myCeleryProj", include=["myCeleryProj.tasks"])
app.config_from_object("myCeleryProj.settings")
if __name__ == "__main__":
app.start()
3、settings.py文件
from kombu import Queue
import re
from datetime import timedelta
from celery.schedules import crontab
CELERY_QUEUES = ( # 定义任务队列
Queue("default", routing_key="task.#"), # 路由键以“task.”开头的消息都进default队列
Queue("tasks_A", routing_key="A.#"), # 路由键以“A.”开头的消息都进tasks_A队列
Queue("tasks_B", routing_key="B.#"), # 路由键以“B.”开头的消息都进tasks_B队列
)
CELERY_TASK_DEFAULT_QUEUE = "default" # 设置默认队列名为 default
CELERY_TASK_DEFAULT_EXCHANGE = "tasks"
CELERY_TASK_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_TASK_DEFAULT_ROUTING_KEY = "task.default"
CELERY_ROUTES = (
[
(
re.compile(r"myCeleryProj\.tasks\.(taskA|taskB)"),
{"queue": "tasks_A", "routing_key": "A.import"},
), # 将tasks模块中的taskA,taskB分配至队列 tasks_A ,支持正则表达式
(
"myCeleryProj.tasks.add",
{"queue": "default", "routing_key": "task.default"},
), # 将tasks模块中的add任务分配至队列 default
],
)
BROKER_URL = "redis://127.0.0.1:6379/0" # 使用redis 作为消息代理
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0" # 任务结果存在Redis
CELERY_RESULT_SERIALIZER = "json" # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
CELERYBEAT_SCHEDULE = {
"add": {
"task": "myCeleryProj.tasks.add",
"schedule": timedelta(seconds=10),
"args": (10, 16),
},
"taskA": {
"task": "myCeleryProj.tasks.taskA",
"schedule": crontab(hour=21, minute=10),
},
"taskB": {
"task": "myCeleryProj.tasks.taskB",
"schedule": crontab(hour=21, minute=12),
},
}
4、tasks.py文件
import os
from myCeleryProj.app import app
import time
import socket
def get_host_ip():
"""
查询本机ip地址
:return: ip
"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
finally:
s.close()
return ip
@app.task
def add(x, y):
s = x + y
time.sleep(3) # 模拟耗时操作
print("主机IP {}: x + y = {}".format(get_host_ip(), s))
return s
@app.task
def taskA():
print("taskA begin...")
print(f"主机IP {get_host_ip()}")
time.sleep(3)
print("taskA done.")
@app.task
def taskB():
print("taskB begin...")
print(f"主机IP {get_host_ip()}")
time.sleep(3)
print("taskB done.")
5、call_myCeleryProj_tasks.py文件
from myCeleryProj.tasks import app
print( app.send_task("myCeleryProj.tasks.add",args=(4,5)) )
print( app.send_task("myCeleryProj.tasks.taskA") )
print( app.send_task("myCeleryProj.tasks.taskB") )
6、async_myCeleryProj_result.py
from myCeleryProj.tasks import add
taskid_add = 'dc3622e6-89bf-48e1-8981-85dbe0bd83c5'
taskid_taskA = 'bf99ed11-8cba-4f46-a74e-bd2fc5902857'
taskid_taskB = '6d681b00-73bb-482a-94ad-40a18387d3ab'
print( add.AsyncResult(taskid_add).get() )
print( add.AsyncResult(taskid_taskA).get() )
print( add.AsyncResult(taskid_taskB).get() )
三、错误排查&解决
官方celery文档:http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html
from celery import Celery
app = Celery('tasks', broker='redis://10.26.27.85')
@app.task
def add(x, y):
return x + y
执行 celery -A tasks worker --loglevel=info, 报错如下:
consumer: Cannot connect to redis://10.26.27.85:6379//
(1)首先 redis-cli -h 10.26.27.85 -p 6379
> info
报错提示受保护,那就到redis配置文件中去掉保护。
cd /usr/local/matrix/etc/redis/
里面有3个文件,redis6380.conf redis6381.conf redis.conf
其中redis.conf实际就是默认的redis6379的配置文件了,vim redis.conf
查找protected, 定位到这行:protected-mode yes,将yes改为no
查找daemon,定位到这行:daemonize yes,如果是no就改为yes,这里本身就是yes,无需更改
查找bind,定位到这行:bind 127.0.0.1,改为:bind 0.0.0.0,所有的都可以绑定
(2)重启redis-server
redis-server /usr/local/matrix/etc/redis/redis.conf
(13)再次执行 celery -A tasks worker --loglevel=info
还是报同样的错误,神奇吧!诡异吧!
(4)查看redis进程
ps -ef|grep -v grep|grep redis
[root@VM redis]# ps -ef|grep -v grep|grep redis
root 1414 1 0 2019 ? 08:09:54 /usr/local/matrix/bin/redis-server 10.26.27.85:6380
root 1419 1 0 2019 ? 07:43:09 /usr/local/matrix/bin/redis-server 10.26.27.85:6381
root 26229 25889 0 15:49 pts/2 00:00:00 redis-cli -h 10.26.27.85 -p 6379
root 27201 1 0 15:52 ? 00:00:00 redis-server 0.0.0.0:6379
可能是6379对应的进程僵死或者重启没奏效。把6379的进程(26229和27201)都kill -9杀死,然后重新开启:redis-server /usr/local/matrix/etc/redis/redis.conf
重新打开,就可以了:[root@VM xiafen]# ps -ef|grep -v grep|grep redis
root 1414 1 0 2019 ? 08:10:00 /usr/local/matrix/bin/redis-server 10.26.27.85:6380
root 1419 1 0 2019 ? 07:43:15 /usr/local/matrix/bin/redis-server 10.26.27.85:6381
root 27201 1 0 15:52 ? 00:00:07 redis-server 0.0.0.0:6379
(5)再次执行 celery -A tasks worker --loglevel=info
就妥妥的了