object_detection
import cv2
import sys
import numpy as np
class object_detector:
    """Darknet (YOLO-style) object detector built on OpenCV's ``cv2.dnn``.

    The constructor loads the network immediately. ``predict`` then runs a
    single image through it and returns the surviving detections as a
    JSON-serialisable dict.
    """

    def __init__(self, model, cfg, classes, inputSize=416):
        """Store the file paths and load the network.

        Args:
            model: Path to the weights file; must end with ``weights``.
            cfg: Path to the network config file; must end with ``cfg``.
            classes: Path to the label file (one class name per line); must
                end with ``names``.
            inputSize: Side length, in pixels, of the square network input.
        """
        self.model = model
        self.cfg = cfg
        self.classes = classes
        self.inputSize = inputSize
        self.framework = None
        self.load_model()

    def load_model(self):
        """Read the Darknet network and its class labels from disk.

        Terminates the process through ``sys.exit`` when the three paths do
        not carry the expected ``weights`` / ``cfg`` / ``names`` suffixes.
        """
        paths_look_valid = (
            self.model.endswith('weights')
            and self.cfg.endswith('cfg')
            and self.classes.endswith('names')
        )
        if not paths_look_valid:
            sys.exit('Wrong input for model weights and cfg')

        self.net = cv2.dnn.readNetFromDarknet(self.cfg, self.model)
        # Context manager so the label file handle is always closed.
        with open(self.classes) as label_file:
            self.LABELS = label_file.read().strip().split("\n")
        self.framework = 'Darknet'

        # Ask for CUDA acceleration. Original author's note: without GPU
        # support OpenCV falls back to CPU inference on its own.
        self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
        self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)

    @staticmethod
    def _output_layer_names(layer_names, out_layer_ids):
        """Map 1-based output-layer ids to their layer names.

        Accepts both id layouts OpenCV has used for
        ``getUnconnectedOutLayers()``: a column of shape (N, 1) and a flat
        sequence of shape (N,).
        """
        flat_ids = np.asarray(out_layer_ids).reshape(-1)
        return [layer_names[int(layer_id) - 1] for layer_id in flat_ids]

    @staticmethod
    def _decode_detections(layer_outputs, width, height, conf_threshold):
        """Convert raw output rows into pixel boxes, confidences, class ids.

        Each row is read as ``[cx, cy, w, h, <unused>, score_0, score_1, ...]``
        with the box normalised to the image size. Rows whose best class
        score is not above ``conf_threshold`` are dropped.

        Returns:
            ``(boxes, confidences, class_ids)`` where each box is
            ``[left, top, width, height]`` in integer pixels.
        """
        boxes = []
        confidences = []
        class_ids = []
        # Loop-invariant scale that maps normalised boxes to pixels.
        scale = np.array([width, height, width, height])
        for output in layer_outputs:
            for detection in np.asarray(output):
                scores = detection[5:]
                class_id = int(np.argmax(scores))
                confidence = scores[class_id]
                if confidence > conf_threshold:
                    box = (detection[0:4] * scale).astype("int")
                    (center_x, center_y, box_w, box_h) = box
                    # The network reports the box centre; convert it to the
                    # top-left corner.
                    left = int(center_x - (box_w / 2))
                    top = int(center_y - (box_h / 2))
                    boxes.append([left, top, int(box_w), int(box_h)])
                    confidences.append(float(confidence))
                    class_ids.append(class_id)
        return boxes, confidences, class_ids

    def predict(self, srcImg, conf_threshold=0.02, score_threshold=0.4,
                nms_threshold=0.3):
        """Detect objects in one image.

        Args:
            srcImg: Image array whose first two ``shape`` entries are height
                and width. Presumably BGR as produced by ``cv2.imdecode``
                (the blob is built with ``swapRB=True``) - confirm with the
                caller.
            conf_threshold: Minimum best-class score for a raw detection to
                become an NMS candidate.
            score_threshold: Score threshold passed to ``cv2.dnn.NMSBoxes``.
            nms_threshold: Overlap threshold passed to ``cv2.dnn.NMSBoxes``.

        Returns:
            ``{"code": 200, "data": [...]}`` where every entry of ``data``
            has a ``"location"`` dict (``score``, ``left``, ``top``,
            ``width``, ``height``) and a ``"result"`` dict (``score``,
            ``label``).

        Raises:
            ValueError: If ``srcImg`` is ``None``.
            RuntimeError: If no Darknet model has been loaded.
        """
        if srcImg is None:
            raise ValueError("srcImg is None; the image could not be decoded")
        if self.framework != 'Darknet':
            # Previously this only printed and then failed on an undefined
            # blob; fail with an explicit error instead.
            raise RuntimeError("Not a Darknet models")

        (H, W) = srcImg.shape[:2]

        # Create a 4D blob from the source image.
        blob = cv2.dnn.blobFromImage(
            srcImg,
            1 / 255.0,
            (self.inputSize, self.inputSize),
            swapRB=True,
            crop=False,
        )
        output_names = self._output_layer_names(
            self.net.getLayerNames(),
            self.net.getUnconnectedOutLayers(),
        )

        # Run the model.
        self.net.setInput(blob)
        layer_outputs = self.net.forward(output_names)

        boxes, confidences, class_ids = self._decode_detections(
            layer_outputs, W, H, conf_threshold
        )

        # Non-maximum suppression over the candidate boxes.
        kept = cv2.dnn.NMSBoxes(
            boxes,
            confidences,
            score_threshold=score_threshold,
            nms_threshold=nms_threshold,
        )

        detections = []
        # NMSBoxes may return an empty tuple, an (N, 1) array or an (N,)
        # array; flatten all of them to a plain index sequence.
        for kept_index in np.asarray(kept, dtype=int).reshape(-1):
            i = int(kept_index)
            (x, y, w, h) = boxes[i]
            score = float(confidences[i])
            detections.append({
                "location": {
                    "score": score,
                    "left": int(x),
                    "top": int(y),
                    "width": int(w),
                    "height": int(h),
                },
                "result": {
                    "score": score,
                    "label": str(self.LABELS[class_ids[i]]),
                },
            })

        return {"code": 200, "data": detections}
