封装了操作数据库的方法
import traceback
import pymysql
from flask import jsonify
from pymysql import connect
from collect import LOGGER
from collect.exceptions.HadesException import *
class dbConnect():
    """One-shot wrapper around a pymysql connection and a dict cursor.

    ``query``, ``add`` and ``insert_db`` each release the cursor and the
    connection when they finish (successfully or not), so an instance serves
    exactly one statement; create a new instance for the next one.

    NOTE(review): the default connection settings below are hard-coded in the
    source (including the password); they are kept only for backward
    compatibility and should be supplied from configuration instead.
    """

    def __init__(self, host='12.32.68.224', port=3306, database='数据库名',
                 user='root', password='数据库密码', charset='utf8'):
        """Open the connection and create the cursor.

        All parameters are forwarded to ``pymysql.connect``; the defaults are
        the values that were previously hard-coded.

        Raises:
            pymysql.err.MySQLError: if the connection cannot be established.
                The failure is logged and re-raised instead of terminating
                the whole process.
        """
        self._closed = False
        try:
            self.conn = connect(
                host=host,
                port=port,
                database=database,
                user=user,
                password=password,
                charset=charset,
                # pymysql returns rows as tuples by default; DictCursor
                # returns each row as a dict keyed by column name.
                cursorclass=pymysql.cursors.DictCursor,
            )
        except pymysql.err.MySQLError:
            LOGGER.error("连接数据库失败\n" + traceback.format_exc())
            raise
        self.cs = self.conn.cursor()

    def query(self, sql, args=None):
        """Run a SELECT statement and return every row.

        Args:
            sql: the statement to execute; may contain ``%s`` placeholders.
            args: optional placeholder values, passed to ``cursor.execute``
                so the driver performs the escaping.

        Returns:
            Whatever ``cursor.fetchall()`` yields for the statement.

        The cursor and the connection are always released, even when the
        statement fails.
        """
        try:
            self.cs.execute(sql, args)
            print(sql)
            result = self.cs.fetchall()
            print(result)
            return result
        finally:
            self.close()

    def add(self, sql, args=None):
        """Execute a write statement and commit it.

        Args:
            sql: the statement to execute; may contain ``%s`` placeholders.
            args: optional placeholder values for ``cursor.execute``.

        Raises:
            Exception: whatever the driver raised; the transaction is rolled
                back and the error is logged before it propagates, so callers
                can tell a failed write from a successful one.
        """
        try:
            print(sql)
            self.cs.execute(sql, args)
            self.conn.commit()
        except Exception:
            LOGGER.error(traceback.format_exc())
            self.conn.rollback()
            raise
        finally:
            self.close()

    def close(self):
        """Release the cursor and the connection; repeated calls are no-ops."""
        if getattr(self, '_closed', False):
            return
        self._closed = True
        try:
            self.cs.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self.conn.close()

    def insert_db(self, sql, args=None):
        """Execute an INSERT, commit it, and return the generated row id.

        Args:
            sql: the INSERT statement; may contain ``%s`` placeholders.
            args: optional placeholder values for ``cursor.execute``.

        Returns:
            The value of ``SELECT LAST_INSERT_ID()`` on this connection.

        Raises:
            DatabaseSyntaxError: if the driver reports a ProgrammingError.
            Exception: any other failure, re-raised unchanged after rollback.
        """
        select_sql = 'SELECT LAST_INSERT_ID()'
        try:
            self.cs.execute(sql, args)
            self.conn.commit()
            self.cs.execute(select_sql)
            result = self.cs.fetchall()
            insert_id = result[0]['LAST_INSERT_ID()']
            print(insert_id)
            return insert_id
        except pymysql.err.ProgrammingError as err:
            LOGGER.error(traceback.format_exc())
            self.conn.rollback()
            raise DatabaseSyntaxError from err
        except Exception:
            self.conn.rollback()
            # Re-raise the original error rather than a bare Exception so the
            # cause is not lost.
            raise
        finally:
            self.close()
