前言(有不懂的欢迎私信)
全部功能:实时监控、录像回放、签到、专注度实时监测、姿态实时监测
实时监控:通过摄像头捕捉课堂情况并根据时间按照日期和节数保存
录像回放:给定日期和节数两个参数,回放之前保存的监控视频
签到:开始运行系统时捕捉N帧图像送入模型进行人脸识别,统计所到人数及同学姓名,存入数据库中以备查看
专注度实时监测:每隔一段时间捕捉当前帧画面送入模型检测,统计检测到的人脸数,除以签到总人数计算专注度并将数据存入数据库中以备查看
姿态实时监测:每隔一段时间捕捉当前帧画面送入模型检测,统计检测到的(睡觉、玩手机、使用电脑、抬头听讲)四种状态的人数计算比例存入数据库以备查看
一、需求文档
二、数据库设计(mysql)
三、功能代码
直接运行的条件:1、安装好依赖库(models 模块见第 6 点);2、按照上面的数据库设计建好四张表;3、准备好训练好的人脸识别和姿态检测模型(可以私信我获取已训练好的模型,也可以自己训练);4、准备本地人脸信息库(用一个名为 image 的文件夹存放人脸图片即可,图片文件名即学生姓名);5、修改代码中图片及视频的存储路径,并创建好对应的文件夹;6、下载调用模型所需的依赖代码(代码文件较多,不方便放在这里)。
import time
import cv2
import os
import datetime
import face_recognition
import pymysql
import numpy as np
from multiprocessing import Process,Queue
import models
# Shared module-level state, updated by the functions below via ``global``.
IMAGE = None  # latest frame sampled by realtimemonitor for analysis
newest_Attendance=None  # head count from the latest sign-in
Attendance_list=None  # names marked present at the latest sign-in
Absence_list=None  # known names not seen at the latest sign-in
Concentration=[]  # focus ratio history, one entry per analysed frame
Concentration_newest=None  # most recent focus ratio
Attitude_rate=[]  # pose history; each entry holds four per-pose proportions
Attitude_rate_newest=None  # most recent pose proportions
def courseid_for_hour(hour):
    """Map an hour of the day (0-23) to the lesson slot used as ``courseid``.

    Hours up to and including 10 are slot 1, 11-12 are slot 2, 14-15 are
    slot 3 and 16-17 are slot 4. Every other hour (13, and 18 onwards)
    falls into slot 5.
    """
    if hour <= 10:
        return 1
    if hour <= 12:
        return 2
    if 14 <= hour < 16:
        return 3
    if 16 <= hour < 18:
        return 4
    return 5


# Wall-clock time when the module is loaded; H, M and S stay zero-padded
# strings, and the hour alone decides which lesson slot is being recorded.
time_str = datetime.datetime.now().strftime("%H:%M:%S")
H, M, S = time_str.split(":")
courseid = courseid_for_hour(int(H))
# Build the known-face roster from the "image" folder: one picture per
# student, with the file name (minus its extension) used as the student name.
img_name = []
img_name_name = []
tupianliebiao = []
for picture_file in sorted(os.listdir("image")):
    picture = face_recognition.load_image_file(os.path.join("image", picture_file))
    picture_encodings = face_recognition.face_encodings(picture)
    if not picture_encodings:
        # No face found: skip the file so that names and encodings stay
        # index-aligned instead of crashing on an empty result list.
        print("未在图片中检测到人脸,已跳过: " + str(picture_file))
        continue
    img_name.append(picture_file)
    img_name_name.append(os.path.splitext(picture_file)[0])
    # Only the first face found in each picture is used.
    tupianliebiao.append(picture_encodings[0])
# Parallel lists: known_face_encodings[k] belongs to known_face_names[k].
known_face_encodings = tupianliebiao
known_face_names = img_name_name
Total_number_of_people = len(known_face_names)
# Register this class (id 0, name "aiclass", roster size, picture folder)
# in the ``classinfo`` table.
db = pymysql.connect(host='localhost',
                     user='root',
                     password='123',
                     db='aieye')
cursor = db.cursor()
data = (0, "aiclass", Total_number_of_people, "image")
try:
    cursor.execute("insert into classinfo values(%s,%s,%s,%s)", data)
    db.commit()