main function
# -*- coding: utf-8 -*-
# @Time : 2021/1/18 17:04
# @Author : Johnson
from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn
import cv2
import random
import os
import base64
import json
import time
import numpy as np
from enum import Enum
# Import the detector class defined in the object_detection module above
from object_detection import object_detector
app = FastAPI()
def base64toCv(base64_src):
    """Decode a base64-encoded image file into an OpenCV image array.

    Args:
        base64_src: Base64 text (or bytes) of an encoded image file.

    Returns:
        The result of ``cv2.imdecode`` with ``cv2.IMREAD_COLOR``: a
        3-channel BGR array, or ``None`` when OpenCV cannot decode the
        bytes as an image.
    """
    raw_bytes = base64.b64decode(base64_src)
    # np.frombuffer replaces the deprecated binary mode of np.fromstring.
    img_array = np.frombuffer(raw_bytes, np.uint8)
    # imdecode expects an IMREAD_* flag. The former cv2.COLOR_BGR2RGB is a
    # cvtColor conversion code, not a read flag.
    img_cv = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    return img_cv
# Closed set of detection targets the service accepts. Deriving from str
# lets a member compare equal to its plain string value.
class Targets(str, Enum):
    # The only supported target for now: face detection.
    det_target1 = "face"
# Request body accepted by the /detector endpoint. Both fields default to
# None, so a request that omits them still passes model validation.
class Item(BaseModel):
    # Base64 text of the encoded image file.
    base64: str = None
    # Which kind of object to detect.
    target: Targets = None
@app.post('/detector')
async def calculate(request_data: Item):
    """Run detection on the image carried by a POST /detector request.

    Args:
        request_data: Parsed request body with the base64 image and the
            detection target.

    Returns:
        The detector's result dict on success, or
        ``{"code": 201, "data": []}`` when the target is missing or
        unsupported, or when the image is missing or cannot be decoded.
    """
    target = request_data.target
    img_base64 = request_data.base64

    # Item lets target default to None, so guard the .value access.
    target_name = target.value if target is not None else None
    # Log what was requested and when.
    print("Detection for", target_name, "! Time:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))

    if target != "face":
        print("Target parameter error!")
        return {"code": 201, "data": []}

    if not img_base64:
        print("Image parameter error!")
        return {"code": 201, "data": []}

    try:
        src_img = base64toCv(img_base64)
    except ValueError:
        # binascii.Error (malformed base64) is a ValueError subclass.
        src_img = None
    if src_img is None:
        # Reuse the service's only failure payload instead of crashing
        # inside the detector.
        print("Image parameter error!")
        return {"code": 201, "data": []}

    # NOTE: face_detector is the module-level detector created in the
    # __main__ block, so it only exists when this file is run as a script.
    return face_detector.predict(src_img)
if __name__ == '__main__':
    # Build the face detector from the weights, cfg and names files, with a
    # 416-pixel network input size.
    face_detector = object_detector(
        model="./models/face_model/face.weights",
        cfg="./models/face_model/face.cfg",
        classes="./models/face_model/face.names",
        inputSize=416,
    )
    print('Loaded face model!')
    print("Service start!")
    # Original author's note: use host 0.0.0.0 when deploying on a server.
    uvicorn.run(app, host="0.0.0.0", port=12455, workers=1)
client
# -*- coding: utf-8 -*-
# @Time : 2021/1/18 17:14
# @Author : Johnson
import requests
import json
import base64
def test(img_path, url="http://127.0.0.1:12455/detector"):
    """Send one image to the detection service and return the raw reply.

    Args:
        img_path: Path of the image file to upload.
        url: Endpoint of the detection service. Defaults to the loopback
            address because 0.0.0.0 is a bind address, not a portable
            destination for client connections.

    Returns:
        The response body as text.
    """
    with open(img_path, 'rb') as f:
        img = base64.b64encode(f.read()).decode("ascii")
    payload = {'base64': img, 'target': "face"}
    # json= serialises the payload and sets Content-Type: application/json.
    rec = requests.post(url, json=payload)
    return rec.text
# Script entry: send the sample image '1.jpg' to the detection service and
# print the raw response text. These statements run whenever the file is
# executed or imported.
result = test('1.jpg')
print(result)