celery下载:sudo pip install Celery
celery 的使用方法:
用通俗一点的话讲,你写了一个循环,这个循环会输出1-9这9个数字,
你把这9个数字当成参数传给使用celery的装饰器装饰过的函数,celery就会开9个进程同时跑这9个函数,
在不使用celery的情况下你串行跑这九个函数如果需要9秒,那么用了celery可能就只需要两秒,够直白了
#首先 import 这两个模块
from celery import Celery from celery import bootsteps
# 新建一个文件,给你一个已经写好的配置文件,这个配置文件必须命名为tasks.py celery启动时会找这个文件
# 这个类继承bootsteps.Step,在启动,停止,shutdown的时候,打印log,具体说这个类没啥用,但写上比较好
class InfoStep(bootsteps.Step): def __init__(self, parent, **kwargs): print('{0!r} is in init'.format(parent)) def start(self, parent): print('{0!r} is starting'.format(parent)) def stop(self, parent): print('{0!r} is stopping'.format(parent)) def shutdown(self, parent): print('{0!r} is shutting down'.format(parent)) #app是你定义的celery类对象名字,写上你本地的redis地址,让任务队列可以再redis中保存 app=Celery('tasks',broker='redis://192.168.0.86:6001/0',backend='redis://192.168.0.86:6001/0') #把上面的log类加入到app app.steps['worker'].add(InfoStep) #这里是一些配置项,配置完之后当装饰器 装饰一下要跑的函数就行了 app.conf.update(
# CELERY_TASK_RESULT_EXPIRES = 3600 , #如果1小时后还没有执行则执行失败 CELERYD_CONCURRENCY = 20 , #这个比较重要,意思是同时需要开启多少个进程,这里写的是20个,20个算比较多了 CELERYD_PREFETCH_MULTIPLIER = 20 , CELERYD_MAX_TASKS_PER_CHILD = 200 , CELERY_DEFAULT_QUEUE = "default", #这个是默认队列,如果你啥都没配,那一般函数都会使用这个队列 CELERY_QUEUES={ "default": {"exchange": "default","exchange_type": "direct","routing_key": "default",}, "rightfactot": {"routing_key": "rightfactot_a.#","exchange": "rightfactot","exchange_type": "topic",}, "run_task": {"routing_key": "run_task_a.#","exchange": "run_task","exchange_type": "topic",}, "handly_back": {"routing_key": "handly_back_a.#","exchange": "handly_back","exchange_type": "topic",} },#这个比较重要,如果你有10个任务都要用celery,这个帮你分辨哪个任务使用哪个队列 CELERY_ROUTES={ 'tasks.handly': {"queue": "rightfactot", "routing_key": "rightfactot_a"}, 'tasks.run_task': {"queue": "run_task", "routing_key": "run_task_a"}, 'tasks.handly_back': {"queue": "handly_back", "routing_key": "handly_back_a"} },#这个比较重要,和上面的任务队列必须匹配使用,用来把任务队列配到相应函数上
#这些都没啥用,但又不能不配,看需求配置 CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['pickle', 'json', 'msgpack', 'yaml'], CELERY_RESULT_SERIALIZER='json', )
@app.task(bind=True, max_retries=3, default_retry_delay=6)
#把配好的app装饰到函数上,当执行这个函数的时候,celery就能收到参数并执行了 def run_vfdate(self,i,ty): print i,ty
pass # 不同的函数名称使用不同的队列,在上面有配过 @app.task(bind=True, max_retries=3, default_retry_delay=6) def run_vfdate_add(self,i,ty): print i,ty pass if __name__ == '__main__': app.start()
#最后新建一个任务调用文件 run_task,在这个文件里调用一下这个函数
from tasks import run_vfdate
run_vfdate.delay(1,2)
我们先跑一下celery
export C_FORCE_ROOT="true" celery -A tasks worker --loglevel=info
再跑一下我们创建的run_task,这样我们就把1,2当成参数传给了celery装饰过的函数
celery执行的任务此时应该能打印1,2这两个数字
#-----------------------------------------------------------------------------
如此就完成了celery使用的整个流程