四、计算节点模块
4.1 计算节点模块结构
计算节点模块主要功能为数据管理、日志管理、配置管理、处理线程管理、进程通信管理、TCP通信管理。计算节点模块类关系表如下:
类名 |
关系类 |
关系 |
Management |
handle_manage |
聚合 |
handle_manage |
Handle |
聚合 |
Management |
Log |
聚合 |
Management |
Setting |
聚合 |
Management |
tcp_Socket |
聚合 |
Management |
local_server |
聚合 |
Local_server |
local_socket |
聚合 |
Management:管理类,负责管理所有模块
名称 |
类型 |
说明 |
localServer_init |
方法 |
初始化local服务器 |
process_init |
方法 |
初始化进程类 |
localServer_start |
方法 |
开启local服务器 |
process_start |
方法 |
开启进程 |
Handle_Setcount |
方法 |
设置计算线程数量 |
Handle_Start |
方法 |
开启计算线程 |
StartLog |
方法 |
启用日志 |
Log_init |
方法 |
初始化日志 |
Configur_init |
方法 |
配置初始化 |
StartConfigur |
方法 |
开启配置 |
Handle_manage:处理管理类,负责管理处理线程
名称 |
类型 |
说明 |
Cl_SocketSendData |
方法 |
转发来自处理线程的信息至LocalSocket |
Ct_SocketSendData |
方法 |
转发来自处理线程的信息至TcpSosket |
Handle:处理类,负责处理任务
名称 |
类型 |
说明 |
handle_localSocket |
方法 |
处理来自LocalSocket的任务 |
handle_tcpSocket |
方法 |
处理来自TcpSocket的任务 |
Log:日志类,负责保存日志
名称 |
类型 |
说明 |
Init |
方法 |
日志类初始化 |
SaveLog |
方法 |
保存日志 |
Setting:配置类,负责保存读取程序配置
名称 |
类型 |
说明 |
Init |
方法 |
初始化配置类 |
Start |
方法 |
开始读取配置 |
tcp_socket |
tcpSocket类 |
负责传输数据 |
send_message |
方法 |
发送数据 |
Receive |
方法 |
接收数据 |
process_data |
方法 |
数据反序列化 |
connect_server |
方法 |
连接服务器 |
local_server:本地监听类,负责进程之间通信
名称 |
类型 |
说明 |
start_server |
方法 |
始监听端口 |
SaveLog |
方法 |
保存日志 |
local_socket |
localSocket类 |
负责数据传输 |
process_data |
方法 |
数据反序列化 |
Receive |
方法 |
接收数据 |
以下是计算节点模块类图
4.2基本原理
计算节点大致结构与服务器相似,主要为接收服务器任务、转发任务与计算结果、监控计算模块是否正常等。
4.2.1 接收DLL
计算节点在启动时会向服务器发送请求DLL的信息,服务器遍历文件夹中DLL信息并发送给计算节点。DLL信息包括DLL名称与MD5值,计算节点会遍历DLL信息并且对比MD5值,不同则替换DLL,以下是服务器传输DLL列表代码:
QStringList dllinformation=data.split("@@"); QStringList LackDLLname; //如果不存在文件夹则需要服务器传输所有DLL if(!isDirExist(QCoreApplication::applicationDirPath()+"\\"+"DLL")) { for(int a=0;a<dllinformation.size();a++) { QStringList dlldata=dllinformation[a].split("$$"); if(dlldata.size()!=2) { qDebug()<<"DLL信息损坏!"; continue; } QString dllname=dlldata[0]; LackDLLname.append(dllname); } } else { for(int a=0;a<dllinformation.size();a++) { QStringList dlldata=dllinformation[a].split("$$"); if(dlldata.size()!=2) { qDebug()<<"DLL信息损坏!"; continue; } QString dllname=dlldata[0]; QString dllmd5=dlldata[1]; QString file_path=QCoreApplication::applicationDirPath()+"\\"+"DLL"+"\\"+dllname; if(!isFileExist(file_path)) { LackDLLname.append(dllname); continue; } QFile file(file_path); if(!file.open(QIODevice::ReadOnly)) { qDebug()<<"打开"<<dllname<<"失败!"; return; } QByteArray dllData= file.readAll(); file.close(); QString thismd5=GetMD5(dllData); if(dllmd5!=thismd5) { LackDLLname.append(dllname); } } } if(!LackDLLname.isEmpty()) { emit this->t_SocketSendData(this->id,"缺失DLL",LackDLLname.join("@@"),t_socket,One,""); } else { emit this->t_SocketSendData(this->id,"DLL完整","",t_socket,One,""); }
以下是服务器传输DLL至计算节点代码:
QStringList data1=data.split("&&@@"); if(data1.size()<2) { qDebug()<<"传输DLL出错!"; emit this->t_SocketSendData(this->id,"DLL传输出错","存在同名DLL!",t_socket,One,""); return; } QString dll_Name=data1[0]; QString dll_Data=data1[1]; QString file_path=QCoreApplication::applicationDirPath()+"\\"+"DLL"+"\\"+dll_Name; isDirExist(QCoreApplication::applicationDirPath()+"\\"+"DLL"); if(isFileExist(file_path)) { qDebug()<<"存在同名DLL!!"; QString dllmd5=GetMD5(QByteArray::fromBase64(dll_Data.toLatin1())); QFile file(file_path); if(!file.open(QIODevice::ReadOnly)) { return; } QByteArray dllData= file.readAll(); file.close(); QString thismd5=GetMD5(dllData); //当MD5相同则不替换 if(thismd5==dllmd5) { emit this->t_SocketSendData(id,type,data,t_socket,All,"计算节点"); } else { QFile file(file_path); if (file.open(QFile::WriteOnly|QFile::Truncate)) { QByteArray data1=QByteArray::fromBase64(dll_Data.toLatin1()); file.write(data1); file.close(); } else { qDebug()<<"写入DLL失败!!"; emit this->t_SocketSendData(id,type,"写入Dll失败",t_socket,One,""); return ; } } return ; } QFile file(file_path); if (file.open(QIODevice::Append)) { QByteArray data1=QByteArray::fromBase64(dll_Data.toLatin1()); file.write(data1); file.close(); } else { qDebug()<<"写入DLL失败!!"; emit this->t_SocketSendData(this->id,"DLL传输出错","写入Dll失败",t_socket,One,""); return ; } qDebug()<<"接受DLL成功!DLL名称:"<<dll_Name;
4.2.2 接收任务
当计算节点接收任务时会将任务通过本地socket传送至计算模块。以下是对应代码:
M_Socket *localsocket=Common::GetLocal_Socket(); if(localsocket==NULL) { qDebug()<<"计算模块未连接!"; LocalSocket_SetUnfinshsenddata("计算节点","handle",data,One,""); return; } QStringList taskdata=data.split("**&&##"); if(data.size()<3) { qDebug()<<"任务解析出错!"; return; } QString id=taskdata[0]; LocalSocket_SendData Caclate_Task={id,"handle",data,One,""}; Common::Localsocket_CaculatedataLock.lockForWrite(); Common::Localsocket_Caculatedata[id]= Caclate_Task; Common::Localsocket_CaculatedataLock.unlock(); emit this->l_SocketSendData("计算节点","handle",data,localsocket->l_socket,One,"");
4.2.3 计算模块崩溃处理
当计算模块应错误崩溃时,计算节点能实时捕捉并重启计算模块,随后将计算节点未完成任务发送至计算模块。以下是对应代码:
qDebug()<<"本地计算模块断开连接"; QList<LocalSocket_SendData> data; QMap <QString,LocalSocket_SendData>::const_iterator i; Common::Localsocket_CaculatedataLock.lockForWrite(); for (i = Common::Localsocket_Caculatedata.constBegin(); i != Common::Localsocket_Caculatedata.constEnd(); ++i) { //对QMAP进行遍历 data.append(i.value()); } Common::Localsocket_Caculatedata.clear(); Common::Localsocket_CaculatedataLock.unlock(); Common::Localsocket_unsenddataLock.lockForWrite(); Common::Localsocket_unsenddata.append(data); Common::Localsocket_unsenddataLock.unlock(); qDebug()<<QString("将未处理完的计算任务放入队列中,数量:%1").arg(data.size()); local_socket *socket=qobject_cast<local_socket*>(sender()); Common::localSocket_MapLock.lockForWrite(); if(Common::localSocket_Map.remove(socket)==0) { qDebug()<<"没有在map中找到localsocket!!"; } Common::localSocket_MapLock.unlock(); qDebug()<<"重启本地计算模块"; emit this->restart_localcaculate();
4.2.4 计算节点模块小结
计算节点主要在于监控与转发数据,需要考虑极端情况。