# if __name__ == "__main__":
# db = dbConnect()
# db.query("select * from result")
实现查/写接口
from flask import render_template, request,jsonify
from collect.controller import mysql_connection
from collect import app
from collect.controller.blueprint import create_blueprint
# Blueprint for the task endpoints, mounted under /task.
app_task = create_blueprint('collect', '/task', __name__)


# Task endpoints
@app_task.route('', methods=["POST", "GET"])
def task():
    """Create a task (POST) or list all tasks (any other accepted method).

    POST reads the ``taskname`` and ``remarks`` form fields, rejects the
    request when a task with that name already exists, and otherwise inserts
    a new row into ``perf_task``. The JSON reply carries ``code`` (200, or -1
    when the database work failed) and a human-readable ``Msg``.

    GET returns every ``perf_task`` row as JSON, newest ``createtime`` first.
    HEAD requests, which Flask accepts automatically on GET routes, take the
    same path instead of falling through without a response.
    """
    if request.method == 'POST':
        taskname = request.form["taskname"]
        remarks = request.form["remarks"]
        try:
            db = mysql_connection.dbConnect()
            # Let the driver quote/escape the user-supplied values instead of
            # interpolating them into the SQL text (prevents SQL injection).
            lookup_sql = db.cs.mogrify(
                "select taskname from perf_task where taskname = %s",
                (taskname,),
            )
            existing = db.query(lookup_sql)
            if existing:
                return jsonify({'code': 200, 'Msg': '该任务已存在,请重新输入'})

            # query() released the previous connection, so open a new one.
            db = mysql_connection.dbConnect()
            insert_sql = db.cs.mogrify(
                "insert into perf_task(taskname, remarks) values(%s,%s) ",
                (taskname, remarks),
            )
            db.add(insert_sql)
            return jsonify({'code': 200, 'Msg': 'create task successfully'})
        except Exception:
            # Keep the original best-effort reply, but record why it failed.
            app.logger.exception("create task failed")
            return jsonify({'code': -1, 'Msg': 'create task faliured'})

    db = mysql_connection.dbConnect()
    sql = "select * from perf_task order by createtime DESC"
    return jsonify(db.query(sql))
templates
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>任务页</h1>
<!--
  Task creation form. It must be submitted with POST: the /task endpoint only
  creates a task on POST and reads "taskname" and "remarks" from the form
  body. An invalid method value makes browsers fall back to GET, which only
  lists tasks.
-->
<form action="/task" enctype="multipart/form-data" method="post">
    <label>任务名称:</label><input type="text" name="taskname"/><br>
    <label>任务描述:</label><input type="text" name="remarks"/><br>
    <input type="submit" value="提交">
</form>
</body>
</html>
collect包下的__init__.py文件
# Package initialisation: builds the Flask application, the shared paths and
# the logger, then loads the controllers and registers their blueprints.
import os
from flask import Flask
from collect.utils.LogUtils import Logger
# The application object; other modules obtain it with `from collect import app`.
app = Flask(__name__)
# Directories resolved relative to the application's root path.
BASE_UPLOAD_PATH = os.path.join(app.root_path, 'static/uploads')
BASE_LOG_PATH = os.path.join(app.root_path, 'logs')
# Shared logger instance, configured with the log directory above.
LOGGER = Logger(log_path=BASE_LOG_PATH)
# The controller imports are deliberately placed after `app` and `LOGGER`
# exist: the controller modules import those names back from this package,
# so moving these lines to the top would presumably trigger a circular-import
# failure.
from collect.controller.upload import app_upload
from collect.controller.report import app_report
from collect.controller.task import app_task
# Attach each controller's blueprint to the application.
app.register_blueprint(app_upload)
app.register_blueprint(app_report)
app.register_blueprint(app_task)
# NOTE(review): looks redundant, since collect.controller.task is already
# loaded by the `app_task` import above -- confirm before removing.
from collect.controller import task
运行整个工程下的项目
from collect import app
def main():
    """Start Flask's built-in server with debug mode switched on.

    NOTE(review): debug mode enables the interactive debugger and reloader
    and is intended for development only.
    """
    app.run(debug=True)


if __name__ == '__main__':
    main()