版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/warrah/article/details/84313002
jira相信大多数开发、测试工程师都在知道,功能测试、集成测试等环节少不了它,但是一个个录入bug并不适宜所有场景。比如做数据爬虫,比如代码质量走查,就不好人为直接干涉,于是我决心好好看一下jira,把atlassian-jira
反编译看看他的代码,多的吓人,老技术,真心不想看下去了,于是只好从数据库着手。
全量的数据有这么多
并不是所有数据都有数据,于是我将没有数据的表,一个个删除。当然这些表的数据,是我们使用了5年的jira数据库,删掉之后,从原来186,降到了96张
分析表结构后,编写脚本写入数据库
import datetime
import json
import time
import pandas as pd
from sqlalchemy.engine import create_engine
from . import settings
ENGINE = create_engine('mysql+mysqldb://{}:{}@{}:3306/{}'.format(settings.MYSQL_USER, settings.MYSQL_PASSWD,
settings.MYSQL_HOST, settings.MYSQL_DBNAME),
connect_args={'charset': 'utf8'}, pool_size=settings.MYSQL_POOL_SIZE)
class ProjectService:
'''
项目表
'''
def __init__(self):
pass
def select(self,pkey):
if not pkey:
raise Exception("pkey不能为空")
sql = "select ID, pname from project where pkey='{}'".format(pkey)
df = pd.read_sql(sql,ENGINE)
records = json.loads(df.to_json(orient='records'))
if records:
return records[0]
else:
raise Exception("没有找到对应的项目")
class OsWfentryService:
'''
osworkflow入口
'''
def __init__(self):
self.osCurrentstepService = OsCurrentstepService()
def gen_max_id(self):
sql = "select MAX(ID) ID from os_wfentry"
df = pd.read_sql(sql,ENGINE)
results = json.loads(df.to_json(orient='records'))
if len(results) == 0:
raise Exception("jira系统异常")
return results[0]['ID'] +1
def start_workflow(self,):
os_wfentry = {'NAME':'jira', 'STATE':1}
os_wfentry['ID'] = self.gen_max_id()
df = pd.DataFrame([os_wfentry])
df.to_sql('os_wfentry', ENGINE, if_exists='append', index=False)
print('工作流产生成功')
self.osCurrentstepService.gen_current_step(os_wfentry['ID'])
return os_wfentry['ID']
class OsCurrentstepService:
'''
工作流当前步骤
'''
def __init__(self):
pass
def gen_max_id(self):
sql = "select MAX(ID) ID from os_currentstep"
df = pd.read_sql(sql,ENGINE)
results = json.loads(df.to_json(orient='records'))
if len(results) == 0:
raise Exception("jira系统异常")
return results[0]['ID'] +1
def gen_current_step(self,entry_id):
os_currentstep = {'STEP_ID':1,'ACTION_ID':0,'STATUS':'Open'}
os_currentstep['ENTRY_ID'] = entry_id
os_currentstep['ID'] = self.gen_max_id()
os_currentstep['START_DATE']=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
df = pd.DataFrame([os_currentstep])
df.to_sql('os_currentstep', ENGINE, if_exists='append', index=False)
print('写入工作流当前步骤成功')
class ComponentService:
'''
功能模块
'''
def get_component_id(self,project,cname):
'''
:param project:
:return:
'''
sql = "select ID from component where PROJECT='{}' and cname='{}' ".format(project, cname)
df = pd.read_sql(sql, ENGINE)
results = json.loads(df.to_json(orient='records'))
if len(results) == 0:
raise Exception("获取模块异常")
return int(results[0]['ID'])
class ProjectversionService:
def get_version_id(self,project,vname=None):
'''
:param project:
:return:
'''
if vname:
sql = "select ID from projectversion where PROJECT='{}' and vname='{}' ".format(project, vname)
else:
sql = "select ID from projectversion where PROJECT='{}' and SEQUENCE=1 ".format(project)
df = pd.read_sql(sql, ENGINE)
results = json.loads(df.to_json(orient='records'))
if len(results) == 0:
raise Exception("获取版本异常")
return int(results[0]['ID'])
class NodeassociationService:
'''
与问题相关的数据,比如问题属于哪个模块
因为问题相关模块是可以自定义的,所以里面的字段可能要根据具体系统进行更改
'''
def __init__(self):
self.componentService = ComponentService()
self.projectversionService = ProjectversionService()
def gen_nodeassociation(self,source_node_id,project,cname,vname):
# 模块
component = {'SOURCE_NODE_ENTITY':'Issue', 'SINK_NODE_ENTITY':'Component','ASSOCIATION_TYPE':'IssueComponent'}
component['SOURCE_NODE_ID']=source_node_id
component['SINK_NODE_ID']=self.componentService.get_component_id(project,cname)
df = pd.DataFrame([component])
df.to_sql('nodeassociation', ENGINE, if_exists='append', index=False)
print('保存问题队对应的模块成功')
# 版本
version = {'SOURCE_NODE_ENTITY':'Issue', 'SINK_NODE_ENTITY':'Version','ASSOCIATION_TYPE':'IssueVersion'}
version['SOURCE_NODE_ID'] = source_node_id
version['SINK_NODE_ID'] = self.projectversionService.get_version_id(project, vname)
df = pd.DataFrame([version])
df.to_sql('nodeassociation', ENGINE, if_exists='append', index=False)
print('保存问题队对应的版本成功')
class UserhistoryitemService:
def gen_max_id(self):
sql = "select MAX(ID) ID from userhistoryitem"
df = pd.read_sql(sql,ENGINE)
results = json.loads(df.to_json(orient='records'))
if len(results) == 0:
raise Exception("获取userhistoryitem最大ID异常")
return results[0]['ID'] +1
def gen_userhistoryitem(self,entityid):
userhistoryitem = {'entitytype':'Issue'}
userhistoryitem['USERNAME'] = settings.JIRA_TESTER
userhistoryitem['entityid'] = entityid
userhistoryitem['ID'] = self.gen_max_id()
userhistoryitem['lastviewed'] = int(round(time.time() * 1000))
df = pd.DataFrame([userhistoryitem])
df.to_sql('userhistoryitem', ENGINE, if_exists='append', index=False)
class UserassociationService:
def gen_userassociation(self,sink_node_id):
userassociation = {'SINK_NODE_ENTITY':'Issue','ASSOCIATION_TYPE':'WatchIssue'}
userassociation['SOURCE_NAME'] = settings.JIRA_TESTER
userassociation['CREATED']= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
userassociation['SINK_NODE_ID']=sink_node_id
df = pd.DataFrame([userassociation])
df.to_sql('userassociation', ENGINE, if_exists='append', index=False)
class JiraIssueService:
'''
jira问题表
'''
def __init__(self):
self.projectService = ProjectService()
self.osWfentryService = OsWfentryService()
self.nodeassociationService = NodeassociationService()
self.userhistoryitemService = UserhistoryitemService()
self.userassociationService = UserassociationService()
def gen_max_id(self):
sql = "select MAX(ID) ID from jiraissue"
df = pd.read_sql(sql,ENGINE)
results = json.loads(df.to_json(orient='records'))
if len(results) == 0:
raise Exception("获取jira最大ID异常")
return results[0]['ID'] +1
def gen_issuenum(self,project):
sql = "select MAX(issuenum) issuenum from jiraissue where PROJECT ='{}'".format(project)
df = pd.read_sql(sql, ENGINE)
results = json.loads(df.to_json(orient='records'))
if len(results) == 0:
return 1
return results[0]['issuenum'] + 1
def insert(self,pkey,summary,description,assignee,cname,vname):
'''
创建新的jira问题
:param pkey:
:param summary:
:param description:
:return:
'''
project = self.projectService.select(pkey)
# jira的默认配置
# 问题类型:缺陷,优先级:一般,
jiraissue = {'PRIORITY':3,'issuetype':1,'issuestatus':1,'VOTES':0,'WATCHES':1}
# 问题id
jiraissue['ID'] = self.gen_max_id()
# 问题编号
jiraissue['issuenum'] = self.gen_issuenum(project['ID'])
# 工作流
jiraissue['WORKFLOW_ID']=self.osWfentryService.start_workflow()
# 项目
jiraissue['PROJECT'] = project['ID']
# 主体
jiraissue['SUMMARY'] = summary
# 描述
jiraissue['SUMMARY'] = description
# 创建者
jiraissue['CREATOR'] = settings.JIRA_TESTER
# 报告人
jiraissue['REPORTER'] = settings.JIRA_TESTER
# 经办人
if assignee:
jiraissue['ASSIGNEE']=assignee
# 创建时间
jiraissue['CREATED'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
jiraissue['UPDATED'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
df = pd.DataFrame([jiraissue])
df.to_sql('jiraissue',ENGINE,if_exists='append',index=False)
# 保存问题对应的模块、版本
self.nodeassociationService.gen_nodeassociation(jiraissue['ID'],project['ID'],cname,vname)
# 保存用户历史记录
self.userhistoryitemService.gen_userhistoryitem(jiraissue['ID'])
# 问题与用户关系
self.userassociationService.gen_userassociation(jiraissue['ID'])
print('写入问题成功')
if __name__ == '__main__':
issueService = JiraIssueService()
pkey = 'AIT'
cname ='测试模块'
vname = 'ver_20181120'
#
summary = 'AI测试9'
description = 'AI测试9'
# 默认经办人,为未分配
assignee='-1'
issueService.insert(pkey,summary,description,assignee,cname,vname)
按照上面的逻辑,虽然有事务和并发的问题,但是能将问题及问题相关的数据均写入到mysql数据库中,但现在的问题是及时数据已经在数据库中了,但是还是无法显示出来,究其原因,应该是做了缓存,因为如果直接改数据库中的内容,查询列表中依旧是查不到的。
既然写脚本操作数据库这条路走不通,可以选择另外一条路,通过爬虫技术,模拟请求创建问题。
创建jira插件,
在上面代码的基础之上,增加登录jira并获取token。下面用到了相对目录,可参考Python包的相对导入时出现错误的解决方法中的解释。
import requests
from ..service.RedisService import RedisService
from .jira_service import ProjectService, ComponentService, ProjectversionService
from . import settings
req_cookies = None
def login():
print('启动登录')
url = settings.JIRA_URL
session = requests.session()
url = settings.JIRA_URL + settings.JIRA_LOGIN
form={'os_username': settings.JIRA_TESTER, 'os_password': settings.JIRA_PASSWORD, 'os_captcha': ''}
resp = session.post(url,params=form)
cookies = requests.cookies.RequestsCookieJar()
session.cookies.update(cookies)
return session.cookies.get_dict()
def create_issue(pkey,summary,description,cname,assignee='-1'):
'''
创建问题
:return:
'''
# 先登录,获取cookie
redisService = RedisService()
req_cookies = redisService.get_jira_cookie()
if not req_cookies:
req_cookies = login()
# 将cookie写入到redis中
redisService.set_jira_cookie(req_cookies)
print(redisService.get_jira_cookie())
#
projectService = ProjectService()
componentService = ComponentService()
projectversionService = ProjectversionService()
# 发送请求,创建问题
form = {'isCreateIssue':'true','fieldsToRetain':'project','assignee':assignee,'priority':3,'issuetype':1
,'duedate':'','environment':'','timetracking_originalestimate':'','timetracking_remainingestimate':'','hasWorkStarted':''
}
# 获取项目相关数据
project = projectService.select(pkey)
form['pid']=int(project['ID'])
form['components'] = componentService.get_component_id(project['ID'],cname)
form['summary']=summary
form['versions']=projectversionService.get_version_id(project['ID'])
form['description']=description
form['atl_token']=req_cookies['atlassian.xsrf.token']
url = settings.JIRA_URL + settings.JIRA_CREATE_ISSUE
resp = requests.post(url, cookies=req_cookies, params=form)
print(resp)
if __name__ == '__main__':
pkey = 'AIT'
cname = '测试模块'
#
summary = 'AI测试91010'
description = 'AI测试9010'
create_issue(pkey, summary, description, cname,assignee='dengjun')
在redis中cookie保存25分钟
from redis.sentinel import Sentinel
from src.jira_plugin.jira import settings
class RedisService(object):
def __init__(self):
params = {'db': settings.REDIS_PARAMS['db']}
sentinel = Sentinel(settings.REDIS_SENTINELS, **params)
self.server = sentinel.master_for(settings.REDIS_PARAMS['service_name'], **params)
def set_jira_cookie(self,cookie):
self.server.set(settings.REDIS_COOKIE_KEY, cookie, settings.REDIS_COOKIE_TIMEOUT)
def get_jira_cookie(self):
cookie = self.server.get(settings.REDIS_COOKIE_KEY)
if cookie:
return eval(cookie)
return None
接着执行命令python setup.py sdist bdist_egg
进行打包,安装包上传到pypi-server后,按照下面的命令安装或卸载。
pip install --extra-index-url http://10.101.3.175:9090/ jira-plugin --trusted-host 10.101.3.175
pip uninstall jira-plugin