APScheduler和django_apscheduler实践心得

一、APScheduler

  • 简介

APScheduler(Advanced Python Scheduler)是一个轻量级的Python定时任务调度框架(Python库)。

APScheduler有三个内置的调度系统,其中包括:

  1. cron式调度(可选开始/结束时间)
  2. 基于间隔的执行(以偶数间隔运行作业,也可以选择开始/结束时间)
  3. 一次性延迟执行任务(在指定的日期/时间内运行作业一次)
  • 安装

$ pip install apscheduler
  • 四大组件

  1. 触发器 triggers :用于设定触发任务的条件
  2. 任务储存器 job stores:用于存放任务,把任务存放在内存或数据库中
  3. 执行器 executors: 用于执行任务,可以设定执行模式为单线程或线程池
  4. 调度器 schedulers: 把上方三个组件作为参数,通过创建调度器实例来运行
  • 触发器

每一个任务都有自己的触发器,触发器用于决定任务下次运行的时间。

  1. date 在指定时间点触发任务

  2. interval 周期触发任务

  3. cron 强大的类crontab表达式

  • 调度器

一个调度器由上方三个组件构成,一般来说,一个程序只要有一个调度器就可以了。开发者也不必直接操作任务储存器、执行器以及触发器,因为调度器提供了统一的接口,通过调度器就可以操作组件,比如任务的增删改查。

  1. BlockingScheduler 阻塞式调度器:适用于只跑调度器的程序。
  2. BackgroundScheduler 后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。
  3. AsyncIOScheduler AsyncIO调度器,适用于应用使用AsnycIO的情况。
  4. GeventScheduler Gevent调度器,适用于应用通过Gevent的情况。
  5. TornadoScheduler Tornado调度器,适用于构建Tornado应用。
  6. TwistedScheduler Twisted调度器,适用于构建Twisted应用。
  7. QtScheduler Qt调度器,适用于构建Qt应用。

关于APScheduler就不细讲太多,想了解更多请参考 APScheduler官方文档翻译

 二、django_apscheduler

  • 安装

$ pip install django-apscheduler 
  • 配置

1.修改settings.py文件,在INSTALLED_APPS中加入django-apscheduler应用:

INSTALLED_APPS = [
    ......
    'django_apscheduler',#定时执行任务
]

2.执行迁移命令:

python manage.py migrate
  • 使用

from datetime import datetime, timedelta

from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events
from django_apscheduler.models import DjangoJob
from django.conf import settings

# 每个app目录下的tasks.py写定时任务代码
from app.tasks import test_func


# 定时任务配置
SCHEDULER_JOBS = [
    {
        "func": test_func,
        "trigger": "cron",
        "only_debug_false": False,
        "func_args": [],
        "func_kwargs": {},
        "trigger_kwargs": {
            "second": '*/10'  # 每10秒执行一次
        }
    },
]


scheduler = BackgroundScheduler({
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '5'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
})
DjangoJob.objects.filter(next_run_time__lt=datetime.now()).delete()
# print('scheduler', id(scheduler))
scheduler.add_jobstore(DjangoJobStore(), "default")
register_events(scheduler)
scheduler.start()


for job in SCHEDULER_JOBS:
    if job["only_debug_false"] and settings.DEBUG:
        continue
    scheduler.add_job(job["func"], trigger=job["trigger"], **job["trigger_kwargs"], id=job["func"].__name__,
                      args=job["func_args"], kwargs=job["func_kwargs"], replace_existing=True)


# 以下为任务队列,可在视图中添加定时任务
def add_tasks(func, *args, delay=0, **kwargs):
    run_date = datetime.now() + timedelta(seconds=delay)
    scheduler.add_job(func, trigger="date", run_date=run_date, args=args, kwargs=kwargs,
                      id="%s_%s" % (func.__name__, run_date.strftime("%Y%m%d_%H%M%S")))


def add_cron_tasks(func, task_id, args=(), kwargs={}, **trigger_kwargs):
    scheduler.add_job(func, trigger='cron', id=task_id, args=args, kwargs=kwargs, **trigger_kwargs)
  •  自定义监听

当你的项目存在高频的重复任务,这些任务可能不想记录执行过程,而想记录错误信息;但是其他任务就是二者都要记录,这时可能就需要自定义监听

扫描二维码关注公众号,回复: 11927245 查看本文章
import logging


class CommonListen(object):
    excute_log = logging.getLogger("info_file")
    error_log = logging.getLogger("django.request")

    @classmethod
    def listen(cls, func, *args, **kwargs):
        try:
            if cls.excute_log:
                cls.excute_log.info('[任务{func_name}]执行'.format(func_name=func.__name__))
            return func(*args, **kwargs)
        except Exception as e:
            if cls.error_log:
                cls.error_log.error('[任务{func_name}]执行失败,失败原因:\n{err_msg}'.format(
                    func_name=func.__name__, err_msg=e))


def add_tasks(func, *args, delay=0, **kwargs):
    run_date = datetime.now() + timedelta(seconds=delay)
    kwargs.update(func=func)
    scheduler.add_job(CommonListen.listen, trigger="date", run_date=run_date, args=args, kwargs=kwargs,
                      id="%s_%s" % (func.__name__, run_date.strftime("%Y%m%d_%H%M%S")))

三、关于多进程重复执行任务

修改mysite/wsgi.py

import os

from django.core.wsgi import get_wsgi_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")

application = get_wsgi_application()

__import__('mysite.my_django_apscheduler')  # 在这里添加引入

猜你喜欢

转载自blog.csdn.net/qq_32897527/article/details/105390179