python客户端调度器
基于apscheduler
官方文档: https://apscheduler.readthedocs.io/en/latest/
类似java的quartz
以scheduler驱动,以高独立的方式注册job,通过监控trigger去触发job,并从线程池和进程池中获取执行单元并执行job,支持job的恢复、持久化和动态修改
scheduler:
运行后不会产生新的进程,跟随主进程的退出而自动退出,不需要手动关闭,无性能负担,不会影响主进程,可以配置:
1、job持久化方式,默认存储在sqlite中,(注意需要安装sqlite相关依赖),通过url配置sqlite文件的位置,默认为当前路径的jobs.sqlite文件,其核心表为apscheduler_jobs,字段包括rowid(自增序列)、id(uuid/用户设置——其值必须唯一)、next_run_time(下一次运行的时间)、job_state(job的状态,包括公共schduler配置和trigger的信息、线程的配置)
2、执行job的参数,如线程池数量、进程池数量
3、时区,建议设置为 Asia/Shanghai(国内的python工程都是这个时区)
job:
一个调度任务的最小单元,由函数引用对象、函数参数列表、触发器组成、scheduler配置组成:
1.函数引用必须为全局声明 或者 模块:函数名
2.函数参数列表会将创建job时赋值参数args的值作为参数调用函数引用
3.触发器有cron、interval、date等类型
cron类似于linux的cron,具体参考表达式写法,实现全部需求,难用
interval需要设置start_date和end_date和触发频率,实现一段时间内重复调用的需求,易用
date需要设置触发的时间run_date,只能满足未来某一个时间段的一次调用,易用
4.scheduler配置有id(必须唯一,可自动生成uuid)、replace_existing(是否替换已经存在的,建议为True,否则很容易就会产生很多新的)
运行在线程池中,跟随主进程的退出而退出,简单高效不会影响主进程,线程池大小通过在创建scheduler时配置
封装了schedulerComponent,通过重写对象创建的函数实现了调度器的单例,应用了饿汉逻辑,调度器的实现类为BackgroundScheduler,其独立运行在后台,通过该sheduler对象可以操作job,实现job的创建(如上),job的修改、job的删除、job的查询、全量查询、全量删除以及job的暂停、恢复、停止,单个Job的ud操作需要指定job id ,其值来自创建时指定的id值或者自动生成的uuid
依赖
APScheduler模块
使用
建议在项目启动时候调用 SchedulerComponent的 get_scheduler方法,例如:
from component.schedulerComponent import SchedulerComponent
def main():
print("main")
SchedulerComponent.get_scheduler()
if __name__ == '__main__':
main()
这样就完成了调度器跟随应用启动并单例的过程(注意该调度器实例对象保存在该类的属性中,只能通过cls._instance进行访问获取,同时为了保证调度正常运行而不提供修改调度器的方法,当然你也可以通过类型.属性名进行修改,那这样我也没办法了)
使用过程中需要获取调度器,通过类方法即可,注意:job的id值是非常重要的,创建job时,如果指定了id属性,则返回值为trigger,如果没有指定则返回id值,id值直接关系到ud操作能否进行,例如:
注意: 我没有封装job id 的保存是因为job id 临时保存了也没有意义,取值时还是需要通过一个key进行取,那又何必保存呢?因为你的这个key就可以直接作为id值进行使用
# 拿到调度器
scheduler = SchedulerComponent.get_scheduler()
now_date_time = datetime.datetime.now() # 当前时间
run_date_time = datetime.datetime(now_date_time.year, now_date_time.month, now_date_time.day,
now_date_time.hour,
now_date_time.minute, 59) # 运行的时间
start_date_time = now_date_time.strftime('%Y-%m-%d %H:%M:%S') # 开始的时间
end_date_time = (now_date_time + datetime.timedelta(seconds=59)).strftime('%Y-%m-%d %H:%M:%S') # 结束的时间
# ############创建job
# 一到某个时间就做某件事情
scheduler.add_job(job, trigger='date', id="service_1",
run_date=run_date_time, replace_existing=True, args=["一到某个时间"])
# 在某个时间范围内做某件事情
scheduler.add_job(job, trigger='interval', id="service_2", seconds=2, start_date=start_date_time,
end_date=end_date_time, replace_existing=True,
args=["在某个时间范围内"])
# 每隔多长时间做某件事情
trigger_args = {
"seconds": 2
# , "minutes": None
# , "hours": None
# , "days": None
}
scheduler.add_job(job, trigger='interval', id="service_3", replace_existing=True, **trigger_args,
args=["每隔多长时间"])
# #代码中可以直接写注解,例如以下,但是需要先显示获取scheduler对象
# @scheduler.scheduled_job('interval', id='my_job_id', hours=2)
# def my_job():
# print("my_job")
# ############修改job
# 方式一: 先删除再添加
# scheduler.pause_job(job1_id)
# scheduler.remove_job(job1_id) # 删除job
# scheduler.add_job(job, 'date', id=job1_id, run_date=run_date_time, replace_existing=True, args=["一到某个时间"])
# 方式二: 直接修改trigger
# scheduler.reschedule_job(job1_id, trigger='date',
# run_date=run_date_time)
# ############删除job
# scheduler.remove_job("service_1") # 删除job
# jobs = scheduler.get_jobs() # 删除所有job
# for job_data in jobs:
# scheduler.remove_job(job_data.id)
# ############查询job
# jobs = scheduler.get_jobs() # 查询所有job
# print(jobs.__str__())
# job1_obj = scheduler.get_job("service_1") # 根据id查询单个job
# print("service_1", job1_obj)
一个改造的例子为:
原来代码为:
class errands(QWidget):
def start(self):
if not self.myThread:
from util.timer import TimerThread
self.myThread = TimerThread(self.work,'errands', second=1)
self.parent.errandsThread = self.myThread
self.myThread.start()
改为:
def job(name):
print(name)
def example():
# 拿到调度器
from component.schedulerComponent import SchedulerComponent
scheduler = SchedulerComponent.get_scheduler()
# 每隔多长时间做某件事情
trigger_args = {
"seconds": 2
# , "minutes": None
# , "hours": None
# , "days": None
}
scheduler.add_job(job, trigger='interval', id="service_3", replace_existing=True, **trigger_args,args=["每隔多长时间"])
if __name__ == '__main__':
example()
详细代码
import datetime
import time
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
class SchedulerComponent:
"""
调度组件
create by tristan
"""
_instance = None
# ############## 需要自定义的内容-start ##############
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') # 持久化的方式
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
timezone = 'Asia/Shanghai'
# ##############需要自定义的内容-end ##############
def __new__(cls, *args, **kwargs):
if cls._instance is None or cls._instance.scheduler is None:
cls._instance = super(SchedulerComponent, cls).__new__(cls, *args, **kwargs)
# 实例化一个调度器'
cls._instance.scheduler = BackgroundScheduler()
# cls._instance.scheduler = BackgroundScheduler(jobstores=cls.jobstores, executors=cls.executors,
# job_defaults=cls.job_defaults,
# timezone=cls.timezone)
print("调度器初始化完成")
cls._instance.scheduler.start() # 启动调度器
return cls._instance
@classmethod
def get_scheduler(cls):
"""
获取得到调度器
:return: 调度器
"""
if not cls._instance:
cls._instance = SchedulerComponent()
return cls._instance.scheduler
@classmethod
def shutdown(cls):
"""
关闭调度器
:return:
"""
cls._instance.scheduler.shotdown()
@classmethod
def get_job(cls, job_id):
return cls.get_scheduler().get_job(job_id)
@classmethod
def remove_job(cls, job_id):
cls.get_scheduler().remove_job(job_id)
def job(name):
print(name)
class Test:
def test(self):
print("test")
def example():
print("main")
# 拿到调度器
scheduler = SchedulerComponent.get_scheduler()
if scheduler.get_job("model.tab_home.page.Page.work"):
print("...")
now_date_time = datetime.datetime.now() # 当前时间
run_date_time = datetime.datetime(now_date_time.year, now_date_time.month, now_date_time.day,
now_date_time.hour,
now_date_time.minute, 59) # 运行的时间
start_date_time = now_date_time.strftime('%Y-%m-%d %H:%M:%S') # 开始的时间
end_date_time = (now_date_time + datetime.timedelta(seconds=59)).strftime('%Y-%m-%d %H:%M:%S') # 结束的时间
# ############创建job
# 一到某个时间就做某件事情
# scheduler.add_job(job, trigger='date', id="home_new.Ui_Home_Window.sendServerMsg",
# run_date=run_date_time, replace_existing=True, args=["一到某个时间"])
trigger_args = {
"seconds": 2
# , "minutes": None
# , "hours": None
# , "days": None
}
# 在某个时间范围内做某件事情
# scheduler.add_job(job, trigger='interval', id="home_new.Ui_Home_Window.sendServerMsg2", **trigger_args,
# start_date=start_date_time,
# end_date=end_date_time, replace_existing=True,
# args=["在某个时间范围内"])
# 每隔多长时间做某件事情
# #以模块为导向的指定job func的形式
scheduler.add_job("component.schedulerComponent:Test.test", trigger='interval', id="schedulerComponent.service.3",
replace_existing=True,**trigger_args, args=["每隔多长时间"]) # 创建或替换job
# scheduler.add_job(job, trigger='interval', id="schedulerComponent.service.3_1", replace_existing=True,
# **trigger_args, args=["每隔多长时间_1"]) # 创建或替换job
# #代码中可以直接写注解,例如以下,但是需要先显示获取scheduler对象
# @scheduler.scheduled_job('interval', id='my_job_id', hours=2)
# def my_job():
# print("my_job")
# ############修改job
# 方式一: 先删除再添加
# scheduler.pause_job(job1_id)
# scheduler.remove_job(job1_id) # 删除job
# scheduler.add_job(job, 'date', id=job1_id, run_date=run_date_time, replace_existing=True, args=["一到某个时间"])
# 方式二: 直接修改trigger
# scheduler.reschedule_job(job1_id, trigger='date',
# run_date=run_date_time)
# ############删除job
# scheduler.remove_job("service_1") # 删除job
# jobs = scheduler.get_jobs() # 删除所有job
# for job_data in jobs:
# scheduler.remove_job(job_data.id)
# ############查询job
# jobs = scheduler.get_jobs() # 查询所有job
# print(jobs.__str__())
# job1_obj = scheduler.get_job("service_1") # 根据id查询单个job
# print("service_1", job1_obj)
if __name__ == '__main__':
print(datetime.datetime.now().__str__())
# example()
time.sleep(4444)