Flask_APScheduler插件封装业务逻辑持久化任务(二)
1.概述
定时任务Flask_APScheduler插件结合业务使用时,需要将任务持久化到数据库中与业务逻辑结合起来使用才能体现定时任务的价值。
定时任务Flask_APScheduler插件本身支持市面上主流的ORM框架将任务信息持久化到数据库中与业务结合,但是如果我们的项目是自己封装的ORM,就不能直接将任务信息持久化到数据库。
这篇文章就是介绍如何封装定时任务信息持久化到数据库,与业务逻辑结合实现定时任务。
Flask_APScheduler是一个系列文章,第一篇文章介绍了该组件的基本使用,链接如下。
Flask_APScheduler集成Flask项目系列入门篇(一):https://brucelong.blog.csdn.net/article/details/129127132
2.封装Flask_APScheduler插件功能
2.1.封装Flask_APScheduler插件功能
首先对Flask_APScheduler插件做一层封装使他的功能符合我们业务逻辑需求。
在项目中创建一个ext包,在该包中创建task_apscheduler.py
文件封装任务的功能。
task_apscheduler.py
文件代码包含两个作用
- 封装apscheduler添加任务、编辑任务、查询任务、暂停任务、删除任务等功能
- 初始化apscheduler任务状态,每次重启项目后APScheduler中的任务就会从内存中删除,导致APScheduler中的任务和数据库任务状态不一致,所以项目启动时将数据库中任务状态初始化为暂停状态
from apscheduler.schedulers.background import BackgroundScheduler
from flask_apscheduler import APScheduler
from loguru import logger
from src.app.entity.light_task_info import light_task_info
from src.app.mapper.base_mapper import BaseMapper
scheduler = APScheduler(BackgroundScheduler(timezone='Asia/Shanghai', api_enabled=True))
# 获取flask对象,注册scheduler
def aspscheduler_register(app):
scheduler.init_app(app)
scheduler.start()
Init_job().init_jobs_pause()
def cron_pattern(expression):
''' 将cron表达式转换为 ApScheduler接收的字典格式
:param expression: cron表达式
:return: 字典格式cron表达式
'''
args = {
}
if expression is None:
return args
# 以空格为分隔符拆分字符串输出列表,拆分结果 ['0/2', '*', '*', '*', '*', '?']
expression = expression.split(' ')
if expression[0] != '?':
args['second'] = expression[0]
if expression[1] != '?':
args['minute'] = expression[1]
if expression[2] != '?':
args['hour'] = expression[2]
if expression[3] != '?':
args['day'] = expression[3]
if expression[4] != '?':
args['month'] = expression[4]
if expression[5] != '?':
args['day_of_week'] = expression[5]
return args
def add_job(func=None, args=None, kwargs=None, id=None, name=None, trigger='cron', coalesce=False, cron=None):
'''将需要运行的函数添加到任务存储器中,并启动任务。
:param func: 待运行的函数
:param args: 函数参数
:param kwargs: 函数参数
:param id: 任务id
:param name: 任务名称
:param trigger: 触发器类型
:param coalesce: 是否合并任务运行
:param cron: 任务运行计划表达式
:return:
'''
scheduler.add_job(func=func, args=args, kwargs=kwargs, id=str(id), name=name, trigger=trigger,
coalesce=coalesce, **cron_pattern(cron))
def del_job(job_id):
''' 删除ApScheduler存储器中已存在的任务
:param job_id: 任务id
:return:
'''
scheduler.remove_job(str(job_id))
def modify_job(func=None, args=None, kwargs=None, id=None, name=None, trigger='cron', coalesce=False,
cron=None):
''' 删除已存在的任务,然后使用已删除的任务id创建新任务,实现修改任务功能
:param func: 待运行的函数
:param args: 函数参数
:param kwargs: 函数参数
:param id: 任务id
:param name: 任务名称
:param trigger: 触发器类型
:param coalesce: 是否合并任务运行
:param cron: 任务运行计划表达式
:return:
'''
del_job(id)
add_job(func=func, args=args, kwargs=kwargs, id=str(id), name=name, trigger=trigger, coalesce=coalesce,
**cron_pattern(cron))
def query_job_id(job_id):
''' 根据id查询任务信息
:param job_id: 任务id
:return: 任务信息
'''
return scheduler.get_job(str(job_id))
def query_job_all():
''' 查询所有任务信息
:return:任务信息列表
'''
return scheduler.get_jobs()
def pause_job(job_id):
''' 暂停ApScheduler存储器中已存在的任务,返回任务状态
:param job_id: 任务id
:return: 返回任务状态
'''
job_info = scheduler.pause_job(str(job_id))
return get_job_status(job_info)
def start_job(job_id):
''' 启动ApScheduler存储器中已存在且暂停的任务,返回任务状态
:param job_id: 任务id
:return: 返回任务状态
'''
job_info = scheduler.resume_job(str(job_id))
return get_job_status(job_info)
def get_job_status(job_data):
''' 从job信息中提炼出状态值
:param job_data: 任务信息
:return: 任务状态
'''
return str(job_data).split(",")[-1].strip(")")
def check_cron_pattern(cron_expression):
if len(cron_expression.strip().split(' ')) != 6:
raise 'cron格式错误'
class Init_job(BaseMapper):
def init_jobs_pause(self):
'''每次重启项目后APScheduler中的任务就会从内存中删除,导致APScheduler中的任务和数据库任务状态不一致,
所以项目启动时将数据库中任务状态初始化为暂停状态
:return:
'''
try:
if (tasks := self.query_started_job()) is not None:
for task in tasks:
self.set_pause_task(task)
else:
logger.info('数据库中没有已启动定时任务,不需要初始化任务')
except Exception as e:
logger.error('初始化定时任务状态失败信息:', e)
def query_started_job(self):
return self.select(light_task_info(), condition={
'task_status': 1})
def set_pause_task(self, task):
try:
res = self.update(light_task_info({
'task_status': 0}), {
'id': task.get('id')})
if isinstance(res, int):
logger.info(f'初始化定时任务状态成功')
return res
except:
raise
2.2.apscheduler注册到flask
在ext
包的__init__.py
文件中获取falsk,并注册apscheduler
def init_app(app):
from . import (
task_aspscheduler
)
task_aspscheduler.aspscheduler_register(app)
在app
包的__init__.py
文件中将ext
注册到falsek
from . import settings
from flask import Flask
from config import config
def create_app(config_name):
from .api import (
)
# 导入ext
from . import ext
app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False
app.config.from_object(config[config_name])
config[config_name].init_app(app)
# 注册ext到falsk
ext.init_app(app, config_name)
return app
3.定时任务与业务逻辑结合
根据业务逻辑调用上面封装好的apscheduler函数实现定时任务功能,下面是业务逻辑的server
层业务代码,它的开发逻辑如下。
- 1.创建任务时,将任务信息添加到数据库中,任务信息包含
任务id
、任务名称name
、任务定时表达式cron
、任务状态task_status
、待运行用例case_id
、用例运行环境base_url
、默认任务状态为暂停。此时不调用Apscheduler创建任务。 - 2.只有点击运行
startTask
运行任务时,从数据库中查询任务信息,将任务信息添加到apscheduler中,运行任务。 - 3.编辑任务时,调用
task_apscheduler.py
文件删除任务函数,从apscheduler中删除该任务。然后更新数据库中的任务信息。 - 4.暂停任务时,调用
task_apscheduler.py
文件暂停任务函数,暂停apscheduler中该任务。同时更新数据库中任务状态为暂停。 - 5.启动任务,调用
task_apscheduler.py
文件启动任务函数,启动apscheduler中该任务。同时更新数据库中任务状态为启动。 - 6.删除任务,调用
task_apscheduler.py
文件删除任务函数,从apscheduler中删除该任务。然后删除数据库中的任务信息。
TaskInfoService类参数说明
- TaskInfoService继承BaseMapper, BaseMapper是自己封装的ORM框架操作数据库的增删改查。
- dto_task封装前端请求接口的json数据
from loguru import logger
from src.app.common.err_status import ErrAddTask, Success, ErrRemoveTask, ErrUpdateTask, ErrNoneTask, ErrStartTask, \
ErrPauseTask, ErrQueryTask, ErrNoStartTask
from src.app.entity.light_task_info import light_task_info
from src.app.mapper.base_mapper import BaseMapper
from src.app.ext.task_aspscheduler import add_job, del_job, query_job_id, start_job, pause_job
from src.app.service.base_service.service_util import is_exist_data, json_to_object
class TaskInfoService(BaseMapper):
# 向数据库中新增任务信息
def addTask(self, dto_task):
try:
res = self.insert(dto_task)
if "DBERROR" in res:
return ErrAddTask.resData()
return Success.resData({
'taskId': res})
except Exception as e:
logger.error(f'新增任务失败,打印异常信息:{
e}')
return ErrAddTask.resData()
def removeTask(self, dto_task):
try:
if query_job_id(dto_task.dto['id']) is not None:
del_job(dto_task.dto['id'])
except Exception as e:
logger.error("删除APSchudeler job 异常:" + str(e))
try:
res = self.delete(dto_task)
if 1 != res:
return ErrRemoveTask.resData()
return Success.resData(res)
except Exception as e:
return ErrRemoveTask.resData(e)
def updateTask(self, dto_task):
# 1.从scheduler中删除当前任务
# 2.修改数据库中任务信息,将任务状态设置为暂停。每次修改任务信息后必须手动启动任务。
try:
if query_job_id(dto_task.dto['id']) is not None:
del_job(dto_task.dto['id'])
except Exception as e:
logger.error("修改 APSchudeler job 异常:" + str(e))
try:
condict = {
'id': dto_task.dto.get('id')}
# 设置任务状态为暂停
dto_task.dto['task_status'] = 0
del dto_task.dto["id"]
res = self.update(dto_task, condict)
if not isinstance(res, int):
return ErrUpdateTask.resData()
return Success.resData({
"updataCount": res})
except Exception as e:
return ErrUpdateTask.resData(e)
def startTask(self, dto_task):
''' 为了防止启动的任务在ApScheduler中不存在启动失败,首先将任务添加到AP中,如果已经存在则直接启动。
:param dto_task:
:return:
'''
# 查看数据库任务不存在结束流程
try:
if not is_exist_data({
'id': dto_task.dto['id']}, light_task_info()):
return ErrNoneTask.resData()
except Exception:
return ErrNoneTask.resData()
try:
# 查看apscheduler中不存在当前任务,将任务添加到ap中启动,然后修改数据库中任务状态为启动。
if query_job_id(dto_task.dto['id']) is None:
create_job_apscheduler(self.queryOneTaskById(dto_task, dto_task.dto['id']))
res = self.settask_status(dto_task)
return Success.resData(res)
# ap中存在当前任务,直接启动任务。然后修改数据库中任务状态为启动
else:
start_job(dto_task.dto['id'])
res = self.settask_status(dto_task)
return Success.resData(res)
except Exception as e:
return ErrStartTask.resData(e)
def pauseTask(self, dto_task):
try:
if query_job_id(dto_task.dto['id']) is not None:
pause_job(dto_task.dto['id'])
res = self.settask_status(dto_task)
return Success.resData(res)
else:
return ErrNoStartTask.resData()
except Exception as e:
logger.error("暂停APSchudeler job 异常:" + str(e))
return ErrPauseTask.resData(e)
def settask_status(self, dto_task):
try:
res = self.update(dto_task, {
'id': dto_task.dto.get('id')})
if isinstance(res, int):
return res
except:
raise
def queryTask(self, dto_task):
try:
res = self.select(dto_task, condition=dto_task.dto)
list_res = json_to_object(res)
if res == "DBERROR":
return ErrQueryTask.resData()
if not res:
return Success.resData({
"task": res, "total": 0})
total = self.count_select_row(dto_task, condition=dto_task.dto)
return Success.resData({
"task": list_res, "total": total})
except Exception as e:
return ErrQueryTask.resData(e)
def queryOneTaskById(self, dto_task, id):
try:
return self.select(dto_task, condition={
'id': id})[0]
except Exception as e:
return ErrQueryTask.resData(e)
def create_job_apscheduler(task_info):
''' 添加任务到APSchuduler
:param task_info: 任务信息
'''
task_dict = {
'id': task_info.get('id'),
'name': task_info.get('name'),
'cron': task_info.get('cron'),
'kwargs': {
'project_id': task_info.get('project_id'), 'case_id': task_info.get('case_id')}
}
try:
add_job(func=job, **task_dict)
except Exception as e:
logger.error(f"添加任务到APSchudeler异常:{
e}")
# 自动化运行测试任务
def job(**req):
logger.warning(f"定时任务运行中,参数是{
req['project_id']}、{
req['case_id']}")