1 创建数据库和表:
CREATE DATABASE IF NOT EXISTS `test`;
CREATE TABLE `person` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(100) NOT NULL,
`telephone` varchar(100) NOT NULL,
`sex` char(2) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
2 下载需要的python模块
flask, flask-cors,pymysql
pip install Flask
pip install flask-cors
pip install pymysql
3 代码:
mysqlutil.py文件
import pymysql
def getConn(host, port, username, password, database):
"""获得连接"""
return pymysql.connect(host=host, port=port, user=username, password=password, database=database, charset='utf8')
def addPerson(name, telephone, sex, conn):
"""增加用户"""
result = {
}
sql = "insert into person(name, telephone, sex) values ('%s', '%s', '%s')" % (
name, telephone, sex)
try:
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()
print("add successfully!")
except Exception as e:
conn.rollback()
print(e)
print("fail to add!")
result['row'] = cursor.rowcount
return result
def all(conn):
"""获得所有用户"""
cursor = conn.cursor()
sql = "select * from person"
list = []
try:
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
temp = {
}
temp['id'] = row[0]
temp['name'] = row[1]
temp['telephone'] = row[2]
temp['sex'] = row[3]
list.append(temp)
except Exception as e:
print(e)
return list
def findById(conn, id):
"""根据id获得用户"""
cursor = conn.cursor()
sql = "select * from person where id='%s'" % (id)
list = []
try:
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
temp = {
}
temp['id'] = row[0]
temp['name'] = row[1]
temp['telephone'] = row[2]
temp['sex'] = row[3]
list.append(temp)
except Exception as e:
print(e)
return list[0]
def findByName(conn, name):
"""根据name获得用户"""
cursor = conn.cursor()
sql = "select * from person where name='%s'" % (name)
list = []
try:
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
temp = {
}
temp['id'] = row[0]
temp['name'] = row[1]
temp['telephone'] = row[2]
temp['sex'] = row[3]
list.append(temp)
except Exception as e:
print(e)
return list[0]
def deleteAll(conn):
"""删除所有用户"""
result = {
}
cursor = conn.cursor()
sql = "delete from person"
try:
cursor.execute(sql)
conn.commit()
except Exception as e:
conn.rollback()
print(e)
result['row'] = cursor.rowcount
return result
def deleteById(conn, id):
"""根据Id删除用户"""
result = {
}
cursor = conn.cursor()
sql = "delete from person where id=%d " % (id)
try:
cursor.execute(sql)
conn.commit()
except Exception as e:
conn.rollback()
print(e)
result['row'] = cursor.rowcount
return result
def updateById(id, name, telephone, sex, conn):
"""根据id更新用户"""
result = {
}
sql = "update person set name='%s', telephone='%s', sex='%s' where id=%d" % (
name, telephone, sex, id)
try:
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()
except Exception as e:
conn.rollback()
print(e)
print("fail to add!")
result['row'] = cursor.rowcount
return result
testmysql.py 文件:
import mysqlutil
def getConn():
"""根据主机ip,端口号,数据库用户名,数据库密码,数据库名获取数据库连接"""
return mysqlutil.getConn(host='127.0.0.1', port=3306, username='root', password='root', database='test')
app.py文件:
import json
from flask import Flask
from flask import request
from flask_cors import CORS
import testmysql
import mysqlutil
import json
app = Flask(__name__)
# 解决跨域问题
CORS(app)
@app.route("/")
def hello_world():
return "<h1>Hello, World!</h1>"
@app.route("/person/all")
def findAllPerson():
conn = testmysql.getConn()
list = mysqlutil.all(conn)
conn.close()
return json.dumps(list)
@app.route("/person/id/<id>")
def findPersonById(id):
conn = testmysql.getConn()
person = mysqlutil.findById(conn, id)
conn.close()
return json.dumps(person)
@app.route("/person/delete/id/<id>")
def deletePersonById(id):
conn = testmysql.getConn()
result = mysqlutil.deleteById(conn, int(id))
conn.close()
return json.dumps(result)
@app.route("/person/add", methods=['POST'])
def addPerson():
print("addperson")
name = request.json['name']
telephone = request.json['telephone']
sex = request.json['sex']
conn = testmysql.getConn()
result = mysqlutil.addPerson(name, telephone, sex, conn)
conn.close()
return json.dumps(result)
@app.route("/person/update", methods=["POST"])
def updatePerson():
id = request.json['id']
name = request.json['name']
telephone = request.json['telephone']
sex = request.json['sex']
conn = testmysql.getConn()
result = mysqlutil.updateById(int(id), name, telephone, sex, conn)
conn.close()
return json.dumps(result)