python客户端调度器

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_35559756/article/details/84058397

python客户端调度器

​ 基于apscheduler

​ 官方文档: https://apscheduler.readthedocs.io/en/latest/

​ 类似java的quartz

​ 以scheduler驱动,以高独立的方式注册job,通过监控trigger去触发job,并从线程池和进程池中获取执行单元并执行job,支持job的恢复、持久化和动态修改
​ scheduler:

​ 运行后不会产生新的进程,跟随主进程的退出而自动退出,不需要手动关闭,无性能负担,不会影响主进程,可以配置:

​ 1、job持久化方式,默认存储在sqlite中,(注意需要安装sqlite相关依赖),通过url配置sqlite文件的位置,默认为当前路径的jobs.sqlite文件,其核心表为apscheduler_jobs,字段包括rowid(自增序列)、id(uuid/用户设置——其值必须唯一)、next_run_time(下一次运行的时间)、job_state(job的状态,包括公共schduler配置和trigger的信息、线程的配置)

​ 2、执行job的参数,如线程池数量、进程池数量

​ 3、时区,建议设置为 Asia/Shanghai(国内的python工程都是这个时区)

​ job:

​ 一个调度任务的最小单元,由函数引用对象、函数参数列表、触发器组成、scheduler配置组成:

​ 1.函数引用必须为全局声明 或者 模块:函数名

​ 2.函数参数列表会将创建job时赋值参数args的值作为参数调用函数引用

​ 3.触发器有cron、interval、date等类型

​ cron类似于linux的cron,具体参考表达式写法,实现全部需求,难用

​ interval需要设置start_date和end_date和触发频率,实现一段时间内重复调用的需求,易用

​ date需要设置触发的时间run_date,只能满足未来某一个时间段的一次调用,易用

​ 4.scheduler配置有id(必须唯一,可自动生成uuid)、replace_existing(是否替换已经存在的,建议为True,否则很容易就会产生很多新的)

​ 运行在线程池中,跟随主进程的退出而退出,简单高效不会影响主进程,线程池大小通过在创建scheduler时配置

​ 封装了schedulerComponent,通过重写对象创建的函数实现了调度器的单例,应用了饿汉逻辑,调度器的实现类为BackgroundScheduler,其独立运行在后台,通过该sheduler对象可以操作job,实现job的创建(如上),job的修改、job的删除、job的查询、全量查询、全量删除以及job的暂停、恢复、停止,单个Job的ud操作需要指定job id ,其值来自创建时指定的id值或者自动生成的uuid

依赖

APScheduler模块

使用

​ 建议在项目启动时候调用 SchedulerComponent的 get_scheduler方法,例如:

from component.schedulerComponent import SchedulerComponent
def main():
    print("main")
    SchedulerComponent.get_scheduler()
if __name__ == '__main__':
    main()

​ 这样就完成了调度器跟随应用启动并单例的过程(注意该调度器实例对象保存在该类的属性中,只能通过cls._instance进行访问获取,同时为了保证调度正常运行而不提供修改调度器的方法,当然你也可以通过类型.属性名进行修改,那这样我也没办法了)

​ 使用过程中需要获取调度器,通过类方法即可,注意:job的id值是非常重要的,创建job时,如果指定了id属性,则返回值为trigger,如果没有指定则返回id值,id值直接关系到ud操作能否进行,例如:

​ 注意: 我没有封装job id 的保存是因为job id 临时保存了也没有意义,取值时还是需要通过一个key进行取,那又何必保存呢?因为你的这个key就可以直接作为id值进行使用

