Phxpaxos网络部分(3) —— TCP事件循环

上一部份介绍了TCP模块大略的类构成。在查看相关的TcpRead/TcpWrite/TcpAcceptor的过程中发现,都包含以个EventLoop类。这是事件处理主循环类。在介绍其他组成部分前,我们先分析一下此类。代码在/src/communicate/tcp/ecent_loop.h/cpp 文件中。顺带手介绍一下里面的事件类Event。

先看一下Event类(含有纯虚函数)肯定要做为父类进行使用,在event_base.h 中。其子类两个:Notify(封装管道,主要用来进行事件通知),MessageEvent(封装socket,主要的处理逻辑)

class Event
{
public:
    //传入eventloop指针,将Event与eventloop关联,时间、事件、数据的处理都是在eventloop中触发进行的
    Event(EventLoop * poEventLoop); 
    virtual ~Event();

    //在EventLoop中通过fd关联Event。然后执行相应的逻辑(read\write\error\timeout)
    virtual int GetSocketFd() const = 0;

    virtual const std::string & GetSocketHost() = 0;

    virtual int OnRead();

    virtual int OnWrite();

    virtual void OnError(bool & bNeedDelete) = 0;

    virtual void OnTimeout(const uint32_t iTimerID, const int iType);

public:
    //向EventLoop中添加、删除事件,iEvents是EPOLLIN等逻辑结果  
    void AddEvent(const int iEvents);

    void RemoveEvent(const int iEvents);

    void JumpoutEpollWait();

    const bool IsDestroy() const;

    void Destroy();

public:
    //向EventLoop中添加、删除定时事件
    void AddTimer(const int iTimeoutMs, const int iType, uint32_t & iTimerID);

    void RemoveTimer(const uint32_t iTimerID);

protected:
    int m_iEvents;
    EventLoop * m_poEventLoop;

    bool m_bIsDestroy;
};

下面是Event及其子类Notify和MessageEvent的手画类图,凑活着看吧
这里写图片描述
EventLoop代码部分:
在第一眼看其头文件根据其名称分析出其某些功能:
1. 事件的添加修改删除
2. 主事件循环处理
3. 定时事件添加删除

class EventLoop
{
public:
    EventLoop(NetWork * poNetWork);
    virtual ~EventLoop();

    int Init(const int iEpollLength);
    //事件维护(增删改,改中封装了增)
    void ModEvent(const Event * poEvent, const int iEvents);

    void RemoveEvent(const Event * poEvent);
    //开始主事件循环
    void StartLoop();

    void Stop();

    void OnError(const int iEvents, Event * poEvent);

    virtual void OneLoop(const int iTimeoutMs);

public:
    void SetTcpClient(TcpClient * poTcpClient);
    //跳出主时间循环,通过Notify实现,其内部通过pipe实现
    void JumpoutEpollWait();

public:
    //时间事件的添加删除处理。通过观察下面私有的数据结果发现,内部通过管理TimeID进行时间事件的管理
    //timeID-->fd--->Event--->onTimeout(关联关系)
    bool AddTimer(const Event * poEvent, const int iTimeout, const int iType, uint32_t & iTimerID);

    void RemoveTimer(const uint32_t iTimerID);

    void DealwithTimeout(int & iNextTimeout);

    void DealwithTimeoutOne(const uint32_t iTimerID, const int iType);

public:
    void AddEvent(int iFD, SocketAddress oAddr);

    void CreateEvent();

    void ClearEvent();

    int GetActiveEventCount();

public:
    //事件上下文,用来进行索引,包括event和相关监听事件
    typedef struct EventCtx
    {
        Event * m_poEvent;
        int m_iEvents; //监听内容 EPOLLIN,EPOLLOUT
    } EventCtx_t;

private:
    bool m_bIsEnd;

protected:
    int m_iEpollFd;
    epoll_event m_EpollEvents[MAX_EVENTS];
    std::map<int, EventCtx_t> m_mapEvent;  //fd与event对应关系map
    NetWork * m_poNetWork;
    TcpClient * m_poTcpClient;
    Notify * m_poNotify; //内部事件通知使用

protected:
    Timer m_oTimer;
    std::map<uint32_t, int> m_mapTimerID2FD; //timeID 与 fd对应关系map
    //添加事件的queue,每一个包含其fd及网络socket信息(IP,Port等),之后创建MessageEvent进行相关事件的处理
    //此处又是一个多生产者单消费者模型。
    std::queue<std::pair<int, SocketAddress> > m_oFDQueue;   
    std::mutex m_oMutex;
    std::vector<MessageEvent *> m_vecCreatedEvent; //MessageEvent 继承自Event,添加到此EventLoop
};

