Celery 除了可以执行异步任务,也支持执行周期性任务,或者说定时任务。Celery Beat 进程通过读取配置文件的内容,周期性地将定时任务发往任务队列。
让我们看看例子,项目结构如下:
__init__.py 代码如下:
1
2
3
|
from
celery
import
Celery
app
=
Celery(
'demo'
)
app.config_from_object(
'celery_app.celeryconfig'
)
|
celeryconfig.py 代码如下:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
from
datetime
import
timedelta
from
celery.schedules
import
crontab
# Broker and Backend
BROKER_URL
=
'redis://127.0.0.1:6379'
CELERY_RESULT_BACKEND
=
'redis://127.0.0.1:6379/0'
# Timezone
CELERY_TIMEZONE
=
'Asia/Shanghai'
# 指定时区,不指定默认为 'UTC'
# CELERY_TIMEZONE='UTC'
# import
CELERY_IMPORTS
=
(
'celery_app.task1'
,
'celery_app.task2'
)
# schedules
CELERYBEAT_SCHEDULE
=
{
'add-every-30-seconds'
: {
'task'
:
'celery_app.task1.add'
,
'schedule'
: timedelta(seconds
=
30
),
# 每 30 秒执行一次
'args'
: (
5
,
8
)
# 任务函数参数
}
}
|
task1.py 代码如下:
01
02
03
04
05
06
07
08
09
10
11
12
|
import
time
from
celery_app
import
app
@app
.task
def
add(x, y):
time.sleep(
2
)
return
x
+
y
[mw_shl_code
=
python,true]
import
time
from
celery_app
import
app
@app
.task
def
add(x, y):
time.sleep(
2
)
return
x
+
y
|
[/mw_shl_code]
现在,让我们启动 Celery Worker 进程,在项目的根目录下执行下面命令:
celery_demo $ celery -A celery_app worker --loglevel=info
接着,启动 Celery Beat 进程,定时将任务发送到 Broker,在项目根目录下执行下面命令:
celery_demo $ celery beat -A celery_app
celery beat v4.0.1 (latentcall) is starting.
__ - ... __ - _
LocalTime -> 2016-12-11 09:48:16
Configuration ->
. broker -> redis://127.0.0.1:6379//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)
之后,在 Worker 窗口我们可以看到,任务 task1 每 30 秒执行一次
在上面,我们用两个命令启动了 Worker 进程和 Beat 进程,我们也可以将它们放在一个命令中
$ celery -B -A celery_app worker --loglevel=info
更多技术资讯可关注:gzitcast