目录
1-3-1 任务添加 - add_job()、装饰器 scheduled_job()
1-3-2 任务删除 - remove_job()、remove()
1-3-3 任务继续和暂停 - resume_job、pause_job
1-3-4 任务的修改和重设 - modify、reschedule_job
一、apscheduler 轻量级的定时调度
1-1 基本概念 - APScheduler 的四种组件
- 触发器 triggers - 提供任务触发方式
- 触发器 包含调度逻辑。
- 每个作业都有自己的触发器,用于确定何时应该运行作业。
- 除了初始配置之外,触发器完全是无状态的。
- 商店 job stores - 提供任务保存方式
- 工作商店安排预定的工作。
- 默认作业存储只是将作业保留在内存中,但其他人将它们存储在各种类型的数据库中。
- 作业的数据在保存到持久性作业存储时会被序列化,并在从其中加载时进行反序列化。
- 作业存储(默认存储除外)不会将作业数据保留在内存中,而是充当中间人,用于在后端保存,加载,更新和搜索作业。
- 绝不能在调度程序之间共享作业存储。
- 执行人 executors - 提供任务调度方式
- 执行程序处理作业的运行。
- 他们通常通过将作业中指定的可调用对象提交给线程或进程池来完成此操作。
- 作业完成后,执行程序会通知调度程序,调度程序随后会发出相应的事件。
- 调度 schedulers - 提供任务工作方式
- 调度程序将其余部分绑定在一起。
- 您通常只在应用程序中运行一个调度程序。
- 应用程序开发人员通常不直接处理作业存储,执行程序或触发器。
- 相反,调度程序提供适当的接口来处理所有这些。
- 配置作业存储和执行程序是通过调度程序完成的,添加,修改和删除作业也是如此。
1-2 triggers 组件 - 提供三种触发方式
1-2-1 date - 在某个时间点执行一次(一次性)
from datetime import date from apscheduler.schedulers.blocking import BlockingScheduler sched = BlockingScheduler() def my_job(text): print(text) # 指出确切执行时间的三种方式,date、datetime、字符串 # The job will be executed on November 6th, 2009 sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text']) # The job will be executed on November 6th, 2009 at 16:30:05 sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text']) # 指定格式的文本字符串 sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['text']) # 添加即刻运行的作业 # The 'date' trigger and datetime.now() as run_date are implicit sched.add_job(my_job, args=['text']) sched.start() ''' 相关参数 run_date(datetime | str) - 运行作业的日期/时间,为空则使用但前时间 timezone(datetime.tzinfo | str) - run_date如果它还没有时区的时区 '''
1-2-2 interval - 相隔指定时间段执行
from datetime import datetime from apscheduler.schedulers.blocking import BlockingScheduler def job_function(): print("Hello World") sched = BlockingScheduler() # 通常的循环时间指定 # Schedule job_function to be called every two hours sched.add_job(job_function, 'interval', hours=2) # 限制计划运行的总时间长度 # The same as before, but starts on 2010-10-10 at 9:30 and stops on 2014-06-15 at 11:00 sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00') # 对执行的函数加上随机的延迟执行时间,用于错开任务的执行 # Run the `job_function` every hour with an extra-delay picked randomly in a [-120,+120] seconds window. sched.add_job(job_function, 'interval', hours=1, jitter=120) # 装饰器的使用 @sched.scheduled_job('interval', id='my_job_id', hours=2) def job_function(): print("Hello World") sched.start() ''' 相关参数 weeks (int) - 等待的周数 days(int) - 等待的天数 hours(int) - 等待的小时数 minutes(int) - 等待的分钟数 seconds(int) - 等待的秒数 start_date(datetime | str) - 区间计算的起点 end_date(datetime | str) - 触发的最新可能日期/时间 timezone(datetime.tzinfo | str) - 用于日期/时间计算的时区 jitter(int | None) - jitter最多提前或延迟作业执行几秒钟。 jitter选项使您可以将随机组件添加到执行时间。 如果您有多个服务器并且不希望它们在同一时刻运行作业 或者您希望防止具有类似选项的多个作业始终并发运行,则这可能很有用: '''
1-2-3 cron - 指定某个时间点执行
注意点总结
- 第一个工作日总是周一
- 夏令时(DST),使用UTC避免DST的错误
夏令时行为
cron触发器与所谓的“挂钟”时间一起工作。
因此,如果所选时区遵守DST(夏令时),您应该知道在进入或离开DST时它可能会导致cron触发器出现意外行为。
从标准时间切换到夏令时时,时钟会向前移动一小时或半小时,具体取决于时区。
同样,当切换回标准时间时,时钟向后移动一小时或半小时。
这将导致某些时间段根本不存在或重复。
如果您的日程安排会在其中任何一个时段执行作业,则可能会比预期更频繁或更少地执行。
这不是一个错误。如果您希望避免这种情况,请使用不遵守DST的时区,例如UTC。
# 在欧洲/赫尔辛基时区,这将不会在3月的最后一个星期日上午执行 # 同样,它将在10月的最后一个周日上午执行两次 sched.add_job(job_function, 'cron', hour=3, minute=30)
简单事例
from apscheduler.schedulers.blocking import BlockingScheduler def job_function(): print "Hello World" sched = BlockingScheduler() # Schedules job_function to be run on the third Friday # of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00 sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # 限制计划的总时间 # Runs from Monday to Friday at 5:30 (am) until 2014-05-30 00:00:00 sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30') # 装饰器的使用 @sched.scheduled_job('cron', id='my_job_id', day='last sun') def some_decorated_task(): print("I am printed at 00:00:00 on the last Sunday of every month!") # crontab 的使用 sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *')) # jitter 随机延迟添加 # Run the `job_function` every sharp hour with an extra-delay picked randomly in a [-120,+120] seconds window. sched.add_job(job_function, 'cron', hour='*', jitter=120) sched.start() ''' 相关参数 year(int | str) - 4位数年份 month (int | str) - 月(1-12) day(int | str) - (1-31)日 week (int | str) - ISO周(1-53) day_of_week(int | str) - 工作日的数字或名称(0-6或星期一,星期二,星期三,星期四,星期五,星期五,星期日) hour (int | str) - 小时(0-23) minute (int | str) - 分钟(0-59) second(int | str) - second(0-59) start_date(datetime | str) - 触发的最早可能日期/时间(包括) end_date(datetime | str) - 触发的最新可能日期/时间(包括) timezone(datetime.tzinfo | str) - 用于日期/时间计算的时区(默认为调度程序时区) jitter(int | None) - jitter最多提前或延迟作业执行几秒钟。 '''
表达式类型
下表列出了从一年到二年的字段中使用的所有可用表达式。可以在单个字段中给出多个表达式,以逗号分隔。
表达 领域 描述 *
任何 触发每一个价值 */a
任何 a
从最小值开始,触发每个值a-b
任何 触发 a-b
范围内的任何值(必须小于b)a-b/c
任何 触发范围 c
内的每个值a-b
xth y
天 在一个月内的 x
工作日发生火灾y
last x
天 x
在一个月内最后一次发生的工作日开火last
天 在一个月的最后一天开火 x,y,z
任何 触发任何匹配的表达式; 可以组合任意数量的任何上述表达式 在
month
和day_of_week
领域接受英文简写月和星期名(jan
-dec
和mon
-sun
分别)。1-3 任务操作
1-3-1 任务添加 - add_job()、装饰器 scheduled_job()
- 如果在应用程序初始化期间在持久性作业存储中调度作业,则必须为作业和使用定义显式ID,
replace_existing=True
或者每次应用程序重新启动时都会获得作业的新副本!- 要立即运行作业,请
trigger
在添加作业时省略参数。1-3-2 任务删除 - remove_job()、remove()
- 通过调用 remove_job() 作业的ID和作业存储别名
- 通过调用 remove() 你得到的Job实例 add_job()
# 根据任务id删除 remove_job scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.remove_job('my_job_id') # remove 根据任务实例删除 job = scheduler.add_job(myfunc, 'interval', minutes=2) job.remove()
1-3-3 任务继续和暂停 -
resume_job、pause_job
您可以通过 job 实例或调度程序本身轻松地暂停和恢复作业。
暂停作业时,将清除其下一个运行时间,并且不会再计算运行时间,直到作业恢复为止。
job = scheduler.add_job(myfunc, 'interval', minutes=2) # 根据任务实例 job.pause() job.resume() # 根据任务id暂停 scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.pause_job('my_job_id') scheduler.resume_job('my_job_id')
1-3-4 任务的修改和重设 -
modify、reschedule_job
- 修改对象的任意属性:
job.modify(max_instances=6, name='Alternate name')
- You can modify any job attributes by calling either apscheduler.job.Job.modify() or modify_job(). You can modify any Job attributes except for
id
.- 重设对象的属性:
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
- 如果要重新安排作业 - 即更改其触发器,则可以使用 apscheduler.job.Job.reschedule() 或 reschedule_job()。这些方法为作业构造新的触发器,并根据新的触发器重新计算其下一个运行时间。
二、Flask-apscheduler
from flask import Flask from flask_apscheduler import APScheduler app = Flask(__name__) scheduler = APScheduler() scheduler.init_app(app) scheduler.start() def task(): print('task') @app.route('/index') def index(): app.apscheduler.add_job(func=task, id='task', trigger='interval', seconds=2) return 'ok' # 测试执行定时任务的同时 是否会影响其他接口的使用 @app.route('/test') def test(): return 'test' if __name__ == '__main__': app.run()