上一部分介绍了TCP模块大略的类构成。在查看相关的TcpRead/TcpWrite/TcpAcceptor的过程中发现,都包含一个EventLoop类。这是事件处理主循环类。在介绍其他组成部分前,我们先分析一下此类。代码在/src/communicate/tcp/event_loop.h/cpp
文件中。顺带介绍一下里面的事件类Event。
先看一下Event类(含有纯虚函数,肯定要作为父类进行使用),其声明在event_base.h
中。其子类两个:Notify(封装管道,主要用来进行事件通知),MessageEvent(封装socket,主要的处理逻辑)
// Abstract base for anything the EventLoop can wait on (fd events and
// timers). Per the surrounding text, its two subclasses are Notify (wraps a
// pipe, used for loop wakeup notification) and MessageEvent (wraps a socket,
// carries the main message-handling logic).
class Event
{
public:
// Associates this Event with an EventLoop; timer, fd-event and data
// processing for this Event are all triggered from inside that loop.
Event(EventLoop * poEventLoop);
virtual ~Event();
// The EventLoop maps fd -> Event and then invokes the matching callback
// (read / write / error / timeout) on it.
virtual int GetSocketFd() const = 0;
virtual const std::string & GetSocketHost() = 0;
virtual int OnRead();
virtual int OnWrite();
virtual void OnError(bool & bNeedDelete) = 0;
virtual void OnTimeout(const uint32_t iTimerID, const int iType);
public:
// Register/unregister this event's fd with the EventLoop; iEvents is an
// epoll mask such as EPOLLIN / EPOLLOUT.
void AddEvent(const int iEvents);
void RemoveEvent(const int iEvents);
void JumpoutEpollWait();
const bool IsDestroy() const;
void Destroy();
public:
// Add/remove a timer on the owning EventLoop.
void AddTimer(const int iTimeoutMs, const int iType, uint32_t & iTimerID);
void RemoveTimer(const uint32_t iTimerID);
protected:
int m_iEvents;
EventLoop * m_poEventLoop;
bool m_bIsDestroy;
};
下面是Event及其子类Notify和MessageEvent的手画类图,凑活着看吧
EventLoop代码部分:
第一眼看其头文件,根据其中的函数名称就能分析出它的某些功能:
1. 事件的添加修改删除
2. 主事件循环处理
3. 定时事件添加删除
// Reactor for the TCP module: owns the epoll fd, the registered fd events,
// the timer queue, and the hand-off queue of freshly accepted sockets.
class EventLoop
{
public:
EventLoop(NetWork * poNetWork);
virtual ~EventLoop();
int Init(const int iEpollLength);
// fd event maintenance (add / remove; ModEvent also performs the add).
void ModEvent(const Event * poEvent, const int iEvents);
void RemoveEvent(const Event * poEvent);
// Start / stop the main event loop.
void StartLoop();
void Stop();
void OnError(const int iEvents, Event * poEvent);
virtual void OneLoop(const int iTimeoutMs);
public:
void SetTcpClient(TcpClient * poTcpClient);
// Break out of the main event loop's epoll_wait; implemented via Notify,
// which internally uses a pipe.
void JumpoutEpollWait();
public:
// Timer management. Judging from the private data structures below, timers
// are tracked by timer id: timerID -> fd -> Event -> OnTimeout.
bool AddTimer(const Event * poEvent, const int iTimeout, const int iType, uint32_t & iTimerID);
void RemoveTimer(const uint32_t iTimerID);
void DealwithTimeout(int & iNextTimeout);
void DealwithTimeoutOne(const uint32_t iTimerID, const int iType);
public:
void AddEvent(int iFD, SocketAddress oAddr);
void CreateEvent();
void ClearEvent();
int GetActiveEventCount();
public:
// Per-fd context used for lookup: the event plus the epoll mask it is
// currently registered with.
typedef struct EventCtx
{
Event * m_poEvent;
int m_iEvents; // subscribed mask: EPOLLIN, EPOLLOUT
} EventCtx_t;
private:
bool m_bIsEnd;
protected:
int m_iEpollFd;
epoll_event m_EpollEvents[MAX_EVENTS];
std::map<int, EventCtx_t> m_mapEvent; // fd -> EventCtx
NetWork * m_poNetWork;
TcpClient * m_poTcpClient;
Notify * m_poNotify; // internal wakeup notification
protected:
Timer m_oTimer;
std::map<uint32_t, int> m_mapTimerID2FD; // timer id -> fd
// Queue of accepted fds plus their socket info (IP, port, ...); the loop
// thread later wraps each in a MessageEvent for further processing.
// Multi-producer / single-consumer, guarded by m_oMutex.
std::queue<std::pair<int, SocketAddress> > m_oFDQueue;
std::mutex m_oMutex;
std::vector<MessageEvent *> m_vecCreatedEvent; // MessageEvent derives from Event; added to this EventLoop
};
event_loop.cpp
#include "event_loop.h"