except pymysql.MySQLError as err:
    # Best effort: a failed insert must not stop the program, but the
    # reason is reported instead of being silently discarded.
    db.rollback()
    print("classinfo 写入失败: " + str(err))
finally:
    db.close()
def realtimemonitor(x, num, newest_Attendance_Queue):
    """Run sign-in, then display, record and periodically sample the video.

    Args:
        x: Queue that receives a copy of the current frame roughly every
            10 seconds, for focus and pose analysis by the consumer.
        num: Number of sign-in frames to grab, one per second.
        newest_Attendance_Queue: Queue that receives the sign-in head count
            (0 when not a single sign-in frame could be read).
    """
    global IMAGE, courseid
    codec = cv2.VideoWriter_fourcc(*'MJPG')
    frame_size = (640, 480)  # (width, height) of the recorded video
    # Recording is saved as videotape/<YYYYMMDD><courseid>.avi
    save_path = os.path.join(
        "videotape",
        datetime.datetime.now().strftime("%Y%m%d") + str(courseid) + '.avi')
    out = cv2.VideoWriter(save_path, codec, 20, frame_size)
    # Raw string: the backslashes in the Windows path are literal.
    cap = cv2.VideoCapture(r"E:\Desktop\WIN_20210601_08_20_59_Pro.mp4")
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Some sources report 0 fps; fall back to the 20 fps writer rate.
        # At least 1 ms, because waitKey(0) would block indefinitely.
        delay_ms = max(1, int(1000 // fps)) if fps and fps > 0 else 50

        # Sign-in: collect up to ``num`` frames, one second apart.
        sign_in_frames = []
        for _ in range(num):
            success, frame = cap.read()
            if success:
                sign_in_frames.append(frame)
            time.sleep(1)
        if sign_in_frames:
            classattendence(sign_in_frames)
            newest_Attendance_Queue.put(newest_Attendance)
        else:
            # Nothing readable: still answer the waiting consumer.
            print("未能从视频源读取到签到画面")
            newest_Attendance_Queue.put(0)

        # Monitoring: show and record every frame, sample one every 10 s.
        success, frame = cap.read()
        last_sample = time.time()
        while success:
            cv2.namedWindow("Wmain", 0)
            cv2.imshow("Wmain", frame)
            # The writer was opened for ``frame_size``; other sizes are
            # resized first so the recording is not left empty.
            if (frame.shape[1], frame.shape[0]) != frame_size:
                out.write(cv2.resize(frame, frame_size))
            else:
                out.write(frame)
            now = time.time()
            if now - last_sample >= 10:
                IMAGE = frame.copy()
                x.put(IMAGE)
                last_sample = now
            if cv2.waitKey(delay_ms) == ord("q"):
                break
            success, frame = cap.read()
    finally:
        # Release everything, including the writer so the .avi is finalised.
        cv2.destroyAllWindows()
        cap.release()
        out.release()
def repalyvideo(time, courseid=0):
    """Play back a recorded lesson until it ends or 'q' is pressed.

    Args:
        time: Recording date in ``YYYYMMDD`` form (int or str), for example
            ``20211022``.
        courseid: Lesson slot number; together with the date it forms the
            file name ``videotape/<time><courseid>.avi``.
    """
    video_path = os.path.join("videotape", str(time) + str(courseid) + ".avi")
    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        # At least 1 ms, because waitKey(0) would block indefinitely.
        delay_ms = max(1, int(1000 // fps)) if fps and fps > 0 else 50
        # Read once only, so that playback starts at the very first frame.
        success, frame = cap.read()
        while success:
            cv2.namedWindow("Wmain", 0)
            cv2.imshow("Wmain", frame)
            if cv2.waitKey(delay_ms) == ord("q"):
                break
            success, frame = cap.read()
    finally:
        cv2.destroyAllWindows()
        cap.release()
def main(num=1):
    """Start the monitor process and analyse every frame it samples.

    Args:
        num: Number of sign-in frames the monitor process captures before
            monitoring starts.
    """
    global newest_Attendance
    import queue  # local import: only needed for the queue.Empty exception

    x = Queue()  # sampled frames coming from the monitor process
    newest_Attendance_Queue = Queue()  # sign-in head count from the monitor
    p = Process(target=realtimemonitor, args=(x, num, newest_Attendance_Queue))
    p.start()
    # Blocks until the monitor process has finished sign-in.
    newest_Attendance = newest_Attendance_Queue.get()
    if not newest_Attendance:
        print("没有检测到同学,请调整摄像头位置")
    while newest_Attendance:
        try:
            # Wait with a timeout instead of spinning on qsize(), which
            # burns a CPU core and is not implemented on every platform.
            frame = x.get(timeout=1)
        except queue.Empty:
            if not p.is_alive():
                # The monitor has stopped, so no more frames will arrive.
                break
            continue
        focusestimate(frame)
        poseestimate(frame)
        print()
def peoplecount():
    """Return the module-level ``newest_Attendance`` head count."""
    # Read-only access to the module variable needs no ``global`` statement.
    return newest_Attendance
def _focus_detect_names(frame):
    """Return one name per face found in *frame* ("Unknown" if unmatched)."""
    # OpenCV frames are BGR while face_recognition works on RGB. cvtColor
    # returns a fresh contiguous array, unlike a reversed-channel slice view.
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    locations = face_recognition.face_locations(rgb_frame)
    names = []
    for encoding in face_recognition.face_encodings(rgb_frame, locations):
        name = "Unknown"
        if len(known_face_encodings):  # argmin fails on an empty roster
            matches = face_recognition.compare_faces(known_face_encodings, encoding)
            distances = face_recognition.face_distance(known_face_encodings, encoding)
            # Take the closest known face, but only if it is a match.
            best_match_index = int(np.argmin(distances))
            if matches[best_match_index]:
                name = known_face_names[best_match_index]
        names.append(name)
    return names


def focusestimate(image):
    """Estimate class focus from one frame and store it in ``focusrecord``.

    Focus is the number of faces detected in the frame divided by the
    sign-in head count ``newest_Attendance``; it is 0.0 when that count is
    missing or zero. The value is appended to ``Concentration`` and mirrored
    in ``Concentration_newest``. The frame itself is not modified.

    Args:
        image: BGR frame as produced by OpenCV.
    """
    global Concentration, Concentration_newest
    face_names = _focus_detect_names(image)
    if newest_Attendance:
        ratio = len(face_names) / newest_Attendance
    else:
        ratio = 0.0
    Concentration.append(ratio)
    Concentration_newest = ratio
    print("专注度:")
    print(Concentration_newest)

    now = datetime.datetime.now()
    db = pymysql.connect(host='localhost',
                         user='root',
                         password='123',
                         db='aieye')
    try:
        cursor = db.cursor()
        # The whole history is stored as str(list) in both branches, so a
        # reader always gets the same format back.
        if len(Concentration) == 1:
            # First sample in this process: create the row.
            cursor.execute("insert into focusrecord values(%s,%s,%s,%s)",
                           (now, courseid, 0, str(Concentration)))
        else:
            # Later samples overwrite the stored history of today's lesson.
            cursor.execute(
                "UPDATE focusrecord SET focusvalue=%s WHERE coursetime = %s AND courseid = %s",
                (str(Concentration), str(now).split(' ')[0], courseid))
        db.commit()
    except pymysql.MySQLError as err:
        db.rollback()
        print("专注度记录写入数据库失败: " + str(err))
    finally:
        db.close()
def getfocusrecord(time, courseid=0):
    """Return the live focus value or a stored focus record.

    Args:
        time: ``0`` for the live value, otherwise a date in ``YYYYMMDD``
            form (int or str).
        courseid: Lesson slot number of the stored record.

    Returns:
        For ``time == 0`` the string ``"专注度:" + str(Concentration_newest)``.
        Otherwise the last column of the last matching ``focusrecord`` row,
        or ``None`` when no row matches.
    """
    if time == 0:
        return "专注度:" + str(Concentration_newest)
    day = str(time)
    coursetime = day[0:4] + "-" + day[4:6] + "-" + day[6:8]
    db = pymysql.connect(host='localhost',
                         user='root',
                         password='123',
                         db='aieye')
    try:
        cursor = db.cursor()
        # Bound parameters instead of string formatting.
        cursor.execute(
            "SELECT * from focusrecord WHERE courseid = %s AND coursetime = %s",
            (courseid, coursetime))
        data = cursor.fetchall()
    finally:
        db.close()
    if not data:
        return None
    return data[-1][-1]
def poseestimate(image):
    """Classify the poses in one frame and store the ratios in ``poserecord``.

    Detections scoring at least 0.1 are counted per pose, and each count is
    divided by the sign-in head count ``newest_Attendance`` (0.0 when that
    count is missing or zero). The four ratios, ordered study, practice,
    sleep, usephone, are appended to ``Attitude_rate`` and mirrored in
    ``Attitude_rate_newest``.

    Args:
        image: Frame passed unchanged to ``models.Attitude_detection``.
    """
    global Attitude_rate, Attitude_rate_newest
    counts = {"study": 0, "practice": 0, "sleep": 0, "usephone": 0}
    jieguo = models.Attitude_detection(image)
    print("姿态:")
    print(jieguo)
    # Each detection is indexed as (label, score, ...).
    for detection in jieguo:
        label, score = detection[0], detection[1]
        # Labels outside the four tracked poses are ignored.
        if score >= 0.1 and label in counts:
            counts[label] += 1
    if newest_Attendance:
        proportion = [count / newest_Attendance for count in counts.values()]
    else:
        proportion = [0.0] * len(counts)
    Attitude_rate.append(proportion)
    Attitude_rate_newest = proportion

    # One history string per pose, e.g. "[0.5, 0.6]" for all study ratios.
    study, practice, sleep, usephone = (
        str(list(column)) for column in zip(*Attitude_rate))
    now = datetime.datetime.now()
    db = pymysql.connect(host='localhost',
                         user='root',
                         password='123',
                         db='aieye')
    try:
        cursor = db.cursor()
        # Histories are stored as str(list) in both branches, so a reader
        # always gets the same format back.
        if len(Attitude_rate) == 1:
            # First sample in this process: create the row.
            cursor.execute(
                "insert into poserecord values(%s,%s,%s,%s,%s,%s,%s)",
                (now, courseid, 0, study, practice, sleep, usephone))
        else:
            # Later samples overwrite the stored history of today's lesson.
            cursor.execute(
                "UPDATE poserecord SET studyvalue=%s, practiveyvalue=%s ,sleepvalue=%s ,usephoneyvalue=%s WHERE coursetime = %s AND courseid = %s",
                (study, practice, sleep, usephone, str(now).split(' ')[0], courseid))
        db.commit()
    except pymysql.MySQLError as err:
        db.rollback()
        print("姿态记录写入数据库失败: " + str(err))
    finally:
        db.close()
def getposerecord(time, courseid=0):
    """Return the live pose ratios or a stored pose record.

    Args:
        time: ``0`` for the live value, otherwise a date in ``YYYYMMDD``
            form (int or str).
        courseid: Lesson slot number of the stored record.

    Returns:
        For ``time == 0`` the string ``"姿态统计:" + str(Attitude_rate_newest)``.
        Otherwise the last four columns of the last matching ``poserecord``
        row, or ``None`` when no row matches.
    """
    if time == 0:
        return "姿态统计:" + str(Attitude_rate_newest)
    day = str(time)
    coursetime = day[0:4] + "-" + day[4:6] + "-" + day[6:8]
    db = pymysql.connect(host='localhost',
                         user='root',
                         password='123',
                         db='aieye')
    try:
        cursor = db.cursor()
        # Bound parameters instead of string formatting.
        cursor.execute(
            "SELECT * from poserecord WHERE courseid = %s AND coursetime = %s",
            (courseid, coursetime))
        data = cursor.fetchall()
    finally:
        db.close()
    if not data:
        return None
    return data[-1][-4:]
def _attendance_detect_names(frame):
    """Return one name per face found in *frame* ("Unknown" if unmatched)."""
    # OpenCV frames are BGR while face_recognition works on RGB. cvtColor
    # returns a fresh contiguous array, unlike a reversed-channel slice view.
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    locations = face_recognition.face_locations(rgb_frame)
    names = []
    for encoding in face_recognition.face_encodings(rgb_frame, locations):
        name = "Unknown"
        if len(known_face_encodings):  # argmin fails on an empty roster
            matches = face_recognition.compare_faces(known_face_encodings, encoding)
            distances = face_recognition.face_distance(known_face_encodings, encoding)
            # Take the closest known face, but only if it is a match.
            best_match_index = int(np.argmin(distances))
            if matches[best_match_index]:
                name = known_face_names[best_match_index]
        names.append(name)
    return names


def classattendence(image):
    """Take attendance from the sign-in frames and store the result.

    Names recognised in any frame count as present; every other known name
    is absent. The result is printed, written to ``attendencerecord``
    (updating today's row for this lesson when one exists, inserting one
    otherwise) and published through ``newest_Attendance``,
    ``Attendance_list`` and ``Absence_list``.

    Args:
        image: Iterable of BGR frames; ``None`` entries are skipped.
    """
    global newest_Attendance, Attendance_list, Absence_list
    seen = set()
    for frame in image:
        if frame is None:
            continue
        seen.update(_attendance_detect_names(frame))
    # NOTE(review): unmatched faces show up as the single name "Unknown"
    # and count as one attendee - confirm this is intended.
    Here_we_are = sorted(seen)
    absent = [name for name in known_face_names if name not in seen]
    print("签到人数:")
    print(len(Here_we_are))
    print("已到名单:")
    print(str(Here_we_are))
    print("未到名单:")
    print(str(absent))
    print()

    now = datetime.datetime.now()
    coursetime = str(now)[0:10]  # YYYY-MM-DD part of the timestamp
    record = (now, courseid, 0, len(Here_we_are), str(Here_we_are), str(absent))
    db = pymysql.connect(host='localhost',
                         user='root',
                         password='123',
                         db='aieye')
    try:
        cursor = db.cursor()
        cursor.execute(
            "SELECT * from attendencerecord WHERE courseid = %s AND coursetime = %s",
            (courseid, coursetime))
        if cursor.fetchall():
            # A row for today's lesson already exists: overwrite it.
            cursor.execute(
                "UPDATE attendencerecord SET classid=%s, peoplecount=%s ,attendence=%s ,absent=%s WHERE coursetime = %s AND courseid = %s",
                (record[2], record[3], record[4], record[5], coursetime, courseid))
        else:
            cursor.execute(
                "insert into attendencerecord values(%s,%s,%s,%s,%s,%s)", record)
        db.commit()
    except pymysql.MySQLError as err:
        db.rollback()
        print("签到记录写入数据库失败: " + str(err))
    finally:
        db.close()

    newest_Attendance = len(Here_we_are)
    # Publish the deduplicated present list, not one frame's raw names.
    Attendance_list = Here_we_are
    Absence_list = absent
def getattendencerecord(time, classid=0):
    """Return the live sign-in result or stored attendance rows.

    Args:
        time: ``0`` for the live result, otherwise a date in ``YYYYMMDD``
            form (int or str).
        classid: Lesson slot number matched against the ``courseid``
            column. The argument is honoured instead of always falling
            back to the slot of the running session.

    Returns:
        For ``time == 0`` a 3-tuple of strings: head count, present names
        and absent names. Otherwise all matching ``attendencerecord`` rows.
    """
    if time == 0:
        return ("实到人数:" + str(newest_Attendance),
                "出席学生姓名列表:" + str(Attendance_list),
                "缺席学生姓名列表:" + str(Absence_list))
    day = str(time)
    coursetime = day[0:4] + "-" + day[4:6] + "-" + day[6:8]
    db = pymysql.connect(host='localhost',
                         user='root',
                         password='123',
                         db='aieye')
    try:
        cursor = db.cursor()
        # Bound parameters instead of string formatting.
        cursor.execute(
            "SELECT * from attendencerecord WHERE courseid = %s AND coursetime = %s",
            (classid, coursetime))
        data = cursor.fetchall()
    finally:
        db.close()
    return data
#-------实时监控--------
#realtimemonitor()
#-------录像回放--------
#repalyvideo(20211022,1)
#-------签到统计---------
#classattendence([cv2.imread("image\\zhangzhiqiang.jpg")])
#getattendencerecord(20210618,5)
#-------专注度统计-------
#focusestimate(cv2.imread("image\\yangxinzhi.jpg"))
#getfocusrecord(20210618,5)
#--------姿态统计--------
#poseestimate(cv2.imread("data\\attitude\\0001.jpg"))
#getposerecord(20210619,5)
if __name__ == "__main__":
    # Script entry point: pass 20 as ``num`` to main().
    main(num=20)
总结
人懒,此处省略;有问题欢迎在评论区留言,阿巴阿巴。。。。。。。。。。。。。