详情及源码
最近团长工作中收到了后台回归测试的任务,火急火燎的写了几个主要流程的接口自动化回归脚本。刚开始想着就几个接口那就直接调用吧,写出来是这样的:
class ops_ris():
def __init__(self,env,phase_no='IG_TELCHECK',risk_desc='AGENCY'):
self.env=env
self.phase_no=phase_no
self.risk_desc=risk_desc
def investigation(self):
"""调查处理无风险经办结案"""
#转派
self.transfercase()
#查询最近一笔xiejinggeng名下经办待处理案件号
# sql = "SELECT case_no FROM ig_case where handle_user='xiejinggeng' and state in ('TCED','TCW') ORDER BY id desc limit 1"
# self.case_no = DBmananger(self.env, 'ris').callMysql(sql)[0]["case_no"]
# 查询最近一笔case_no案件号IG_TELCHECK阶段对应的task_no
sql=f"select task_no from wf_task where biz_no='{self.case_no}' and phase_no='{self.phase_no}' order by id desc limit 1"
task_no=DBmananger(self.env, 'ris').callMysql(sql)[0]["task_no"]
#登录获取sid
self.sid=get_sid(self.env)
#调查经办结案
method,bizContent=invest_first(self.case_no,task_no)
self.header=headers(self.sid)
self.url=envhost[self.env]
res = apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent), parammode='json', method='post',headers=self.header)
if res['flag'] =="S":
logger.debug(f"案件{self.case_no}已经办处理成功")
else:
logger.error(f"案件{self.case_no}经办处理失败,失败原因:{res['msg']}")
test_name = ["中介类型调查经办处理", "盗用类型调查经办处理", "伪冒类型调查经办处理", "其他类型调查经办处理"]
if self.risk_desc == "AGENCY":
test_Name = test_name[0]
elif self.risk_desc == "THEFT":
test_Name = test_name[1]
elif self.risk_desc == "FAKE":
test_Name = test_name[2]
else:
test_Name = test_name[3]
return {
"测试用例名称":test_Name,
"测试结果":"pass" if res['flag'] =="S" else "fail",
"原因":res["msg"],
"url":self.url,
"res": res,
"param":GetCommonContent(method, bizContent)
}
def transfercase(self):
"""调查案件案件转派"""
sql=f"select case_no from ig_case where risk_desc='{self.risk_desc}' and state in ('TCED','TCW') ORDER BY id desc limit 1"
self.case_no = DBmananger(self.env, 'ris').callMysql(sql)[0]["case_no"]
sql=f"select task_no,biz_no from wf_task where phase_no='{self.phase_no}' and biz_no='{self.case_no}' ORDER BY id desc limit 1"
task_no = DBmananger(self.env, 'ris').callMysql(sql)[0]["task_no"]
self.case_no = DBmananger(self.env, 'ris').callMysql(sql)[0]["biz_no"]
method, bizContent = transfer(self.case_no, task_no)
# 登录获取sid
self.sid = get_sid(self.env)
self.header = headers(self.sid)
self.url = envhost[self.env]
self.res=apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent), parammode='json',method='post', headers=self.header)
# 调查转派
if self.res['flag'] =="S":
logger.debug(f"案件{self.case_no}已经转派成功")
else:
logger.error(f"案件{self.case_no}经办转派失败,失败原因:{res['msg']}")
return {
"测试用例名称": "调查案子转派",
"测试结果": "pass" if self.res['flag'] == "S" else "fail",
"原因": self.res["msg"],
"url": self.url,
"res": self.res,
"param": GetCommonContent(method, bizContent)
}
def review(self):
"""调查案件复核"""
test_name = ["中介类型调查复核处理", "盗用类型调查复核处理", "伪冒类型调查复核处理", "其他类型调查复核处理"]
if self.risk_desc == "AGENCY":
test_Name = test_name[0]
elif self.risk_desc == "THEFT":
test_Name = test_name[1]
elif self.risk_desc == "FAKE":
test_Name = test_name[2]
else:
test_Name = test_name[3]
if self.res['flag'] == "F":
return {
"测试用例名称": test_Name,
"测试结果": "skip" ,
"原因": "经办失败跳过复核",
"url": self.url,
"res": '',
"param": ''
}
else:
#检测案件状态,结案直接pass 复核阶段调复核处理
sql=f"select state from ig_case where case_no='{self.case_no}' "
state = DBmananger(self.env, 'ris').callMysql(sql)[0]["state"]
if state=="PS":
logger.debug(f"案件{self.case_no}已经结案")
else:
sql=f"select task_no,state from wf_task where phase_no='IG_LASTINSTANCE' and biz_no='{self.case_no}' ORDER BY id desc limit 1"
task_no = DBmananger(self.env, 'ris').callMysql(sql)[0]["task_no"]
sql=f"select remark from m_opinion where case_no='{self.case_no}' and phase_no='IG_TELCHECK'"
remark=DBmananger(self.env, 'ris').callMysql(sql)[0]["remark"]
method, bizContent = transfer(self.case_no, task_no)
# 登录获取sid
self.sid = get_sid(self.env)
self.header = headers(self.sid)
self.url = envhost[self.env]
apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent), parammode='json',method='post', headers=self.header)
method, bizContent = inreview(self.case_no, task_no,remark)
self.header = headers(self.sid)
self.url = envhost[self.env]
res = apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent), parammode='json', method='post', headers=self.header)
# 调查复核结案
if res['flag'] == "S":
logger.debug(f"案件{self.case_no}结案成功")
else:
#再处理一次复核,有缓存
res = apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent), parammode='json', method='post', headers=self.header)
if res['flag'] == "S":
logger.debug(f"案件{self.case_no}结案成功")
else:
logger.debug(f"案件{self.case_no}结案失败,原因:{res['msg']}")
# test_name = ["中介类型调查复核处理", "盗用类型调查复核处理", "伪冒类型调查复核处理", "其他类型调查复核处理"]
# if self.risk_desc == "AGENCY":
# test_Name = test_name[0]
# elif self.risk_desc == "THEFT":
# test_Name = test_name[1]
# elif self.risk_desc == "FAKE":
# test_Name = test_name[2]
# else:
# test_Name = test_name[3]
return {
"测试用例名称": test_Name,
"测试结果": "pass" if res['flag'] == "S" else "fail",
"原因": res["msg"],
"url": self.url,
"res": res,
"param": GetCommonContent(method, bizContent)
}
class tel():
def __init__(self,env,investigate="Y",investigateType1="N",investigateType2="N",investigateType3="Y",investigateType4="N",biz_no=''):
self.env = env
#调查类型:investigateType1-中介,investigateType2-盗用,investigateType3-伪冒,investigateType4-其他
self.investigate=investigate
self.investigateType1=investigateType1
self.investigateType2 = investigateType2
self.investigateType3 = investigateType3
self.investigateType4 = investigateType4
self.biz_no=biz_no
def transfercase(self):
"""电核案件转派"""
#倒序查xiejinggeng名下电核最新一笔电核案子
if self.biz_no=='':
sql = "select user_no,task_no,biz_no from wf_task where phase_no='AP_TELCHECK' and switch_flag='SW_AP_360_JIE' and state='DISPATHED' ORDER BY id DESC limit 1"
else:
sql=f"select user_no,task_no,biz_no from wf_task where biz_no ='{self.biz_no}' and phase_no='AP_TELCHECK' ORDER BY id DESC limit 1"
self.user_no = DBmananger(self.env, 'apv_wf').callMysql(sql)[0]["user_no"]
self.task_no = DBmananger(self.env, 'apv_wf').callMysql(sql)[0]["task_no"]
self.biz_no = DBmananger(self.env, 'apv_wf').callMysql(sql)[0]["biz_no"]
if self.user_no=="xiejinggeng":
# 登录获取sid
self.sid = get_sid(self.env)
method, bizContent = transfer_tel(self.biz_no,self.task_no,'genggeng')
self.header = headers(self.sid)
self.url = envhost[self.env]
res = apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent), parammode='json',method='post', headers=self.header)
if res['flag'] == "S":
logger.debug(f"案件{self.biz_no}转派成功")
else:
logger.debug(f"案件{self.biz_no}转派失败,原因:{res['msg']}")
elif self.user_no=="genggeng":
# 登录获取sid
self.sid = get_sid(self.env,login_name='genggeng')
method, bizContent = transfer_tel(self.biz_no, self.task_no, 'xiejinggeng')
self.header = headers(self.sid)
self.url = envhost[self.env]
res = apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent), parammode='json',
method='post', headers=self.header)
if res['flag'] == "S":
logger.debug(f"案件{self.biz_no}转派成功")
else:
logger.debug(f"案件{self.biz_no}转派失败,原因:{res['msg']}")
else:
# 登录获取sid
self.sid = get_sid(self.env)
method, bizContent = transfer_tel(self.biz_no, self.task_no, 'genggeng')
self.header = headers(self.sid)
self.url = envhost[self.env]
res = apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent), parammode='json',
method='post', headers=self.header)
if res['flag'] == "S":
logger.debug(f"案件{self.biz_no}转派成功")
else:
logger.debug(f"案件{self.biz_no}转派失败,原因:{res['msg']}")
return {
"测试用例名称": "电核案件转派",
"测试结果": "pass" if res['flag'] == "S" else "fail",
"原因": res["msg"],
"url": self.url,
"res": res,
"param": GetCommonContent(method, bizContent)
}
def tel_check(self):
"""电话核查"""
#查下案子在谁名下
sql=f"select user_no,task_no,biz_no from wf_task where phase_no='AP_TELCHECK' ORDER BY id DESC limit 1"
self.user_no = DBmananger(self.env, 'apv_wf').callMysql(sql)[0]["user_no"]
self.task_no = DBmananger(self.env, 'apv_wf').callMysql(sql)[0]["task_no"]
self.biz_no = DBmananger(self.env, 'apv_wf').callMysql(sql)[0]["biz_no"]
# 登录获取sid
self.sid = get_sid(self.env,login_name=self.user_no)
method, bizContent = tel_check(self.biz_no,self.task_no,self.investigateType1,self.investigateType2,self.investigateType3,self.investigateType4,self.investigate)
self.header = headers(self.sid)
self.url = envhost[self.env]
res = apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent), parammode='json',
method='post', headers=self.header)
#investigateType1-中介,investigateType2-盗用,investigateType3-伪冒,investigateType4-其他
if res['flag'] == "S":
logger.debug(f"案件{self.biz_no}电核成功")
else:
logger.debug(f"案件{self.biz_no}电核失败,原因:{res['msg']}")
test_name=["电核转中介类型调查","电核转盗用类型调查","电核转伪冒类型调查","电核转其他类型调查"]
if self.investigateType1=="Y":
test_Name=test_name[0]
elif self.investigateType2=="Y":
test_Name=test_name[1]
elif self.investigateType3=="Y":
test_Name=test_name[2]
else:
test_Name = test_name[3]
return {
"测试用例名称": test_Name,
"测试结果": "pass" if res['flag'] == "S" else "fail",
"原因": res["msg"],
"url": self.url,
"res": res,
"param": GetCommonContent(method, bizContent)
}
def final_jud(self):
"""终审"""
#先转派
sql = f"select user_no,task_no,biz_no from wf_task where biz_no ='{self.biz_no}' and phase_no='AP_LASTINSTANCE' ORDER BY id DESC limit 1"
self.user_no = DBmananger(self.env, 'apv_wf').callMysql(sql)[0]["user_no"]
self.task_no = DBmananger(self.env, 'apv_wf').callMysql(sql)[0]["task_no"]
self.biz_no = DBmananger(self.env, 'apv_wf').callMysql(sql)[0]["biz_no"]
if self.user_no=="xiejinggeng":
# 登录获取sid
self.sid = get_sid(self.env)
method, bizContent = transfer_tel(self.biz_no,self.task_no,'genggeng')
self.header = headers(self.sid)
self.url = envhost[self.env]
res = apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent), parammode='json',method='post', headers=self.header)
if res['flag'] == "S":
logger.debug(f"案件{self.biz_no}转派成功")
else:
logger.debug(f"案件{self.biz_no}转派失败,原因:{res['msg']}")
elif self.user_no=="genggeng":
# 登录获取sid
self.sid = get_sid(self.env,login_name='genggeng')
method, bizContent = transfer_tel(self.biz_no, self.task_no, 'xiejinggeng')
self.header = headers(self.sid)
self.url = envhost[self.env]
res = apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent), parammode='json',
method='post', headers=self.header)
if res['flag'] == "S":
logger.debug(f"案件{self.biz_no}转派成功")
else:
logger.debug(f"案件{self.biz_no}转派失败,原因:{res['msg']}")
else:
# 登录获取sid
self.sid = get_sid(self.env)
method, bizContent = transfer_tel(self.biz_no, self.task_no, 'genggeng')
self.header = headers(self.sid)
self.url = envhost[self.env]
res = apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent), parammode='json',
method='post', headers=self.header)
if res['flag'] == "S":
logger.debug(f"案件{self.biz_no}转派成功")
else:
logger.debug(f"案件{self.biz_no}转派失败,原因:{res['msg']}")
#终审
# 查下案子在谁名下
sql = f"select user_no,task_no,biz_no from wf_task where biz_no ='{self.biz_no}' and phase_no='AP_LASTINSTANCE' ORDER BY id DESC limit 1"
self.user_no = DBmananger(self.env, 'apv_wf').callMysql(sql)[0]["user_no"]
self.task_no = DBmananger(self.env, 'apv_wf').callMysql(sql)[0]["task_no"]
self.biz_no = DBmananger(self.env, 'apv_wf').callMysql(sql)[0]["biz_no"]
# 登录获取sid
self.sid = get_sid(self.env, login_name=self.user_no)
method, bizContent = fina_check(self.biz_no,self.task_no)
self.header = headers(self.sid)
self.url = envhost[self.env]
res = apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent),
parammode='json',
method='post', headers=self.header)
if res['flag'] == "S":
logger.debug(f"案件{self.biz_no}终审成功")
else:
logger.debug(f"案件{self.biz_no}终审失败,原因:{res['msg']}")
test_name = ["终审转中介类型调查", "终审转盗用类型调查", "终审转伪冒类型调查", "终审转其他类型调查"]
if self.investigateType1 == "Y":
test_Name = test_name[0]
elif self.investigateType2 == "Y":
test_Name = test_name[1]
elif self.investigateType3 == "Y":
test_Name = test_name[2]
else:
test_Name = test_name[3]
return {
"测试用例名称": test_Name,
"测试结果": "pass" if res['flag'] == "S" else "fail",
"原因": res["msg"],
"url": self.url,
"res": res,
"param": GetCommonContent(method, bizContent)
}
class Inquisition():
def __init__(self,env):
self.env=env
def Inquisitiontransfer(self):
"""征审质检转派"""
#查询最新待质检案子
sql="select appl_no from qc_case where source_channel='APV' and state='W_QC' and case_type='A24' order by id desc limit 1"
self.appl_no = DBmananger(self.env, 'ris').callMysql(sql)[0]["appl_no"]
#转派
# 登录获取sid
self.sid = get_sid(self.env)
method, bizContent = transfer_opinion(self.appl_no,"xiejinggeng")
self.header = headers(self.sid)
self.url = envhost[self.env]
res = apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent),
parammode='json',
method='post', headers=self.header)
if res['flag'] == "S":
logger.debug(f"案件{self.appl_no}征审质检转派给xiejinggeng成功")
else:
logger.debug(f"案件{self.appl_no}征审质检转派给xiejinggeng失败,原因:{res['msg']}")
return {
"测试用例名称": "征审质检案件转派",
"测试结果": "pass" if res['flag'] == "S" else "fail",
"原因": res["msg"],
"url": self.url,
"res": res,
"param": GetCommonContent(method, bizContent)
}
def Inquisition_qc_case(self):
'''征审质检'''
# 登录获取sid
self.sid = get_sid(self.env)
method, bizContent = opinion(self.appl_no)
self.header = headers(self.sid)
self.url = envhost[self.env]
res = apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent),
parammode='json',
method='post', headers=self.header)
if res['flag'] == "S":
logger.debug(f"案件{self.appl_no}征审质检成功")
else:
logger.debug(f"案件{self.appl_no}征审质检失败,原因:{res['msg']}")
return {
"测试用例名称": "征审质检",
"测试结果": "pass" if res['flag'] == "S" else "fail",
"原因": res["msg"],
"url": self.url,
"res": res,
"param": GetCommonContent(method, bizContent)
}
def Inquisition_feedback(self):
'''征审质检争议处理'''
# 登录获取sid
sql=f"select tel_check_user from qc_case where appl_no ={self.appl_no}"
self.tel_check_user = DBmananger(self.env, 'ris').callMysql(sql)[0]["tel_check_user"]
self.sid = get_sid(self.env,login_name=self.tel_check_user)
method, bizContent = feedback(self.appl_no)
self.header = headers(self.sid)
self.url = envhost[self.env]
res = apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent),
parammode='json',
method='post', headers=self.header)
if res['flag'] == "S":
logger.debug(f"案件{self.appl_no}征审质检争议处理成功")
else:
logger.debug(f"案件{self.appl_no}征审质检争议处理失败,原因:{res['msg']}")
return {
"测试用例名称": "征审质检争议处理",
"测试结果": "pass" if res['flag'] == "S" else "fail",
"原因": res["msg"],
"url": self.url,
"res": res,
"param": GetCommonContent(method, bizContent)
}
def Inquisition_opinionend(self):
'''征审质检定案'''
# 登录获取sid
self.sid = get_sid(self.env, login_name=self.tel_check_user)
method, bizContent = opinionend(self.appl_no)
self.header = headers(self.sid)
self.url = envhost[self.env]
res = apiRequests.callInterface(url=self.url, param=GetCommonContent(method, bizContent),
parammode='json',
method='post', headers=self.header)
sql=f"select state from qc_case where appl_no ={self.appl_no}"
self.state = DBmananger(self.env, 'ris').callMysql(sql)[0]["state"]
if res['flag'] == "S" and self.state=='VDT':
logger.debug(f"案件{self.appl_no}征审质检定案成功")
else:
logger.debug(f"案件{self.appl_no}征审质检定案失败,原因:{res['msg']},案件状态:{self.state}")
return {
"测试用例名称": "征审质检定案",
"测试结果": "pass" if res['flag'] == "S" and self.state=='VDT' else "fail",
"原因": res["msg"],
"url": self.url,
"res": res,
"param": GetCommonContent(method, bizContent)
}
def nowtime():
now_time = datetime.datetime.now()
now = datetime.datetime.strftime(now_time, '%Y-%m-%d %H:%M:%S')
# date_now=datetime.datetime.strftime(now_time, '%Y-%m-%d')
date = (int(time.mktime(time.strptime(now, "%Y-%m-%d %H:%M:%S"))))
return now, date
把接口数据直接写到程序里面了,运行起来很爽。不过写过的都知道维护会很麻烦,接口要是改动了要改代码,也不好扩展,还是数据分离吧,借鉴httprunner的方式,将用例放到yaml维护吧(放到excle也可以,之前有些过这种感兴趣的可以翻阅这个文档添加链接描述)
格式如下:
- CaseNo: 1
Extract:
biz_no:
sql-apv_wf: select ***
taskNo:
sql-apv_wf: select ***
user: 'genggeng'
headers:
fun: get_sid(login_name='xiejinggeng')
CaseNmae: 电核转派
Url: ''
Method: post
param:
method: qihoo.apv.ma.caseturn.update
bizContent:
taskNoList:
- key: $biz_no$
value: $taskNo$
productCode: 360借条
toUser: $user$
headers:
Cookie: $headers$
介绍:
Extract这个key里面的是变量相关的,支持sql操作、自定义函数,可通过正则区分sql还是自定义函数,如:sql-开头判断是数据库操作的,后面接操作的库名apv_wf;fun则是自定义函数,到置顶目录去查找并执行get_sid(login_name=‘xiejinggeng’)。变量用 变 量 变量 变量包括起来,然后替换。如下
from quality_management_logic.ops_new.PublicCenter.Case_Get_Data import *
from main import BASE_DIR
from quality_management_logic.commonCenter.logUtil import getlog
from quality_management_logic.ops.publicCenter import apiRequests
from quality_management_logic.ops.dataCenter.env import envhost
logger = getlog(targetName='RunCase')
#yaml文件list
casepath=f"{BASE_DIR}/TestCaseCenter/"
casefilelist=File_Name(casepath)
def Run(env):
'''
执行用例的入口
:return:
'''
#读yaml用例文件转换成json格式数据
for yamlfile in casefilelist:
caselist=yaml_r(yamlfile)
logger.debug(f"获取用例list:{caselist}")
#用例中用到的sql、函数预执行替换成新用例
for case in caselist:
#存在变量就替换,反之直接执行用例
if 'Extract' in case.keys():
logger.debug(f"用例:{case.get('CaseNo')}存在变量:{case.get('Extract').keys()}")
for key in case.get('Extract').keys():
extract=to_extract(key)
#是sql
if GetSql(case.get('Extract')[key],env=env):
extract_value= GetSql(case.get('Extract')[key],env=env)
#是自定义函数
elif GetFun(case.get('Extract')[key],pattern='fun'):
extract_value = GetFun(case.get('Extract')[key],pattern='fun')
#是常量
else:
extract_value=case.get('Extract')[key]
#替换
# update_allvalues(caselist, key, extract_value)
update_allvalue(caselist, extract, extract_value)
logger.debug(f"获取用例list:{caselist}")
#开始执行替换后的测试用例
for case in caselist:
url=envhost[env]
param=case["param"]
headers=case["headers"]
res = apiRequests.callInterface(url=url, param=param, parammode='json', method='post', headers=headers)
if __name__=='__main__':
print(Run(env='stg1'))
获取sql、公共函数、预期结果等公共函数
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020/11/25 11:05
# @Author : zhouwang
# @Site :
# @File : Case_Get_Data.py
# @Software: PyCharm
import re
from quality_management_logic.commonCenter.logUtil import getlog
from quality_management_logic.commonCenter.DataManangerl import DBmananger
from quality_management_logic.ops_new.Public import *
import yaml
import os
logger = getlog(targetName='Case_Get_Data')
def GetSql(dic,pattern='sql-',env='stg2'):
'''
获取用例中的sql
:param dic:
:param pattern:
:param env:
:return:
'''
if isinstance(dic,dict):
if re.match(pattern,str(list(dic.keys())[0]),re.I):
logger.debug('有查数据库的变量,开始获取sql')
try:
sql=list(dic.values())[0]
dbname=list(dic.keys())[0].split('-')[1]
return list(DBmananger(env, dbname).callMysql(sql)[0].values())[0]
except Exception as e:
logger.error(e)
return ''
else:
return False
def GetExpectedResults(dic):
'''
获取预期结果值,dict的value必须是list,有且两个值,第一个是位置,第二个是值
:param dic:
:return:
'''
ExpectedResults = dic.get("ExpectedResults")
if isinstance(ExpectedResults, list): # 如是list
if len(ExpectedResults)==2:
return ExpectedResults
else:
return ['res', '']
def GetFun(dic,pattern='fun'):
'''
获取用例中的自定义函数
:param dic:
:param pattern:
:return:
'''
if isinstance(dic, dict):
if re.match(pattern,str(list(dic.keys())[0]),re.I):
logger.debug('有调用公共函数,开始获取调用函数')
return eval(list(dic.values())[0])
else:
return False
def yaml_r(yamlpath):
'''
yaml文件转换成json
:param yamlpath:
:return:
'''
f = open(yamlpath, 'r', encoding='utf-8')
cfg = f.read()
d = yaml.load(cfg,Loader=yaml.FullLoader) # 用load方法转字典
return d
def File_Name(file_dir):
L = []
for root, dirs, files in os.walk(file_dir):
for file in files:
if os.path.splitext(file)[1] == '.yaml':
L.append(os.path.join(root, file))
return L
def to_extract(key):
return f"${key}$"
# 遍历嵌套字典或list并替换字典的key
def update_allvalues(mydict,key,value):
if isinstance(mydict, dict): # 使用isinstance检测数据类型,如果是字典
if key in mydict.keys(): # 替换字典第一层中所有key与传参一致的key
mydict[key] = value
for k in mydict.keys(): # 遍历字典的所有子层级,将子层级赋值为变量chdict,分别替换子层级第一层中所有key对应的value,最后在把替换后的子层级赋值给当前处理的key
chdict = mydict[k]
update_allvalues(chdict,key,value)
mydict[k] = chdict
elif isinstance(mydict, list): #如是list
for element in mydict: #遍历list元素,以下重复上面的操作
if isinstance(element, dict):
if key in element.keys():
element[key] = value
for k in element.keys():
chdict = element[k]
update_allvalues(chdict,key,value)
element[k] = chdict
# 遍历嵌套字典或list并替换字典的value
def update_allvalue(mydict, value, tovalue):
if isinstance(mydict, dict): # 使用isinstance检测数据类型,如果是字典
# if key in mydict.keys(): # 替换字典第一层中所有key与传参一致的key
# if value in mydict.values():
for key in mydict.keys():
if mydict[key]==value:
mydict[key] = tovalue
# for k in mydict.keys(): # 遍历字典的所有子层级,将子层级赋值为变量chdict,分别替换子层级第一层中所有key对应的value,最后在把替换后的子层级赋值给当前处理的key
chdict = mydict[key]
update_allvalue(chdict, value, tovalue)
if mydict[key] == value:
mydict[key] = tovalue
elif isinstance(mydict, list): # 如是list
for element in mydict: # 遍历list元素,以下重复上面的操作
if isinstance(element, dict):
# if value in element.values():
for key in element.keys():
if element[key] == value:
element[key] = tovalue
# for k in mydict.keys(): # 遍历字典的所有子层级,将子层级赋值为变量chdict,分别替换子层级第一层中所有key对应的value,最后在把替换后的子层级赋值给当前处理的key
chdict = element[key]
update_allvalue(chdict, value, tovalue)
if element[key] == value:
element[key] = tovalue
大功告成