#include <unistd.h>

#include "event_base.h"
#include "tcp_acceptor.h"
#include "tcp_client.h"
#include "comm_include.h"
#include "message_event.h"
#include "phxpaxos/network.h"
using namespace std;
namespace phxpaxos
{
// Constructs an idle loop; the epoll fd and notify pipe are created in Init().
EventLoop :: EventLoop(NetWork * poNetWork)
    : m_bIsEnd(false),
      m_iEpollFd(-1),
      m_poNetWork(poNetWork),
      m_poTcpClient(nullptr),
      m_poNotify(nullptr)
{
    memset(m_EpollEvents, 0, sizeof(m_EpollEvents));
}
// Releases the MessageEvents that were created by (and flagged for
// destruction in) this loop.
// NOTE(review): m_poNotify and m_iEpollFd are not released here -- confirm
// whether they are owned and cleaned up elsewhere, otherwise they leak.
EventLoop :: ~EventLoop()
{
ClearEvent();
}
// Wake the loop out of epoll_wait() by writing to the internal notify pipe.
void EventLoop :: JumpoutEpollWait()
{
    // Fix: m_poNotify is only created in Init(); guard against a call made
    // before Init() (or after a failed Init()), which would dereference null.
    if (m_poNotify != nullptr)
    {
        m_poNotify->SendNotify();
    }
}
// Attaches the TcpClient whose pending writes get flushed once per loop
// iteration (see StartLoop).
void EventLoop :: SetTcpClient(TcpClient * poTcpClient)
{
    this->m_poTcpClient = poTcpClient;
}
int EventLoop :: Init(const int iEpollLength)
{
//epoll创建
m_iEpollFd = epoll_create(iEpollLength);
if (m_iEpollFd == -1)
{
PLErr("epoll_create fail, ret %d", m_iEpollFd);
return -1;
}
m_poNotify = new Notify(this);
assert(m_poNotify != nullptr);
//将管道通知类Notify添加到EventLoop(EPOLLIN事件)
int ret = m_poNotify->Init();
if (ret != 0)
{
return ret;
}
return 0;
}
void EventLoop :: ModEvent(const Event * poEvent, const int iEvents)
{
//从所有的fd、event对应关系中寻找,确定添加还是修改
//之后epoll_ctl改变感兴趣内容
//最后更新map中fd对应的ctx
auto it = m_mapEvent.find(poEvent->GetSocketFd());
int iEpollOpertion = 0;
if (it == end(m_mapEvent))
{
iEpollOpertion = EPOLL_CTL_ADD;
}
else
{
iEpollOpertion = it->second.m_iEvents ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
}
epoll_event tEpollEvent;
tEpollEvent.events = iEvents;
tEpollEvent.data.fd = poEvent->GetSocketFd();
int ret = epoll_ctl(m_iEpollFd, iEpollOpertion, poEvent->GetSocketFd(), &tEpollEvent);
if (ret == -1)
{
PLErr("epoll_ctl fail, EpollFd %d EpollOpertion %d SocketFd %d EpollEvent %d",
m_iEpollFd, iEpollOpertion, poEvent->GetSocketFd(), iEvents);
//to do
return;
}
EventCtx tCtx;
tCtx.m_poEvent = (Event *)poEvent;
tCtx.m_iEvents = iEvents;
m_mapEvent[poEvent->GetSocketFd()] = tCtx;
}
// Unregisters poEvent's fd from epoll and drops its entry from m_mapEvent.
// A no-op when the fd was never registered.
void EventLoop :: RemoveEvent(const Event * poEvent)
{
    const int iSocketFd = poEvent->GetSocketFd();

    auto it = m_mapEvent.find(iSocketFd);
    if (it == end(m_mapEvent))
    {
        return;
    }

    // Fix: zero-initialize the whole struct (the original only set .events
    // and .data.fd, leaving union padding uninitialized). The event argument
    // is ignored for EPOLL_CTL_DEL on modern kernels but must be non-null
    // for portability to kernels before 2.6.9.
    epoll_event tEpollEvent = {};
    tEpollEvent.data.fd = iSocketFd;

    int ret = epoll_ctl(m_iEpollFd, EPOLL_CTL_DEL, iSocketFd, &tEpollEvent);
    if (ret == -1)
    {
        PLErr("epoll_ctl fail, EpollFd %d EpollOpertion %d SocketFd %d",
                m_iEpollFd, EPOLL_CTL_DEL, iSocketFd);
        // NOTE(review): original carried an unresolved "to do" for this path.
        return;
    }

    // Fix: erase via the iterator already in hand instead of a second
    // key lookup.
    m_mapEvent.erase(it);
}
// Runs the reactor until Stop() is observed. Each iteration:
//   1. fire all expired timers (DealwithTimeout also yields the next deadline),
//   2. poll epoll and dispatch ready fd events (OneLoop),
//   3. turn queued accepted fds into MessageEvents (CreateEvent),
//   4. flush pending client writes, if a TcpClient is attached.
void EventLoop :: StartLoop()
{
m_bIsEnd = false;
while(true)
{
BP->GetNetworkBP()->TcpEpollLoop();
int iNextTimeout = 1000;
// Process expired timers in a loop; the out-parameter returns the time
// remaining until the earliest pending timer, intended as the
// epoll_wait budget. Timer stores entries in a vector kept in order.
DealwithTimeout(iNextTimeout);
//PLHead("nexttimeout %d", iNextTimeout);
// epoll_wait wrapper: looks up the ctx for each triggered fd and runs
// the matching event callback.
OneLoop(iNextTimeout);
// Wrap freshly accepted fds in MessageEvents and register them
// (EPOLLIN) with this loop.
CreateEvent();
if (m_poTcpClient != nullptr)
{
// Arms EPOLLOUT on client events that have queued outgoing data.
m_poTcpClient->DealWithWrite();
}
// Stop flag is checked once per iteration, after all work is done.
if (m_bIsEnd)
{
PLHead("TCP.EventLoop [END]");
break;
}
}
}
// Requests loop shutdown; StartLoop checks the flag at the end of each
// iteration.
// NOTE(review): no JumpoutEpollWait() is issued here, so shutdown may be
// delayed until the next wakeup -- confirm callers accept that latency.
void EventLoop :: Stop()
{
m_bIsEnd = true;
}
// One polling pass: wait on epoll, then dispatch read/write/error callbacks
// for every ready fd.
// NOTE(review): the iTimeoutMs parameter (computed from the timer queue by
// the caller) is never passed to epoll_wait -- a hard-coded 1ms is used,
// presumably so the accepted-fd queue and timers are polled frequently.
// Confirm this is intentional before "fixing" it.
void EventLoop :: OneLoop(const int iTimeoutMs)
{
int n = epoll_wait(m_iEpollFd, m_EpollEvents, MAX_EVENTS, 1);
if (n == -1)
{
if (errno != EINTR)
{
PLErr("epoll_wait fail, errno %d", errno);
return;
}
}
// On EINTR, n stays -1 and the loop below simply runs zero times.
for (int i = 0; i < n; i++)
{
// Map the ready fd back to its registered context.
int iFd = m_EpollEvents[i].data.fd;
auto it = m_mapEvent.find(iFd);
if (it == end(m_mapEvent))
{
continue;
}
int iEvents = m_EpollEvents[i].events;
Event * poEvent = it->second.m_poEvent;
// Run the event's matching handler for each triggered condition.
int ret = 0;
if (iEvents & EPOLLERR)
{
// Error: unregister the event (and possibly destroy it), skip I/O.
OnError(iEvents, poEvent);
continue;
}
try
{
if (iEvents & EPOLLIN)
{
ret = poEvent->OnRead();
}
if (iEvents & EPOLLOUT)
{
ret = poEvent->OnWrite();
}
}
catch (...)
{
// A throwing handler is treated the same as a failing one.
ret = -1;
}
// Non-zero return from OnRead/OnWrite signals a broken connection.
if (ret != 0)
{
OnError(iEvents, poEvent);
}
}
}
// Handles a failed event: unregister it from epoll and the fd map, then let
// the event's own OnError decide whether it should be torn down.
void EventLoop :: OnError(const int iEvents, Event * poEvent)
{
    BP->GetNetworkBP()->TcpOnError();

    PLErr("event error, events %d socketfd %d socket ip %s errno %d",
            iEvents, poEvent->GetSocketFd(), poEvent->GetSocketHost().c_str(), errno);

    // Remove the monitored fd from epoll and drop the map entry first.
    RemoveEvent(poEvent);

    // The user-level error handler reports (via the out-parameter) whether
    // this event's resources may be released. MessageEvent::OnError sets it
    // to true; Notify does not, since its cleanup logic lives elsewhere.
    // Without this, the event would leak -- the destructor does not free it.
    bool bWantsDestroy = false;
    poEvent->OnError(bWantsDestroy);
    if (bWantsDestroy)
    {
        // Flags the event; the actual delete happens later in ClearEvent().
        poEvent->Destroy();
    }
}
bool EventLoop :: AddTimer(const Event * poEvent, const int iTimeout, const int iType, uint32_t & iTimerID)
{
if (poEvent->GetSocketFd() == 0)
{
return false;
}
//此event没有添加到event Map中,添加
if (m_mapEvent.find(poEvent->GetSocketFd()) == end(m_mapEvent))
{
EventCtx tCtx;
tCtx.m_poEvent = (Event *)poEvent;
tCtx.m_iEvents = 0;
m_mapEvent[poEvent->GetSocketFd()] = tCtx;
}
//创建定时器
uint64_t llAbsTime = Time::GetSteadyClockMS() + iTimeout;
m_oTimer.AddTimerWithType(llAbsTime, iType, iTimerID);
//创建时间事件和fd的关联关系
m_mapTimerID2FD[iTimerID] = poEvent->GetSocketFd();
return true;
}
// Cancels a timer. Timer itself has no erase API, so the timerID -> fd link
// is cut instead; when the timer later pops, DealwithTimeoutOne finds no
// mapping and simply ignores it.
void EventLoop :: RemoveTimer(const uint32_t iTimerID)
{
    // map::erase(key) is already a no-op when the key is absent.
    m_mapTimerID2FD.erase(iTimerID);
}
// Delivers a single fired timer: timerID -> fd -> Event -> OnTimeout.
// Silently drops timers that were cancelled (no timerID mapping) or whose
// event has since been unregistered (no fd mapping).
void EventLoop :: DealwithTimeoutOne(const uint32_t iTimerID, const int iType)
{
    auto itTimer = m_mapTimerID2FD.find(iTimerID);
    if (itTimer == end(m_mapTimerID2FD))
    {
        //PLErr("Timeout aready remove!, timerid %u iType %d", iTimerID, iType);
        return;
    }

    const int iSocketFd = itTimer->second;
    m_mapTimerID2FD.erase(itTimer);

    auto itEvent = m_mapEvent.find(iSocketFd);
    if (itEvent != end(m_mapEvent))
    {
        itEvent->second.m_poEvent->OnTimeout(iTimerID, iType);
    }
}
// Pops and dispatches every timer that has already expired. On return,
// iNextTimeout holds the delay until the next pending timer (the caller
// uses it as the epoll_wait budget); it keeps its incoming value when no
// timer fired at all.
void EventLoop :: DealwithTimeout(int & iNextTimeout)
{
    uint32_t iTimerID = 0;
    int iType = 0;
    // Keep draining as long as PopTimeout reports an expired timer.
    while (m_oTimer.PopTimeout(iTimerID, iType))
    {
        DealwithTimeoutOne(iTimerID, iType);
        iNextTimeout = m_oTimer.GetNextTimeout();
        // A non-zero gap means the next timer has not expired yet -- stop.
        if (iNextTimeout != 0)
        {
            break;
        }
    }
}
// Producer side of the accepted-fd hand-off: any thread may queue a freshly
// accepted socket here; the loop thread consumes it in CreateEvent().
void EventLoop :: AddEvent(int iFD, SocketAddress oAddr)
{
    std::lock_guard<std::mutex> oLockGuard(m_oMutex);
    m_oFDQueue.emplace(iFD, oAddr);
}
void EventLoop :: CreateEvent()
{
//消费者,消费数据
std::lock_guard<std::mutex> oLockGuard(m_oMutex);
if (m_oFDQueue.empty())
{
return;
}
//每次都清楚需要释放的MessageEvent.决定权在每一次错误发生时用户的处理中是否需要释放onError函数中bNeedDelete 。若需要释放则在此处进行释放。不需要释放则会一直保留
ClearEvent();
int iCreatePerTime = 200;
while ((!m_oFDQueue.empty()) && iCreatePerTime--)
{
auto oData = m_oFDQueue.front();
m_oFDQueue.pop();
//创建event并添加到EventLoop中
//create event for this fd
MessageEvent * poMessageEvent = new MessageEvent(MessageEventType_RECV, oData.first,
oData.second, this, m_poNetWork);
poMessageEvent->AddEvent(EPOLLIN);
m_vecCreatedEvent.push_back(poMessageEvent);
}
}
void EventLoop :: ClearEvent()
{
for (auto it = m_vecCreatedEvent.begin(); it != end(m_vecCreatedEvent);)
{
//根据MessageEvent中的bool变量值,决定是否释放其资源
if ((*it)->IsDestroy())
{
delete (*it);
it = m_vecCreatedEvent.erase(it);
}
else
{
it++;
}
}
}
// Reaps destroyed events first, then reports how many MessageEvents created
// by this loop are still alive.
int EventLoop :: GetActiveEventCount()
{
    std::lock_guard<std::mutex> oLockGuard(m_oMutex);
    ClearEvent();
    // Fix: named cast instead of C-style cast.
    return static_cast<int>(m_vecCreatedEvent.size());
}
}
手画结构,凑合着看吧
EventLoop相关的代码逻辑就相当清晰了。下一篇文章将详细介绍TCP的工作过程。