整体描述
网络部分以接口形式提供,接口定义文件为include/phxpaxos/network.h
,用户只需要根据此头文件实现相应的功能即可替代其网络部分。
其中主要接口三个,分别是发送TCP消息、发送UDP消息,接收消息。
class NetWork
{
public:
NetWork();
virtual ~NetWork() {}
//Network must not send/recieve any message before paxoslib called this funtion.
virtual void RunNetWork() = 0;
//If paxoslib call this function, network need to stop receive any message.
virtual void StopNetWork() = 0;
virtual int SendMessageTCP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage) = 0;
virtual int SendMessageUDP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage) = 0;
//When receive a message, call this funtion.
//This funtion is async, just enqueue an return.
//phxpaxos中将接收到的消息放入queue中,然后异步线程根据消息内容进行处理(包括数据CRC校验,是否超时等处理)
int OnReceiveMessage(const char * pcMessage, const int iMessageLen);
private:
friend class Node;
Node * m_poNode;
};
代码结构
与通信相关的内容全部在src/communicate
下,文件dfnetwork.h/.cpp
和 network.cpp
是其重要的逻辑,network.cpp
将收到的信息扔到paxos中进行处理。dfnetwork
对TCP、UDP相关的网络模块进行封装。communicate
部分则是结合网络节点信息对网络部分的二次封装。
udp.h/cpp
封装UDP网络部分,tcp文件夹下所有文件都是对TCP的处理。
下面先看一下network.cpp
NetWork :: NetWork() : m_poNode(nullptr)
{
}
int NetWork :: OnReceiveMessage(const char * pcMessage, const int iMessageLen)
{
if (m_poNode != nullptr)
{
//node是paxos中组成部分,将信息投递到node中即投放到paxos的逻辑处理中
m_poNode->OnReceiveMessage(pcMessage, iMessageLen);
}
else
{
PLHead("receive msglen %d", iMessageLen);
}
return 0;
}
dfnetwork部分
class DFNetWork : public NetWork
{
public:
DFNetWork();
virtual ~DFNetWork();
int Init(const std::string & sListenIp, const int iListenPort, const int iIOThreadCount);
void RunNetWork();
void StopNetWork();
int SendMessageTCP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage);
int SendMessageUDP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage);
private:
UDPRecv m_oUDPRecv; //UDP接收部分
UDPSend m_oUDPSend; //UDP发送部分
TcpIOThread m_oTcpIOThread; //TCP 发送接收处理部分
};
对网络的分析主要集中在下面的三个私有成员极其相关类之间。
接下来将以文字的形式描述TCP和UDP消息发送和接收的处理过程
TCP
TCP 相关处理主要有两个类,分别是Nofity
和MessageEvent
,两者都继承Event类(主要进行时间的添加、删除,定时的添加、删除,跳出时间等待)。
其中Notify
主要用来唤醒epoll_wait
进行后面的逻辑。MessageEvent
负责对数据进行读写处理。
整个的处理逻辑被封装在TcpIOThread中进行处理,TcpIOThread包含一个TcpAcceptor,多个TcpRead,多个TcpWrite。然后每一个TcpRead对应一个EventLoop,TcpWrite对应一个TcpClient和一个EventLoop.TcpClient中含有MessageEvent.大约是这么一个关系。下面是一个我手画的类关系图,凑活着看吧
数据写流程:
1. 根据发送的IP和Port找到相应的MessageEvent(若存在直接返回,不存在创建连接并返回)
2. 执行相应MessageEvent的AddMessage函数 ,添加到queue中
3. 通知epoll_wait结束处理
4. 将相应的fd打开EPOLLOUT时间发送消息(从消息queue中取未过期数据进行发送)
数据读流程:
1. 在TcpRead中的EventLoop中,会根据Fd,Addr创建MessageEvent,并在EventLoop中添加相应的EPOLLIN事件
2. EventLoop中OneLoop中有EPOLLIN事件触发,根据相应的fd找到相应的Event执行其对应的OnRead函数
3. 读完后执行NetWork中的OnReceiveMessage函数将消息投递到paxos逻辑中。
TCP相关的文字流程描述
UDP
UDP数据的发送和接收就很简单了,稍微描述一下就行了
UDP发送:
1.初始化部分 初始化socket
2. 启动。循环扫描数据queue,有数据根据数据中的IP,PORT将数据发送出去
3. 发送数据AddMessage,将IP,PORT,Message加入到queue中
UDP接收:
1. 初始化部分,设置socket,bind等
2. 启动。poll —>recvfrom—->OnReceiveMessage(NetWork中)
OK!图片中绿笔圈起来的部分就是UDP相关的内容