# 拿到调度器
    scheduler = SchedulerComponent.get_scheduler()
    now_date_time = datetime.datetime.now()  # 当前时间
    run_date_time = datetime.datetime(now_date_time.year, now_date_time.month, now_date_time.day,
                                      now_date_time.hour,
                                      now_date_time.minute, 59)  # 运行的时间
    start_date_time = now_date_time.strftime('%Y-%m-%d %H:%M:%S')  # 开始的时间
    end_date_time = (now_date_time + datetime.timedelta(seconds=59)).strftime('%Y-%m-%d %H:%M:%S')  # 结束的时间

    # ############创建job

    # 一到某个时间就做某件事情
    scheduler.add_job(job, trigger='date', id="service_1",
                      run_date=run_date_time, replace_existing=True, args=["一到某个时间"])

    # 在某个时间范围内做某件事情
    scheduler.add_job(job, trigger='interval', id="service_2", seconds=2, start_date=start_date_time,
                      end_date=end_date_time, replace_existing=True,
                      args=["在某个时间范围内"])

    # 每隔多长时间做某件事情
    trigger_args = {
        "seconds": 2
        # , "minutes": None
        # , "hours": None
        # , "days": None
    }
    scheduler.add_job(job, trigger='interval', id="service_3", replace_existing=True, **trigger_args,
                      args=["每隔多长时间"])

    # #代码中可以直接写注解,例如以下,但是需要先显示获取scheduler对象
    # @scheduler.scheduled_job('interval', id='my_job_id', hours=2)
    # def my_job():
    #     print("my_job")

    # ############修改job
    # 方式一: 先删除再添加
    # scheduler.pause_job(job1_id)
    # scheduler.remove_job(job1_id)  # 删除job
    # scheduler.add_job(job, 'date', id=job1_id, run_date=run_date_time, replace_existing=True, args=["一到某个时间"])
    # 方式二: 直接修改trigger
    # scheduler.reschedule_job(job1_id, trigger='date',
    #                          run_date=run_date_time)

    # ############删除job
    # scheduler.remove_job("service_1")  # 删除job
    # jobs = scheduler.get_jobs()  # 删除所有job
    # for job_data in jobs:
    #     scheduler.remove_job(job_data.id)

    # ############查询job
    # jobs = scheduler.get_jobs()  # 查询所有job
    # print(jobs.__str__())
    # job1_obj = scheduler.get_job("service_1")  # 根据id查询单个job
    # print("service_1", job1_obj)

​ 一个改造的例子为:

​ 原来代码为:

class errands(QWidget):
    def start(self):
        if not self.myThread:
            from util.timer import TimerThread
            self.myThread = TimerThread(self.work,'errands', second=1)
            self.parent.errandsThread = self.myThread
            self.myThread.start()

​ 改为:

def job(name):
    print(name)
    
def example():
    # 拿到调度器
    from component.schedulerComponent import SchedulerComponent
    scheduler = SchedulerComponent.get_scheduler()
    
    # 每隔多长时间做某件事情
    trigger_args = {
        "seconds": 2
        # , "minutes": None
        # , "hours": None
        # , "days": None
    }
    
    scheduler.add_job(job, trigger='interval', id="service_3", replace_existing=True, **trigger_args,args=["每隔多长时间"])
    
if __name__ == '__main__':
    example()

详细代码

import datetime
import time

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler


class SchedulerComponent:
    """
    调度组件
    create by tristan
    """
    _instance = None
    # ############## 需要自定义的内容-start ##############
    jobstores = {
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # 持久化的方式
    }
    executors = {
        'default': ThreadPoolExecutor(20),
        'processpool': ProcessPoolExecutor(5)
    }
    job_defaults = {
        'coalesce': False,
        'max_instances': 3
    }
    timezone = 'Asia/Shanghai'

    # ##############需要自定义的内容-end  ##############

    def __new__(cls, *args, **kwargs):
        if cls._instance is None or cls._instance.scheduler is None:
            cls._instance = super(SchedulerComponent, cls).__new__(cls, *args, **kwargs)
            # 实例化一个调度器'
            cls._instance.scheduler = BackgroundScheduler()
            # cls._instance.scheduler = BackgroundScheduler(jobstores=cls.jobstores, executors=cls.executors,
            #                                               job_defaults=cls.job_defaults,
            #                                               timezone=cls.timezone)
            print("调度器初始化完成")
            cls._instance.scheduler.start()  # 启动调度器
        return cls._instance

    @classmethod
    def get_scheduler(cls):
        """
        获取得到调度器
        :return: 调度器
        """
        if not cls._instance:
            cls._instance = SchedulerComponent()
        return cls._instance.scheduler

    @classmethod
    def shutdown(cls):
        """
        关闭调度器
        :return:
        """
        cls._instance.scheduler.shotdown()

    @classmethod
    def get_job(cls, job_id):
        return cls.get_scheduler().get_job(job_id)

    @classmethod
    def remove_job(cls, job_id):
        cls.get_scheduler().remove_job(job_id)