event_loop.cpp


#include "event_loop.h"
#include "event_base.h"
#include "tcp_acceptor.h"
#include "tcp_client.h"
#include "comm_include.h"
#include "message_event.h"
#include "phxpaxos/network.h"

using namespace std;

namespace phxpaxos
{

EventLoop :: EventLoop(NetWork * poNetWork)
{
    m_iEpollFd = -1;
    m_bIsEnd = false;
    m_poNetWork = poNetWork;
    m_poTcpClient = nullptr;
    m_poNotify = nullptr;
    memset(m_EpollEvents, 0, sizeof(m_EpollEvents));
}

EventLoop :: ~EventLoop()
{
    ClearEvent();
}
// 通过管道发送消息,跳出epoll_wait()
void EventLoop :: JumpoutEpollWait()
{
    m_poNotify->SendNotify();
}
void EventLoop :: SetTcpClient(TcpClient * poTcpClient)
{
    m_poTcpClient = poTcpClient;
}

int EventLoop :: Init(const int iEpollLength)
{
    //epoll创建
    m_iEpollFd = epoll_create(iEpollLength);
    if (m_iEpollFd == -1)
    {
        PLErr("epoll_create fail, ret %d", m_iEpollFd);
        return -1;
    }

    m_poNotify = new Notify(this);
    assert(m_poNotify != nullptr);
    //将管道通知类Notify添加到EventLoop(EPOLLIN事件)
    int ret = m_poNotify->Init();
    if (ret != 0)
    {
        return ret;
    }

    return 0;
}

void EventLoop :: ModEvent(const Event * poEvent, const int iEvents)
{
    //从所有的fd、event对应关系中寻找,确定添加还是修改
    //之后epoll_ctl改变感兴趣内容
    //最后更新map中fd对应的ctx
    auto it = m_mapEvent.find(poEvent->GetSocketFd());
    int iEpollOpertion = 0;
    if (it == end(m_mapEvent))
    {
        iEpollOpertion = EPOLL_CTL_ADD;
    }
    else
    {
        iEpollOpertion = it->second.m_iEvents ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
    }

    epoll_event tEpollEvent;
    tEpollEvent.events = iEvents;
    tEpollEvent.data.fd = poEvent->GetSocketFd();

    int ret = epoll_ctl(m_iEpollFd, iEpollOpertion, poEvent->GetSocketFd(), &tEpollEvent);
    if (ret == -1)
    {
        PLErr("epoll_ctl fail, EpollFd %d EpollOpertion %d SocketFd %d EpollEvent %d",
                m_iEpollFd, iEpollOpertion, poEvent->GetSocketFd(), iEvents);

        //to do 
        return;
    }

    EventCtx tCtx;
    tCtx.m_poEvent = (Event *)poEvent;
    tCtx.m_iEvents = iEvents;

    m_mapEvent[poEvent->GetSocketFd()] = tCtx;
}

void EventLoop :: RemoveEvent(const Event * poEvent)
{
//与添加类似只不过将添加改为删除,其他内容一样
    auto it = m_mapEvent.find(poEvent->GetSocketFd());
    if (it == end(m_mapEvent))
    {
        return;
    }

    int iEpollOpertion = EPOLL_CTL_DEL;

    epoll_event tEpollEvent;
    tEpollEvent.events = 0;
    tEpollEvent.data.fd = poEvent->GetSocketFd();

    int ret = epoll_ctl(m_iEpollFd, iEpollOpertion, poEvent->GetSocketFd(), &tEpollEvent);
    if (ret == -1)
    {
        PLErr("epoll_ctl fail, EpollFd %d EpollOpertion %d SocketFd %d",
                m_iEpollFd, iEpollOpertion, poEvent->GetSocketFd());

        //to do 
        //when error
        return;
    }

    m_mapEvent.erase(poEvent->GetSocketFd());
}

void EventLoop :: StartLoop()
{
    m_bIsEnd = false;
    while(true)
    {
        BP->GetNetworkBP()->TcpEpollLoop();

        int iNextTimeout = 1000;
        //循环处理超时事件,并通过参数返回未超时的最小时间间隔,用于epoll_wait的超时
        //Timer内部通过vector进行存储,通过算法排序取最小的东西
        DealwithTimeout(iNextTimeout);

        //PLHead("nexttimeout %d", iNextTimeout);
    //epoll_wait的封装,根据触发的fd查找相应的ctx,在根据事件类型执行相应的操作
        OneLoop(iNextTimeout);

    //将感兴趣的内容(Event)添加到EventLoop中,通过MessageEvent进行添加
        CreateEvent();

        if (m_poTcpClient != nullptr)
        {
        //为client中的Event添加EPOLLOUT
            m_poTcpClient->DealWithWrite();
        }

        if (m_bIsEnd)
        {
            PLHead("TCP.EventLoop [END]");
            break;
        }
    }
}

void EventLoop :: Stop()
{
    m_bIsEnd = true;
}

void EventLoop :: OneLoop(const int iTimeoutMs)
{
    int n = epoll_wait(m_iEpollFd, m_EpollEvents, MAX_EVENTS, 1);
    if (n == -1)
    {
        if (errno != EINTR)
        {
            PLErr("epoll_wait fail, errno %d", errno);
            return;
        }
    }

    for (int i = 0; i < n; i++)
    {
    //查找ctx
        int iFd = m_EpollEvents[i].data.fd;
        auto it = m_mapEvent.find(iFd);
        if (it == end(m_mapEvent))
        {
            continue;
        }

        int iEvents = m_EpollEvents[i].events;
        Event * poEvent = it->second.m_poEvent;
    //找到所属event然后执行相应的event的相应事件处理函数
        int ret = 0;
        if (iEvents & EPOLLERR)
        {
            OnError(iEvents, poEvent);
            continue;
        }

        try
        {
            if (iEvents & EPOLLIN)
            {
                ret = poEvent->OnRead();
            }

            if (iEvents & EPOLLOUT)
            {
                ret = poEvent->OnWrite();
            }
        }
        catch (...)
        {
            ret = -1;
        }

        if (ret != 0)
        {
            OnError(iEvents, poEvent);
        }
    }
}

void EventLoop :: OnError(const int iEvents, Event * poEvent)
{
    BP->GetNetworkBP()->TcpOnError();

    PLErr("event error, events %d socketfd %d socket ip %s errno %d", 
            iEvents, poEvent->GetSocketFd(), poEvent->GetSocketHost().c_str(), errno);
    //从epoll中删除相关的监听事件,并从map中删除相关内容
    RemoveEvent(poEvent);

    bool bNeedDelete = false;
    //调用用户的error处理函数并决定此event是否可以释放资源。若此处不释放好像会发生内存泄漏,因为在析构函数中,没有进行释放。
    //不过我看MessageEvent的OnError函数处理中对bNeedDelete 置true处理。
    //Notify中没有做相关的处理,因为Notify相关的处理逻辑不再此处
    poEvent->OnError(bNeedDelete);

    if (bNeedDelete)
    {
        poEvent->Destroy();
    }
}

bool EventLoop :: AddTimer(const Event * poEvent, const int iTimeout, const int iType, uint32_t & iTimerID)
{
    if (poEvent->GetSocketFd() == 0)
    {
        return false;
    }    
    //此event没有添加到event Map中,添加
    if (m_mapEvent.find(poEvent->GetSocketFd()) == end(m_mapEvent))
    {
        EventCtx tCtx;
        tCtx.m_poEvent = (Event *)poEvent;
        tCtx.m_iEvents = 0;

        m_mapEvent[poEvent->GetSocketFd()] = tCtx;
    }

   //创建定时器
    uint64_t llAbsTime = Time::GetSteadyClockMS() + iTimeout;
    m_oTimer.AddTimerWithType(llAbsTime, iType, iTimerID);
  //创建时间事件和fd的关联关系
    m_mapTimerID2FD[iTimerID] = poEvent->GetSocketFd();

    return true;
}

void EventLoop :: RemoveTimer(const uint32_t iTimerID)
{
//移除此timeID和fd对应关系。因为Timer没有移除接口,所以通过切断中间的关联关系,使其触发时不执行相应的处理即可
    auto it = m_mapTimerID2FD.find(iTimerID);
    if (it != end(m_mapTimerID2FD))
    {
        m_mapTimerID2FD.erase(it);
    }
}

void EventLoop :: DealwithTimeoutOne(const uint32_t iTimerID, const int iType)
{
    //通过timeID查找fd,再通过fd找event,找到event执行相应的OnTimeout函数即可
    auto it = m_mapTimerID2FD.find(iTimerID);
    if (it == end(m_mapTimerID2FD))
    {
        //PLErr("Timeout aready remove!, timerid %u iType %d", iTimerID, iType);
        return;
    }

    int iSocketFd = it->second;

    m_mapTimerID2FD.erase(it);

    auto eventIt = m_mapEvent.find(iSocketFd);
    if (eventIt == end(m_mapEvent))
    {
        return;
    }

    eventIt->second.m_poEvent->OnTimeout(iTimerID, iType);
}

void EventLoop :: DealwithTimeout(int & iNextTimeout)
{
    bool bHasTimeout = true;

    while(bHasTimeout)
    {
       //只要有超时,就会一直处理超时事件
        uint32_t iTimerID = 0;
        int iType = 0;
        bHasTimeout = m_oTimer.PopTimeout(iTimerID, iType);

        if (bHasTimeout)
        {
            DealwithTimeoutOne(iTimerID, iType);

            iNextTimeout = m_oTimer.GetNextTimeout();
            if (iNextTimeout != 0)
            {
                break;
            }
        }
    }
}

void EventLoop :: AddEvent(int iFD, SocketAddress oAddr)
{
    //生产者添加数据
    std::lock_guard<std::mutex> oLockGuard(m_oMutex);
    m_oFDQueue.push(make_pair(iFD, oAddr));
}

void EventLoop :: CreateEvent()
{
//消费者,消费数据
    std::lock_guard<std::mutex> oLockGuard(m_oMutex);

    if (m_oFDQueue.empty())
    {
        return;
    }
    //每次都清楚需要释放的MessageEvent.决定权在每一次错误发生时用户的处理中是否需要释放onError函数中bNeedDelete 。若需要释放则在此处进行释放。不需要释放则会一直保留
    ClearEvent();

    int iCreatePerTime = 200;
    while ((!m_oFDQueue.empty()) && iCreatePerTime--)
    {
        auto oData = m_oFDQueue.front();
        m_oFDQueue.pop();

    //创建event并添加到EventLoop中
        //create event for this fd
        MessageEvent * poMessageEvent = new MessageEvent(MessageEventType_RECV, oData.first, 
                oData.second, this, m_poNetWork);
        poMessageEvent->AddEvent(EPOLLIN);

        m_vecCreatedEvent.push_back(poMessageEvent);
    }
}

void EventLoop :: ClearEvent()
{
    for (auto it = m_vecCreatedEvent.begin(); it != end(m_vecCreatedEvent);)
    {
    //根据MessageEvent中的bool变量值,决定是否释放其资源
        if ((*it)->IsDestroy())
        {
            delete (*it);
            it = m_vecCreatedEvent.erase(it);
        }
        else
        {
            it++;
        }
    }
}

int EventLoop :: GetActiveEventCount()
{
//返回总共的MessageEvent的数量
    std::lock_guard<std::mutex> oLockGuard(m_oMutex);
    ClearEvent();
    return (int)m_vecCreatedEvent.size();
}

}

手画结构凑活着看吧

这里写图片描述

EventLoop相关的代码逻辑就想当的清晰了。下一篇文章将详细介绍TCP的工作过程。

猜你喜欢

转载自blog.csdn.net/g1036583997/article/details/80479644