一、概述
代码使用 python 进行开发,使用 pycharm 作为开发工具,连接到 openGauss 数据库。使用psycopg2库进行数据库的连接,在此基础上进行数据库的一系列操作。
该系统实现医院预约的简易操作,创建医生、患者、科系、预约、预约备份、一般管理员账号密码信息 共6张表,共有4种登录类型:患者、医生、一般管理员、超级管理员。同时还创建了3个数据库存储过程函数,分别对应增删改3个触发器。通过触发器,系统自动完成对患者操作信息的记录。
(1)Data Studio上展示的ER图:
(2)模式结构
二、实现功能
系统可以实现的功能如下:
(1)患者:进行预约、取消预约、修改预约、查询预约。
(2)一般管理员:查询医院科室表、查询医生信息表、查询患者预约备份表、添加医院科室表、添加医 生信息表、清除所有预约信息。
(3)超级管理员:查询一般管理员信息表、添加一般管理员、删除一般管理员、修改一般管理员、清空 患者预约备份表。
(4)系统使用三个触发器,对患者进行预约,取消预约,修改预约的操作进行记录,把预约的相关信息 以及操作时间记录在预约备份表中。
三、连接数据库
1.使用 Python 连接数据库
# -*- coding: utf-8 -*-
from multiprocessing import connection
from psycopg2 import connect, OperationalError
import os
def CreateConnect():
try:
env = os.environ
params = {
'database': env.get('OG_DATABASE', 'postgres'), # 数据库名称
'user': env.get('OG_USER', 'tom'), # 用户姓名
'password': env.get('OG_PASSWORD', '12345'), # 用户密码
'host': env.get('OG_HOST', '192.000.00.000'), # 主机IP
'port': env.get('OG_PORT', 26000) # 端口号
}
conn: connection = connect(**params)
print('数据库连接成功!')
return conn
except OperationalError:
print('数据库连接失败!')
if __name__ == '__main__':
CreateConnect()
2.使用 Java 连接数据库
import java.sql.*;
public class GaussDBJDBC {
public static void main(String[] args) throws ClassNotFoundException, SQLException {
//配置信息
//useUnicode=true&characterEncoding=utf-8 解决中文乱码
String url = "jdbc:postgresql://数据库ip地址:26000/school?useUnicode=true&characterEncoding=utf-8";
String username = "数据库用户名";
String password = "数据库密码";
//1.加载驱动
Class.forName("com.mysql.cj.jdbc.Driver");
//2.连接数据库,代表数据库
Connection connection = DriverManager.getConnection(url, username, password);
//3.向数据库发送SQL的对象Statement,PreparedStatement : CRUD
Statement statement = connection.createStatement();
//4.编写SQL
String sql = "select * from student";
//5.执行查询SQL,返回一个 ResultSet : 结果集
ResultSet rs = statement.executeQuery(sql);
while (rs.next()) {
//student表
System.out.print("id:" + rs.getObject("std_id"));
System.out.print(" name:" + rs.getObject("std_name"));
System.out.print(" sex:" + rs.getObject("std_sex"));
System.out.print(" birth:" + rs.getObject("std_birth"));
System.out.print(" in:" + rs.getObject("std_in"));
System.out.println(" address:" + rs.getObject("std_address"));
}
//6.关闭连接,释放资源(一定要做) 先开后关
rs.close();
statement.close();
connection.close();
}
}
三、源代码
1、代码总览
2.初始化数据(InitializationCode)
仅运行一次,创建表以及导入一些初始案例数数据,如医院科室信息、医生信息等。
(1)Initialize_CreateTable.py
# -*- coding: utf-8 -*-
import os
from multiprocessing import connection
from psycopg2 import connect
'''连接数据库'''
def CreateConnect():
try:
env = os.environ
params = {
'database': env.get('OG_DATABASE', 'postgres'),
'user': env.get('OG_USER', 'tom'),
'password': env.get('OG_PASSWORD', 'Huawei_123'),
'host': env.get('OG_HOST', '192.168.56.130'),
'port': env.get('OG_PORT', 26000)
}
conn: connection = connect(**params)
return conn
except:
print('数据库连接失败!')
'''创建表'''
# 科系表
def CreateTable_Court(conn):
cursor = conn.cursor()
cursor.execute('''create table if not exists Court(
c_id int not null primary key,
c_name varchar(500)
)''')
conn.commit()
conn.close()
# 医生表
def CreateTable_Doctor(conn):
cursor = conn.cursor()
cursor.execute('''create table if not exists Doctor(
d_id int primary key,
d_name varchar(32) not null,
dp_courtID int,
d_workWeekData varchar(500),
d_workStartTime varchar(500),
d_workEndTime varchar(500),
foreign key(dp_courtID) references Court(c_id) on delete cascade
)''')
conn.commit()
conn.close()
# 患者表
def CreateTable_Patient(conn):
cursor = conn.cursor()
cursor.execute('''create table if not exists Patient(
p_id varchar(32) primary key,
p_name varchar(32) not null
)''')
conn.commit()
conn.close()
# 预约表
def CreateTable_Appointment(conn):
cursor = conn.cursor()
cursor.execute('''create table if not exists Appointment(
p_id varchar(32),
p_name varchar(500),
d_id varchar(32)
)''')
conn.commit()
conn.close()
# 管理员账号密码表
def CreateTable_Administrator(conn):
cursor = conn.cursor()
cursor.execute('''create table if not exists Administrator(
a_id varchar(32),
a_name varchar(500) primary key
)''')
conn.commit()
conn.close()
# 预约表副本
def CreateTable_AppointmentRecord(conn):
cursor = conn.cursor()
cursor.execute('''create table if not exists AppointmentRecord(
p_id varchar(32),
p_name varchar(32),
d_id varchar(32),
date_time date primary key,
type varchar(32)
)''')
conn.commit()
conn.close()
'''创建触发器函数'''
# 创建INSERT触发器函数
def CreateFunction_insertOnAppointmentRecord(conn):
cursor = conn.cursor()
cursor.execute('''CREATE OR REPLACE FUNCTION Record_insert_func ( ) RETURNS TRIGGER AS
$$
DECLARE
BEGIN
INSERT INTO AppointmentRecord(p_id,p_name,d_id,date_time,type)
VALUES (NEW.p_id,NEW.p_name,NEW.d_id,now(),'预约医生');
RETURN NEW;
END
$$
LANGUAGE PLPGSQL''')
conn.commit()
conn.close()
# 创建UPDATE触发器函数
def CreateFunction_updateOnAppointmentRecord(conn):
cursor = conn.cursor()
cursor.execute('''CREATE OR REPLACE FUNCTION Record_update_func ( ) RETURNS TRIGGER AS
$$
DECLARE
BEGIN
INSERT INTO AppointmentRecord(p_id,p_name,d_id,date_time,type)
VALUES (NEW.p_id,NEW.p_name,NEW.d_id,now(),'修改预约');
RETURN NEW;
END
$$
LANGUAGE PLPGSQL''')
conn.commit()
conn.close()
# 创建DELETE触发器函数
def CreateFunction_deleteOnAppointmentRecord(conn):
cursor = conn.cursor()
cursor.execute('''CREATE OR REPLACE FUNCTION Record_delete_func ( ) RETURNS TRIGGER AS
$$
DECLARE
BEGIN
INSERT INTO AppointmentRecord(p_id,p_name,d_id,date_time,type)
VALUES (OLD.p_id,OLD.p_name,OLD.d_id,now(),'取消预约');
RETURN NEW;
END
$$
LANGUAGE PLPGSQL''')
conn.commit()
conn.close()
'''创建触发器'''
# 创建INSERT触发器
def CreateTrigger_insertOnAppointment(conn):
cursor = conn.cursor()
cursor.execute('''DROP TRIGGER IF EXISTS insert_trigger ON Appointment;
CREATE TRIGGER insert_trigger
BEFORE INSERT ON Appointment
FOR EACH ROW
EXECUTE PROCEDURE Record_insert_func()''')
conn.commit()
conn.close()
# 创建UPDATE触发器
def CreateTrigger_updateOnAppointment(conn):
cursor = conn.cursor()
cursor.execute('''DROP TRIGGER IF EXISTS update_trigger ON Appointment;
CREATE TRIGGER update_trigger
AFTER UPDATE ON Appointment
FOR EACH ROW
EXECUTE PROCEDURE Record_update_func()''')
conn.commit()
conn.close()
# 创建DELETE触发器
def CreateTrigger_deleteOnAppointment(conn):
cursor = conn.cursor()
cursor.execute('''DROP TRIGGER IF EXISTS delete_trigger ON Appointment;
CREATE TRIGGER delete_trigger
BEFORE DELETE ON Appointment
FOR EACH ROW
EXECUTE PROCEDURE Record_delete_func()''')
conn.commit()
conn.close()
# 初始化函数
if __name__ == '__main__':
# 创建表
CreateTable_Court(CreateConnect())
CreateTable_Doctor(CreateConnect())
CreateTable_Patient(CreateConnect())
CreateTable_Appointment(CreateConnect())
CreateTable_Administrator(CreateConnect())
CreateTable_AppointmentRecord(CreateConnect())
# 创建触发器函数
CreateFunction_insertOnAppointmentRecord(CreateConnect())
CreateFunction_updateOnAppointmentRecord(CreateConnect())
CreateFunction_deleteOnAppointmentRecord(CreateConnect())
print('[提示] 表以及触发器创建成功!')
(2)Initialize_InsertData.py
# -*- coding: utf-8 -*-
import os
from multiprocessing import connection
from psycopg2 import connect
'''连接数据库'''
def CreateConnect():
try:
env = os.environ
params = {
'database': env.get('OG_DATABASE', 'postgres'),
'user': env.get('OG_USER', 'tom'),
'password': env.get('OG_PASSWORD', 'Huawei_123'),
'host': env.get('OG_HOST', '192.168.56.130'),
'port': env.get('OG_PORT', 26000)
}
conn: connection = connect(**params)
return conn
except:
print('数据库连接失败!')
'''插入数据'''
# 插入管理员信息
def InsertData_Administrator(conn):
cursor = conn.cursor()
cursor.execute('''insert into Administrator(a_id, a_name) values('adm1@1234567', 'adm1')''')
cursor.execute('''insert into Administrator(a_id, a_name) values('adm2@1234567', 'adm2')''')
cursor.execute('''insert into Administrator(a_id, a_name) values('adm3@1234567', 'adm3')''')
cursor.execute('''insert into Administrator(a_id, a_name) values('adm4@1234567', 'adm4')''')
cursor.execute('''insert into Administrator(a_id, a_name) values('adm5@1234567', 'adm5')''')
cursor.execute('''insert into Administrator(a_id, a_name) values('adm6@1234567', 'adm6')''')
conn.commit()
conn.close()
# 插入科室信息
def InsertData_Court(conn):
cursor = conn.cursor()
cursor.execute('''insert into Court(c_id, c_name) values(1, '急诊科')''')
cursor.execute('''insert into Court(c_id, c_name) values(2, '临床专科科')''')
cursor.execute('''insert into Court(c_id, c_name) values(3, '眼科')''')
cursor.execute('''insert into Court(c_id, c_name) values(4, '耳鼻喉科')''')
cursor.execute('''insert into Court(c_id, c_name) values(5, '皮肤科')''')
cursor.execute('''insert into Court(c_id, c_name) values(6, '口腔科')''')
cursor.execute('''insert into Court(c_id, c_name) values(7, '男科')''')
cursor.execute('''insert into Court(c_id, c_name) values(8, '内科')''')
cursor.execute('''insert into Court(c_id, c_name) values(9, '外科')''')
cursor.execute('''insert into Court(c_id, c_name) values(10, '妇产科')''')
cursor.execute('''insert into Court(c_id, c_name) values(11, '儿科')''')
cursor.execute('''insert into Court(c_id, c_name) values(12, '放射科')''')
cursor.execute('''insert into Court(c_id, c_name) values(13, '骨科')''')
conn.commit()
conn.close()
# 插入医生信息
def InsertData_Doctor(conn):
cursor = conn.cursor()
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 1,'张三',1,'星期五','7:30','12:30')''')
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 2,'李四',2,'星期三','8:30','12:30')''')
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 3,'王五',3,'星期四','11:00','19:30')''')
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 4,'孙六',4,'星期五','7:30','11:00')''')
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 5,'单七',5,'星期日','8:00','14:30')''')
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 6,'王平',6,'星期一','7:30','12:20')''')
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 7,'李胜',7,'星期二','9:00','12:00')''')
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 8,'宋江',3,'星期五','11:20','14:15')''')
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 9,'李逵',9,'星期三','3:30','10:30')''')
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 10,'李贵',10,'星期三','7:30','15:30')''')
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 11,'李鬼',11,'星期五','14:30','22:30')''')
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 12,'白璇',1,'星期六','7:30','11:30')''')
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 13,'李白',8,'星期日','5:30','12:30')''')
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 14,'杜甫',4,'星期二','13:30','22:30')''')
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 15,'杨笑',1,'星期四','13:00','20:30')''')
cursor.execute('''insert into Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime) values( 16,'卢浩',1,'星期三','14:30','23:30')''')
conn.commit()
conn.close()
if __name__ == '__main__':
InsertData_Administrator(CreateConnect())
InsertData_Court(CreateConnect())
InsertData_Doctor(CreateConnect())
print('[提示] 案例数据导入成功!')
3.主要代码(FunctionCode)
程序主要代码,通过表的增删改查等操作,实现医院预约。
(1) Connector.py
# -*- coding: utf-8 -*-
import os
from multiprocessing import connection
from psycopg2 import connect,OperationalError
'''连接数据库'''
def CreateConnect():
try:
env = os.environ
params = {
'database': env.get('OG_DATABASE', 'postgres'),
'user': env.get('OG_USER', 'tom'),
'password': env.get('OG_PASSWORD', 'Huawei_123'),
'host': env.get('OG_HOST', '192.168.56.130'),
'port': env.get('OG_PORT', 26000)
}
conn: connection = connect(**params)
return conn
except OperationalError:
print('数据库连接失败!')
(2) MainFunc.py
# -*- coding: utf-8 -*-
# 患者操作
import Patient_InsertAppointment
import Patient_CancelAppointment
import Patient_AlterAppointment
import Patient_QueryAppointment
# 一般管理员操作
import GeneralAdministrator_Login
import GeneralAdministrator_InsertTable
import GeneralAdministrator_ClearTable
import GeneralAdministrator_QueryTable
# 超级管理员操作
import SuperAdministrator_Login
import SuperAdministrator_OperateOnTable
# 主菜单
def ChooseOperatorMenu():
while True:
print('\n\n======== 操作菜单 ========')
print('\t\t1 --- 患者')
print('\t\t2 --- 管理员')
print('\t\t3 --- 结束')
print('========================')
flag = int(input('请输入菜单序号:'))
if flag == 1:
Patient_OperateMenu()
break
elif flag == 2:
Administrator_ChooseMenu()
break
elif flag == 3:
break
# 管理员菜单
def Administrator_ChooseMenu():
while True:
print('\n\n======== 操作菜单 ========')
print('\t\t1 --- 一般管理员')
print('\t\t2 --- 超级管理员')
print('\t\t3 --- 结束')
print('========================')
flag = int(input('请输入身份序号:'))
if flag == 1:
GeneralAdministrator_OperateMenu()
break
elif flag == 2:
SuperAdministrator_OperateMenu()
break
elif flag == 3:
break
# 超级管理员菜单
def SuperAdministrator_OperateMenu():
# [唯一账号] SuperAdministrator
# [唯一密码] SuperAdm_123
flag = True
while True:
a = SuperAdministrator_Login.Login()
if a == 'again':
continue
if a == 'allow':
print('[提示]登录成功!')
break
if a == 'over':
flag = False
break
while flag:
print('\n\n========= 操作菜单 =========')
print('\t1 --- 查询 一般管理员信息表')
print('\t2 --- 添加 一般管理员')
print('\t3 --- 删除 一般管理员')
print('\t4 --- 修改 一般管理员')
print('\t5 --- 清空 患者预约备份表')
print('\t6 --- 结束')
print('==========================')
flag = int(input('请输入操作序号:'))
if flag == 1:
SuperAdministrator_OperateOnTable.selectDate_GeneralAdministrator()
elif flag == 2:
SuperAdministrator_OperateOnTable.InsertData_GeneralAdministrator()
elif flag == 3:
SuperAdministrator_OperateOnTable.DeleteData_GeneralAdministrator()
elif flag == 4:
SuperAdministrator_OperateOnTable.UpdateData_GeneralAdministrator()
elif flag == 5:
SuperAdministrator_OperateOnTable.ClearData_AppointmentRecord()
elif flag == 6:
break
# 一般管理员菜单
def GeneralAdministrator_OperateMenu():
# [账号1] adm1
# [密码1] adm1@1234567
flag = True
while True:
a = GeneralAdministrator_Login.Login()
if a == 'again':
continue
if a == 'allow':
print('[提示]登录成功!')
break
if a == 'over':
flag = False
break
while flag:
print('\n\n========= 操作菜单 =========')
print('\t1 --- 查询 医院科室表')
print('\t2 --- 查询 医生信息表')
print('\t3 --- 查询 患者预约备份表')
print('\t4 --- 添加 医院科室表')
print('\t5 --- 添加 医生信息表')
print('\t6 --- 清除 所有预约信息')
print('\t7 --- 结束')
print('==========================')
flag = int(input('请输入操作序号:'))
if flag == 1:
GeneralAdministrator_QueryTable.ShowCourtMenu()
elif flag == 2:
GeneralAdministrator_QueryTable.ShowDoctorMenu()
elif flag == 3:
GeneralAdministrator_QueryTable.ShowAppointmentMenu()
elif flag == 4:
GeneralAdministrator_InsertTable.InsertCourt()
elif flag == 5:
GeneralAdministrator_InsertTable.InsertDoctor()
elif flag == 6:
GeneralAdministrator_ClearTable.ClearData()
elif flag == 7:
break
# 患者菜单
def Patient_OperateMenu():
while True:
print('\n\n========= 操作菜单 =========')
print('\t\t1 --- 开始预约')
print('\t\t2 --- 取消预约')
print('\t\t3 --- 修改预约')
print('\t\t4 --- 查询预约')
print('\t\t5 --- 结束预约')
print('==========================')
flag = int(input('请输入操作序号:'))
if flag == 1:
Patient_InsertAppointment.MainFunc()
elif flag == 2:
Patient_CancelAppointment.MainFunc()
elif flag == 3:
Patient_AlterAppointment.MainFunc()
elif flag == 4:
Patient_QueryAppointment.MainFunc()
elif flag == 5:
break
if __name__ == '__main__':
ChooseOperatorMenu()
(3) Patient_InsertAppointment.py
# -*- coding: utf-8 -*-
import GeneralAdministrator_QueryTable
import Connector
def MainFunc():
"""
开始预约
:return: 预约患者的预约编号
"""
while True:
# 展示科系表
GeneralAdministrator_QueryTable.ShowCourtMenu()
# 输入患者信息
p_name = input('请输入您的姓名:')
p_courtID = int(input('请输入您选择的科室序号:'))
while p_courtID > 11 or p_courtID < 0:
p_courtID = int(input('[提示]输入错误!请重新输入科室序号:'))
# 导入患者信息
p_id = InsertData_Patient(p_name)
# 展示医生信息
DoctorList = Show_AlternativeDoctor(p_courtID)
# 选择医生
d_id = int(input('请输入您选择的医生的序号:'))
while d_id not in DoctorList:
d_id = int(input('[提示]输入错误!请重新输入您选择的医生的序号:'))
# 导入预约表
InsertData_Appointment(p_name, d_id, p_id)
print('[提示]预约成功,您的预约单号为: ' + str(p_id))
p_id += 1
# 判断
if int(input('继续请输入1,退出请输入0:')) == 0:
break
def Show_AlternativeDoctor(p_courtID):
"""
展示可供预约的医生以及上班时间
:param p_courtID: 患者预约的科室号
:return: 展示可供预约医生的列表
"""
# 数据库查询
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute('''select d_id,c_name,d_name, d_workWeekData,d_workStartTime,d_workEndTime
from Doctor,Court
where Court.c_id = Doctor.dp_courtID
and Doctor.dp_courtID = %s'''
% (str(p_courtID)))
Doctor_rows = cursor.fetchall()
conn.commit()
conn.close()
# 展示可供选择的医生以及上班时间
DoctorList = []
print("[提示]您可选择的医生及上班时间如下:")
print('\t\t------- 医生表 -------')
for row in Doctor_rows:
print(row[0], ' 科室:', row[1], ' 姓名:', row[2], ' 时间:', row[3], row[4], '到', row[5])
DoctorList.append(row[0])
print('\t\t----------------------')
return DoctorList
def InsertData_Patient(p_name):
"""
导入患者信息
:param p_name: 患者姓名
:return:
"""
sql = '''select MAX(p_id) from Patient'''
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql)
ID = cursor.fetchone()
if ID == (None,):
p_id = 0
else:
p_id = int(''.join(ID)) + 1
sql = "insert into Patient(p_id,p_name) values(%s,%s)"
cursor.execute(sql, (str(p_id), str(p_name)))
conn.commit()
conn.close()
return p_id
def InsertData_Appointment(p_name, d_id, p_id):
"""
导入患者预约信息
:param p_name: 患者姓名
:param p_id: 患者ID
:param d_id: 医生ID
:return:
"""
sql = "insert into Appointment(p_id,p_name,d_id) values(%s,%s,%s)"
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql, (p_id, str(p_name), str(d_id)))
conn.commit()
conn.close()
(4) Patient_AlterAppointment.py
# -*- coding: utf-8 -*-
import Connector
import Patient_InsertAppointment
def MainFunc():
"""
修改预约信息
:return:
"""
while True:
p_id = str(input('请输入您的预约单号:'))
# 查询患者预约的医生
Judge = Show_ReservedDoctor(p_id)
# 替换医生
if Judge:
d_id = int(input('请选择你希望替换的医生的序号:'))
UpdateData_Appointment(d_id, p_id)
# 操作备份
pass
if int(input('继续请输入1,退出请输入0:')) == 0:
break
def Show_ReservedDoctor(p_id):
"""
展示已经预约的医生
:param p_id:
:return: 布尔型
"""
sql = '''select Doctor.d_name
from Doctor,Appointment
where Appointment.d_id = Doctor.d_id
and Appointment.p_id = %s''' % p_id
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql)
ExistingDoctor_rows = cursor.fetchall()
if not ExistingDoctor_rows:
print('[提示]您不在预约表中!')
conn.commit()
conn.close()
return False
else:
print('[查询结果]您已预约的医生:', ''.join(ExistingDoctor_rows[0]))
sql = '''select Doctor.dp_courtID
from Doctor,Appointment
where Appointment.d_id = Doctor.d_id
and Appointment.p_id = %s''' % p_id
cursor.execute(sql)
DoctorCourtID_rows = cursor.fetchall()
Patient_InsertAppointment.Show_AlternativeDoctor(DoctorCourtID_rows[0][0])
conn.commit()
conn.close()
return True
def UpdateData_Appointment(d_id, p_id):
"""
数据库中修改信息
:param d_id: 医生ID
:param p_id: 患者ID
:return:
"""
sql = '''update Appointment
set d_id = %s
where p_id = %s'''
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql, (str(d_id), str(p_id)))
print("[提示]修改成功!")
conn.commit()
conn.close()
(5) Patient_CancelAppointment.py
# -*- coding: utf-8 -*-
import Connector
def MainFunc():
while True:
p_id = str(input('请输入您的预约单号:'))
DeleteData_Appointment(p_id)
if int(input('继续请输入1,退出请输入0:')) == 0:
break
def DeleteData_Appointment(p_id):
"""
数据库中取消预约
:param p_id: 患者ID
:return:
"""
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute("select * from Appointment where p_id = '%s'" % p_id)
if not cursor.fetchall():
print('[提示]您不在预约表中!')
else:
cursor.execute("delete from Appointment where p_id = '%s'" % p_id)
cursor.execute("delete from Patient where p_id = '%s' " % p_id)
print('[提示]预约取消成功!')
conn.commit()
conn.close()
(6) Patient_QueryAppointment.py
# -*- coding: utf-8 -*-
import Connector
def MainFunc():
while True:
p_id = str(input('请输入您的预约单号:'))
sql = '''select Doctor.d_name
from Doctor,Appointment,Patient
where Appointment.d_id = Doctor.d_id
and Patient.p_id = Appointment.p_id
and Patient.p_id = %s''' % p_id
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql)
ExistingDoctor_rows = cursor.fetchall()
conn.commit()
conn.close()
if not ExistingDoctor_rows:
print('[提示]您不在预约表中!')
return False
else:
print('[查询结果]您已预约的医生: ', ''.join(ExistingDoctor_rows[0]))
if int(input('继续请输入1,退出请输入0:')) == 0:
break
(7) GeneralAdministrator_Login.py
# -*- coding: utf-8 -*-
import time
import Connector
def Login():
Login_Account = str(input('\t[账号] '))
Login_ID = str(input('\t[密码] '))
Login_Num = int(3)
Login_flag = FindLoginMessage(Login_ID, Login_Account)
while Login_flag is False and Login_Num > 0:
flag = int(input('[提示]输入错误\n[提示]是否继续? 继续请输入1,否则输入0: '))
if flag == 0:
return 'over'
Login_Account = str(input('\t[账号] '))
Login_ID = str(input('\t[密码] '))
Login_flag = FindLoginMessage(Login_ID, Login_Account)
Login_Num = Login_Num - 1
if Login_flag is True:
return 'allow'
if Login_Num == 0:
flag = int(input('[提示]是否继续?继续请输入1,否则输入0:'))
if flag == 0:
return 'over'
else:
Login_Account = str(input('\t[账号] '))
Login_ID = str(input('\t[密码] '))
if FindLoginMessage(Login_ID, Login_Account) is False:
print('[提示]输入错误次数过多,请5分钟后再尝试:')
time.sleep(30)
return 'again'
else:
return 'allow'
def FindLoginMessage(Login_ID, Login_Account):
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute("select * from Administrator where a_id = %s and a_name = %s", (Login_ID, Login_Account,))
if not cursor.fetchall():
flag = False
else:
flag = True
conn.commit()
conn.close()
return flag
(8) GeneralAdministrator_InsertTable.py
# -*- coding: utf-8 -*-
import Connector
# 添加 医院科室表
def InsertCourt():
while True:
c_name = str(input('请输入新的科室名称:'))
c_id = getCourtNum()
InsertData_Court(c_id, c_name)
if int(input('继续请输入1,退出请输入0:')) == 0:
break
def getCourtNum():
sql = '''select MAX(c_id) from Court'''
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql)
ID = cursor.fetchone()
if ID == (None,):
c_id = 0
else:
c_id = ID[0] + 1
conn.commit()
conn.close()
return c_id
def InsertData_Court(c_id, c_name):
sql = '''insert into Court(c_id,c_name) values(%s,%s)'''
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql, (str(c_id), str(c_name)))
print("\n[提示]修改成功!")
conn.commit()
conn.close()
# 添加 医生信息表
def InsertDoctor():
while True:
d_name = int(input('请输入新的医生姓名:'))
d_id = getDoctorNum()
court_list = getCourtNum()
dp_courtID = int(input('请输入新的医生的科室号:'))
while dp_courtID not in court_list:
dp_courtID = int(input('\n[提示]输入科室号错误,重新输入!'))
d_workWeekData = str(input('请输入新的医生的上班日期:'))
d_workStartTime = str(input('请输入新的医生的开始上班时间:'))
d_workEndTime = str(input('请输入新的医生的结束上班时间:'))
InsertData_Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime)
if int(input('继续请输入1,退出请输入0:')) == 0:
break
def getCourtID():
court_list = []
sql = '''select c_id from Court'''
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql)
ID = cursor.fetchall()
for row in ID:
court_list.append(row[0])
conn.commit()
conn.close()
return court_list
def getDoctorNum():
sql = '''select MAX(d_id) from Doctor'''
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql)
ID = cursor.fetchone()
if ID == (None,):
d_id = 0
else:
d_id = int(''.join(ID)) + 1
conn.commit()
conn.close()
return d_id
def InsertData_Doctor(d_id, d_name, dp_courtID, d_workWeekData, d_workStartTime, d_workEndTime):
sql = '''insert into Doctor(d_id,d_name,dp_courtID,d_workWeekData,d_workStartTime,d_workEndTime) values(%s,%s,%s,%s,%s,%s)'''
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql, (
str(d_id), str(d_name), str(dp_courtID), str(d_workWeekData), str(d_workStartTime), str(d_workEndTime)))
print("[提示]修改成功!")
conn.commit()
conn.close()
(9) GeneralAdministrator_QueryTable.py
# -*- coding: utf-8 -*-
import Connector
def ShowCourtMenu():
sql = '''select c_name from Court order by c_id asc'''
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql)
Court_rows = cursor.fetchall()
conn.commit()
conn.close()
print('\t\t------- 科系表 -------')
i = 1
for row in Court_rows:
print('\t\t\t' + str(i) + ' --- ' + ''.join(row))
i += 1
print('\t\t---------------------')
def ShowDoctorMenu():
sql = '''select d_id,c_name,d_name, d_workWeekData,d_workStartTime,d_workEndTime
from Doctor,Court
where Court.c_id = Doctor.dp_courtID
order by d_id asc'''
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql)
Doctor_rows = cursor.fetchall()
conn.commit()
conn.close()
print('\t\t------- 医生信息表 -------')
for row in Doctor_rows:
print(row[0], ' 科室:', row[1], ' \t姓名:', row[2], '\t时间:', row[3], row[4], '到', row[5])
print('\t\t--------------------------')
def ShowAppointmentMenu():
sql = '''select p_id,p_name,d_id, date_time,type
from AppointmentRecord
order by date_time asc'''
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql)
Doctor_rows = cursor.fetchall()
conn.commit()
conn.close()
print('\t\t------- 预约记录表 -------')
for row in Doctor_rows:
print('操作类型:', row[4], ' 操作时间:', row[3], ' 患者预约号:', row[0], ' 患者姓名:', row[1], '\t医生编号:', row[2])
print('\t\t--------------------------')
(10) GeneralAdministrator_ClearTable.py
# -*- coding: utf-8 -*-
import Connector
def ClearData():
"""
清除预约表和患者信息表
:return: 空
"""
ClearData_Appointment()
ClearData_Patient()
print('[提示]清除成功!')
def ClearData_Appointment():
"""
清除预约表
:return: 空
"""
sql = '''truncate table Appointment'''
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()
conn.close()
def ClearData_Patient():
"""
清除患者信息表
:return: 空
"""
sql = '''truncate table Patient'''
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()
conn.close()
(11) SuperAdministrator_Login.py
# -*- coding: utf-8 -*-
import time
def Login():
Login_Account = str(input('\t[账号] '))
Login_ID = str(input('\t[密码] '))
Login_Num = int(3)
Login_flag = FindLoginMessage(Login_ID, Login_Account)
while Login_flag == False and Login_Num > 0:
flag = int(input('[提示]输入错误\n[提示]是否继续? 继续请输入1,否则输入0: '))
if flag == 0:
return 'over'
Login_Account = str(input('\t[账号] '))
Login_ID = str(input('\t[密码] '))
Login_flag = FindLoginMessage(Login_ID, Login_Account)
Login_Num = Login_Num - 1
if Login_flag:
return 'allow'
if Login_Num == 0:
flag = int(input('[提示]是否继续?继续请输入1,否则输入0:'))
if flag == 0:
return 'over'
else:
Login_Account = str(input('\t[账号] '))
Login_ID = str(input('\t[密码] '))
if not FindLoginMessage(Login_ID, Login_Account):
print('[提示]输入错误次数过多,请5分钟后再尝试:')
time.sleep(30)
return 'again'
else:
return 'allow'
def FindLoginMessage(Login_ID, Login_Account):
if Login_Account != 'SuperAdministrator' and Login_ID != 'SuperAdm_123':
flag = False
else:
flag = True
return flag
(12) SuperAdministrator_OperateOnTable.py
# -*- coding: utf-8 -*-
import Connector
def selectDate_GeneralAdministrator():
sql = '''select a_id,a_name from Administrator'''
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql)
Administrator_rows = cursor.fetchall()
conn.commit()
conn.close()
print('\t------- 管理员信息表 -------')
for row in Administrator_rows:
print('账号:', row[1], ' 密码:', row[0])
print('\t--------------------------')
def InsertData_GeneralAdministrator():
Adm_Account = str(input('\t待添加的账号:'))
Adm_ID = str(input('\t待添加的密码:'))
conn = Connector.CreateConnect()
cursor = conn.cursor()
while True:
cursor.execute("select * from Administrator where a_name = '%s' " % Adm_Account)
if not cursor.fetchall():
break
else:
Adm_Account = str(input('[提示]已经存在该账户,请重新输入:'))
sql = '''insert into Administrator(a_id,a_name) values(%s,%s)'''
cursor.execute(sql, (str(Adm_ID), str(Adm_Account)))
print("[提示]添加成功!")
conn.commit()
conn.close()
def UpdateData_GeneralAdministrator():
conn = Connector.CreateConnect()
cursor = conn.cursor()
flag = int(input('修改账户请输入1,修改密码请输入2,否则按任意键退出:'))
if flag == 1:
Adm_OldAccount = str(input('\t旧账号:'))
Adm_NewAccount = str(input('\t新账号:'))
while True:
cursor.execute("select * from Administrator where a_name = '%s' " % Adm_OldAccount)
if not cursor.fetchall():
str(input('[提示]该账户不存在,请重新输入账号:'))
else:
break
sql = '''update Appointment set a_name = %s where a_name = %s'''
cursor.execute(sql, (str(Adm_NewAccount), str(Adm_OldAccount)))
print("[提示]修改成功!")
conn.commit()
conn.close()
elif flag == 2:
Adm_Account = str(input('\t账号:'))
Adm_NewID = str(input('\t新密码:'))
while True:
cursor.execute("select * from Administrator where a_name = '%s' " % Adm_Account)
if not cursor.fetchall():
Adm_Account = str(input('[提示]该账户不存在,请重新输入账号:'))
else:
break
sql = '''update Appointment set a_id = %s where a_name = %s'''
cursor.execute(sql, (str(Adm_NewID), str(Adm_Account)))
print("[提示]修改成功!")
conn.commit()
conn.close()
else:
return False
def DeleteData_GeneralAdministrator():
Adm_Account = str(input('\t待删除的账号:'))
str(input('\t待删除的密码:'))
conn = Connector.CreateConnect()
cursor = conn.cursor()
while True:
cursor.execute("select * from Administrator where a_name = '%s' " % Adm_Account)
if not cursor.fetchall():
Adm_Account = str(input('[提示]该账户不存在,请重新输入账号:'))
else:
break
cursor.execute("delete from Administrator where a_name = '%s'" % Adm_Account)
print('[提示]删除成功!')
conn.commit()
conn.close()
def ClearData_AppointmentRecord():
sql = '''truncate table appointmentrecord'''
conn = Connector.CreateConnect()
cursor = conn.cursor()
cursor.execute(sql)
print('[提示]清理成功!')
conn.commit()
conn.close()
四、结果展示
1、患者预约界面
预约以及查询
修改
取消
2、一般管理员控制界面
登录一般管理员:
查询医院科系:
查询医生信息:
查询患者预约操作记录表:
添加医院科系:
清理所有预约信息(包括预约表以及患者信息表):
3、超级管理员控制界面
登录超级管理员:
查询 一般管理员信息表:
添加 一般管理员:
删除 一般管理员:
清空 患者预约备份表:
结束:
五、总结反思
这次作业通过 python 实现了医院预约的基本操作,但是还存在相当多不合理的地方,比如
医生上班时间预约的人数没有设置上限;
医院预约的患者没有设置会诊的时间,导致会诊时间混乱;
患者、一般管理员、超级管理员的登录应当设置不同的数据库角色,通过 sql 语言实现超级管理员为一般管理员和患者的授权,而不是通过 python 语言实现;
该作业全部为后端编写,通过控制台实现,没有GUI图形化界面或者HTML网页;
等。
因为时间仓促,加上一系列的考试,这次作业实现的并不理想。等到以后学习了前端开发,时间允许,我会再改进作业,以达到更好的效果。