def job(name):
    print(name)


class Test:
    def test(self):
        print("test")


def example():
    print("main")

    # 拿到调度器
    scheduler = SchedulerComponent.get_scheduler()
    if scheduler.get_job("model.tab_home.page.Page.work"):
        print("...")

    now_date_time = datetime.datetime.now()  # 当前时间
    run_date_time = datetime.datetime(now_date_time.year, now_date_time.month, now_date_time.day,
                                      now_date_time.hour,
                                      now_date_time.minute, 59)  # 运行的时间
    start_date_time = now_date_time.strftime('%Y-%m-%d %H:%M:%S')  # 开始的时间
    end_date_time = (now_date_time + datetime.timedelta(seconds=59)).strftime('%Y-%m-%d %H:%M:%S')  # 结束的时间

    # ############创建job

    # 一到某个时间就做某件事情
    # scheduler.add_job(job, trigger='date', id="home_new.Ui_Home_Window.sendServerMsg",
    #                   run_date=run_date_time, replace_existing=True, args=["一到某个时间"])

    trigger_args = {
        "seconds": 2
        # , "minutes": None
        # , "hours": None
        # , "days": None
    }
    # 在某个时间范围内做某件事情
    # scheduler.add_job(job, trigger='interval', id="home_new.Ui_Home_Window.sendServerMsg2", **trigger_args,
    #                   start_date=start_date_time,
    #                   end_date=end_date_time, replace_existing=True,
    #                   args=["在某个时间范围内"])

    # 每隔多长时间做某件事情
    # #以模块为导向的指定job func的形式
    scheduler.add_job("component.schedulerComponent:Test.test", trigger='interval', id="schedulerComponent.service.3",
                      replace_existing=True,**trigger_args, args=["每隔多长时间"])  # 创建或替换job
    # scheduler.add_job(job, trigger='interval', id="schedulerComponent.service.3_1", replace_existing=True,
    #                   **trigger_args, args=["每隔多长时间_1"])  # 创建或替换job

    # #代码中可以直接写注解,例如以下,但是需要先显示获取scheduler对象
    # @scheduler.scheduled_job('interval', id='my_job_id', hours=2)
    # def my_job():
    #     print("my_job")

    # ############修改job
    # 方式一: 先删除再添加
    # scheduler.pause_job(job1_id)
    # scheduler.remove_job(job1_id)  # 删除job
    # scheduler.add_job(job, 'date', id=job1_id, run_date=run_date_time, replace_existing=True, args=["一到某个时间"])
    # 方式二: 直接修改trigger
    # scheduler.reschedule_job(job1_id, trigger='date',
    #                          run_date=run_date_time)

    # ############删除job
    # scheduler.remove_job("service_1")  # 删除job
    # jobs = scheduler.get_jobs()  # 删除所有job
    # for job_data in jobs:
    #     scheduler.remove_job(job_data.id)

    # ############查询job
    # jobs = scheduler.get_jobs()  # 查询所有job
    # print(jobs.__str__())
    # job1_obj = scheduler.get_job("service_1")  # 根据id查询单个job
    # print("service_1", job1_obj)


if __name__ == '__main__':
    print(datetime.datetime.now().__str__())
    # example()
    time.sleep(4444)

猜你喜欢

转载自blog.csdn.net/qq_35559756/article/details/84058397