一、概述
仓库位置:https://gitee.com/liudegui/cpp_master_worker.git
如果想用python写master端,可以参考我的另一篇:cpp编写的tcp服务端,提供cpp和Python两个语言版本的tcp客户端,C++部分基于boost.asio网络库
名称 | 描述 |
---|---|
master进程(master_app) | worker进程的守护进程,监控和管理引擎,提供worker进程策略下发、收集worker进程数据上报 |
worker进程(worker_app) | 取流解码、分析推理进程 |
轨迹视频叠加服务(draw-track-server) | 基于openresty开发的,在视频上画轨迹的服务 |
-
因为项目特殊要求,需要设计一个多进程master-worker工作机制。
-
master进程: 是worker进程的守护进程,监控和管理引擎,提供worker进程策略下发、收集worker进程数据上报。里面包含网络通信模块,进程管理模块,心跳检测模块,数据封装和解封装模块;
-
woker进程:包含两个工作线程,分别负责视频数据的解码和推理;处理后的结果通过自定义的序列化封装成字符流,发送至master进程;关于推理线程,我是基于opencv官方自带的人脸识别修改的,主要是为了方便演示这个框架的功能;
-
轨迹视频叠加服务(draw-track-server):由于此方案是用在嵌入式设备中,如果在嵌入式设备中涉及太多后处理操作,这将占用过多的 嵌入式设备资源。因此,考虑将部分后处理业务放到x86服务器中处理,因此才有轨迹叠加服务;
-
考虑到以后可以多个嵌入式设备并联工作,使用了zmq来做网络通信。单机中使用zmq的ipc模式;多机并联工作时,使用zmq的tcp模式;
-
master进程、worker进程和轨迹叠加服务间数据传输使用自定义的序列化模块,详细见使用C++11 基于二进制的序列化与反序列化,支持C++基础类类型及stl容器类型。
-
仓库代码中master_app、worker_app、draw-track-server均可以单独测试,详细见仓库代码中以mock开头的测试代码。
二、master进程与worker进程间管理和交互
根据项目实际需要,分析引擎使用master-worker机制,一个master进程管理多个worker进程
1、master管理worker进程
如上图
- master进程负责worker进程的启停;
- master进程传递给worker进程的参数只有一个唯一的序号(0-99);
2、master心跳检测worker进程
如上图:
- master和worker进程基于共享内存做心跳;
- master进程根据worker进程id不同,分别创建共享内存;
- worker进程定时递增共享内存中标量;
- master进程定时检测共享内存标量是否递增,如果连续三次检测均没有增加,则杀死原worker进程,并创建新的进程。
3、master与worker进程之间信息交互
如上图:
- master和worker进程基于zmq通信,使用订阅-发布模型,每个通道(ipc)单向通信;
- master进程绑定一个server.ipc,并订阅所有worker进程的ipc;
- worker进程订阅master进程ipc,并根据自己的id过滤消息;同时,绑定{id}.ipc,以供master进程订阅;
另外,master和worker进程之间信息的序列化不是采用将结构体序列化成json串,或者memcpy结构体为字符串流,而是使用了C++14的index_sequence,具体请查看"serialize.hpp"。
4、检测上报流程
如上图:
- master进程分别下发camera信息给不同的worker进程,worker进程收到消息后,
- 1)解析camera信息,获取码流信息;
- 2)启动解码和推理线程;开始取流解码并推理;
- worker进程不断检测,当检测到事件时,将事件轨迹和视频发送给master;master进程简单处理后,将事件和轨迹发送给轨迹叠加服务;
- 叠加服务器收到视频和轨迹后,
- 1)调用画图模块,叠加视频和轨迹,并生成新的视频;
- 2)发送叠加后的视频和相关信息给其它http服务器。
5、master进程启动worker进程流程
1、读取程序cfg配置文件;获取worker进程名称 ,待启动的worker进程数,是否启动共享内存定时检查等配置;
2、根据配置信息启动worker进程,master和worker进程按协定沟通wokerid(如ID00,ID01);
3、如启用共享内存定时检查,则提前创建共享内存,并创建定时器定时检查标志位是否定时被worker进程更新,若超过3次未更新标志位,则杀死进程重新创建新的进程;
4、master进程读取camera信息文件,按照读取到的信息顺序下发结构体给worker进程;若camera信息数和worker数不一致,则下发两者的最少量。
三、程序结构
1、master_app进程
程序结构
master模块间关系
补充说明
-
ini和json配置文件读取均使用boost.property_tree,具体请查看
appconfig_reader
和json_message_processor
(已做字段判空处理); -
进程管理基于boost.process封装,对部分不能跨平台的部分做了宏处理;
-
http_upload基于boost.asio封装,支持普通http post和multipart/form-data的post,可同时上传多个文件和字符串;
-
心跳检查基于boost.interprocess的共享内存封装,需要worker配对使用,可以在配置文件中关闭启动;
-
zmq的接受(recv)是完全阻塞式的,为此创建了单独的线程;引用libzmq库的同时,还引用了zmq.hpp封装,具体请查看相关头文件;
-
messagehandler和其它模块为了松耦合,使用“消息总线”关联,具体请查看
messagebus.hpp
,需要使用到boost::any; -
因为master和worker进程最终都放在一个文件夹部署,为防止.so组件混淆错乱,所有模块均编译成静态.a库;
-
loghelper为仓库的单独日志库,是boost.log的wapper;可输出到控制台、文件、syslog,支持C++的"<<"和C类型“printf”,具体请查看仓库代码。
2、worker_app进程
程序结构
worker进程模块关系
补充说明
(部分内容与master是重复的)
-
decode_video:使用到ffmpeg,支持cpu解码、amd64平台的cuda解码 和 NX平台的Nvmpi解码;
-
zmq的接受(recv)是完全阻塞式的,为此创建了单独的线程;引用libzmq库的同时,还引用了zmq.hpp封装,具体请查看相关头文件;
-
message_handler和其它模块为了松耦合,使用“消息总线”关联,具体请查看
messagebus.hpp
,需要使用到boost::any; -
因为master和worker进程最终都放在一个文件夹部署,为防止.so组件混淆错乱,所有模块均编译成静态.a库;
-
解码和推理线程间添加消息队列缓存,使用到开源的
concurrentqueue
,该组件内部支持线程同步; -
该worker进程可单独调试,编译的时候可添加
WORKER_DEBUG
宏; -
loghelper为仓库的单独日志库,是boost.log的wapper;可输出到控制台、文件、syslog,支持C++的"<<"和C类型“printf”,具体请查看仓库代码。
四、轨迹视频叠加服务器
1、程序结构
如上图:
1、video_bbox_drawer:为C++编写的轨迹和视频叠加模块,提供标准C接口函数;
2、draw-track-server:为基于openresty框架编写的http轨迹叠加服务,使用lua语言编写业务,调用video_bbox_drawer的c接口;
3、http_test_client: 为测试轨迹和视频发送的http客户端程序,使用http_upload与engine-master应相同。
2、工作流程
1、nginx收到http消息,转发给do_upload.lua处理;
2、do_upload.lua解析消息为轨迹文本和视频文件;
3、加载libvideo_bbox_drawer.so,调用c接口函数,叠加轨迹和视频文件。
五、关键结构体设计
- 上报检测轨迹:该结构体为worker和轨迹叠加服务所引用,两者需保持一致;
struct Rect_t
{
uint32_t x = 0; //
uint32_t y = 0; //
uint32_t w = 0; // 宽
uint32_t h = 0; // 高
};
struct Track_t
{
uint32_t f = 0; // 帧序号
Rect_t rect; //
};
// 上报轨迹信息
struct TrajectoryInfo_t
{
std::string video_name;
uint32_t video_width;
uint32_t video_height;
uint32_t video_framerate;
std::vector<Track_t> tracks;
};
- zmq消息通信结构体:如下为master和worker进程间交互所使用结构体,两者需保持一致。
// 摄像机下发信息
struct CameraInfo_t
{
char name[64] = {0}; // 摄像机名称
char main_stream[128] = {0}; // 主流rtsp url
char alg_param_file[128] = {0}; // 算法库配置文件
char camera_id[64] = {0}; // id
uint32_t main_pixel_w = 0; // width
uint32_t main_pixel_h = 0; // hight
uint32_t main_frame_rate = 0; // 帧率
};
// 需要上传给httpserver
struct VideoAndTracks_t
{
std::string tracksStr; // 轨迹(未base64)
std::string videoName; // 视频文件名
std::string videoData; // 视频文件内容
};
六、软件开发依赖
C++部分:C++11、libboost1.66+、libzmq、libffmpeg4.2+、libopencv4.4+
openresty:系统默认安装即可
**cpp_master_worker**依赖第三方库地址
- loghelper : 这是我一直在使用的基于boost.log的日志库;
- concurrentqueue : A fast multi-producer, multi-consumer lock-free concurrent queue for C++11
- libzmq : ZeroMQ core engine in C++, implements ZMTP/3.1 和 cppzmq : Header-only C++ binding for libzmq
这是三个开源工程代码已上传至CSDN,可以从https://download.csdn.net/download/stallion5632/86247799打包下载