UDT源码剖析(六)之EPoll

常用的事件处理体系EPoll在UDT中有一种独特的用法,初见时觉得鸡肋,越到后来越是觉得妙趣横生。

EPoll

  • 基础数据结构:
    ```
    struct CEPollDesc
    {
    int m_iID; // //在map中的索引ID
    std::set

int m_iLocalID; //真实的由epoll_create返回的EID
std::set

std::set

class CEPoll //这个CEpoll使用map同时管理多个epoll实例,每个epoll实例管理1024个描述符
{
private:
int m_iIDSeed; //缓存从map中索引epoll实例的ID,每次+1,到0x7fffffff后回绕,不过肯定没有这么多的epoll实例存在
udt_pthread_mutex_t m_SeedLock;

std::map

- 初始化:`CEPoll::CEPoll()`

CEPoll::CEPoll():
m_iIDSeed(0) //将种子初始化为0
{
CGuard::createMutex(m_EPollLock);
}

- 创建真实的EPoll:`int CEPoll::create()`

int CEPoll::create()
{
CGuard pg(m_EPollLock);

int localid = 0;

#ifdef LINUX
localid = epoll_create(1024); //每个创建一个只管理1024个描述符的EPoll
if (localid < 0)
throw CUDTException(-1, 0, errno);
#else
// on BSD, use kqueue
// on Solaris, use /dev/poll
// on Windows, select
#endif

if (++ m_iIDSeed >= 0x7FFFFFFF) //防止种子进行回绕
m_iIDSeed = 0;

CEPollDesc desc; //创建一个Desc初始化后加入EPoll管理的map中
desc.m_iID = m_iIDSeed;
desc.m_iLocalID = localid;
m_mPolls[desc.m_iID] = desc;

return desc.m_iID;
}

- 向EPoll中添加UDT SOCKET:`int CEPoll::add_usock(const int eid, const UDTSOCKET& u, const int* events)`

int CEPoll::add_usock(const int eid, const UDTSOCKET& u, const int* events)
{
CGuard pg(m_EPollLock);

map

// BARCHART: Manage all event types.
if (!events || (events & UDT_EPOLL_IN)) //向Desc的观察描述符set中添加关注的事件
p->second.m_sUDTSocksIn.insert(u);
if (!events || (
events & UDT_EPOLL_OUT))
p->second.m_sUDTSocksOut.insert(u);
if (!events || (*events & UDT_EPOLL_ERR))
p->second.m_sUDTSocksEx.insert(u);

return 0;
}

- 更新EPoll中的UDT SOCKET关注的事件:`int CEPoll::update_usock(const int eid, const UDTSOCKET& u, const int* events)`

int CEPoll::update_usock(const int eid, const UDTSOCKET& u, const int* events)
{
CGuard pg(m_EPollLock);

map

if(events){ //根据提供的事件,在Desc的观察set中添加或者删除事件
if (events & UDT_EPOLL_IN){
p->second.m_sUDTSocksIn.insert(u);
}else{
p->second.m_sUDTSocksIn.erase(u);
}
if (
events & UDT_EPOLL_OUT){
p->second.m_sUDTSocksOut.insert(u);
} else{
p->second.m_sUDTSocksOut.erase(u);
}
}

return 0;
}

- 获取UDT SOCKET关注的事件:`int CEPoll::verify_usock(const int eid, const UDTSOCKET& u, int* events)`

int CEPoll::verify_usock(const int eid, const UDTSOCKET& u, int* events)
{

CGuard pg(m_EPollLock);

map

if(events){
if(p->second.m_sUDTSocksIn.find(u) != p->second.m_sUDTSocksIn.end()){ //将关注的事件填充到event中
events |= UDT_EPOLL_IN;
}
if(p->second.m_sUDTSocksOut.find(u) != p->second.m_sUDTSocksOut.end()){
events |= UDT_EPOLL_OUT;
}
}

return 0;

}

- 向EPoll中添加SYS SOCKET,使用epoll系列函数:`int CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events)`

int CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events)
{
CGuard pg(m_EPollLock);

map

ifdef LINUX

epoll_event ev; //获取事件

if (NULL == events)
ev.events = EPOLLIN | EPOLLOUT | EPOLLERR; //如果没有提供关注的事件,则SYS默认关注所有的事件
else
{
ev.events = 0; //否则根据提供的事件列表决定关注哪些事件
if (events & UDT_EPOLL_IN)
ev.events |= EPOLLIN;
if (
events & UDT_EPOLL_OUT)
ev.events |= EPOLLOUT;
if (*events & UDT_EPOLL_ERR)
ev.events |= EPOLLERR;
}

ev.data.fd = s;
if (::epoll_ctl(p->second.m_iLocalID, EPOLL_CTL_ADD, s, &ev) < 0)
throw CUDTException();

endif

p->second.m_sLocals.insert(s); //向描述的SYS SOCKET中添加事件

return 0;
}

- 删除EPoll中关注的UDT SOCKET:`int CEPoll::remove_usock(const int eid, const UDTSOCKET& u)`

int CEPoll::remove_usock(const int eid, const UDTSOCKET& u)
{
CGuard pg(m_EPollLock);

map

p->second.m_sUDTSocksIn.erase(u); //从关注读写的set中删除UDT SOCKET
p->second.m_sUDTSocksOut.erase(u);

p->second.m_sUDTReads.erase(u); //从读写已发送的set中删除UDT SOCKET
p->second.m_sUDTWrites.erase(u);

return 0;
}

- 删除EPoll关注的SYS SOCKET,使用epoll系列函数:`int CEPoll::remove_ssock(const int eid, const SYSSOCKET& s)`

int CEPoll::remove_ssock(const int eid, const SYSSOCKET& s)
{
CGuard pg(m_EPollLock);

map

ifdef LINUX

epoll_event ev; //删除关注的SYS SOCKET
if (::epoll_ctl(p->second.m_iLocalID, EPOLL_CTL_DEL, s, &ev) < 0)
throw CUDTException();

endif

p->second.m_sLocals.erase(s);

return 0;
}

- 

int CEPoll::wait(const int eid, set

//因为是填充,先清除提供的参数
if (readfds) readfds->clear();
if (writefds) writefds->clear();
if (lrfds) lrfds->clear();
if (lwfds) lwfds->clear();

int total = 0;

int64_t entertime = CTimer::getTime(); //先获取一手时间,看你怎么说
while (true)
{
CGuard::enterCS(m_EPollLock);

  map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);    //找到相应的epoll实例,没找到就抛异常,在抛出异常前先释放锁
  if (p == m_mPolls.end())
  {
     CGuard::leaveCS(m_EPollLock);
     throw CUDTException(5, 13);
  }

  //如果找到的epoll观察的各种参数都是空的,抛出异常
  if (p->second.m_sUDTSocksIn.empty() && p->second.m_sUDTSocksOut.empty() && p->second.m_sLocals.empty() && (msTimeOut < 0))
  {
     CGuard::leaveCS(m_EPollLock);
     throw CUDTException(5, 3);
  }

  // 将已发生的读事件和异常事件(UDTSOCKET)填充到用户提供的容器中,读事件是直接获得描述符set,异常事件是加入,然后更新total.写事件做同样的处理
  if ((NULL != readfds) && (!p->second.m_sUDTReads.empty() || !p->second.m_sUDTExcepts.empty()))
  {
     *readfds = p->second.m_sUDTReads;    //不进行复制,直接将用户提供的指针指向UDT的可读set,然后将异常UDT添加到可读set中
     for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)
        readfds->insert(*i);
     total += p->second.m_sUDTReads.size() + p->second.m_sUDTExcepts.size();    //调整返回的事件发生的数量
  }
  if ((NULL != writefds) && (!p->second.m_sUDTWrites.empty() || !p->second.m_sUDTExcepts.empty()))
  {
     *writefds = p->second.m_sUDTWrites;    //直接调整写事件的指向,并将异常事件添加到写事件中,并更新事件数量
     for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)
        writefds->insert(*i);
     total += p->second.m_sUDTWrites.size() + p->second.m_sUDTExcepts.size();
  }

  // 因为异常事件的描述符已经添加,清除当前epoll实例中的异常事件
  if(total > 0 && !p->second.m_sUDTExcepts.empty()){
    p->second.m_sUDTExcepts.clear();
  }

  //处理系统事件  
  if (lrfds || lwfds)
  {
     #ifdef LINUX
     const int max_events = p->second.m_sLocals.size();       //获取EPoll Desc中系统描述符数量,并对系统描述符做epoll_wait处理
     epoll_event ev[max_events];
     int nfds = ::epoll_wait(p->second.m_iLocalID, ev, max_events, 0);    //调用wait函数处理这些事件
    
     //填充系统描述符的返回列表,累加total
     for (int i = 0; i < nfds; ++ i)
     {
        if ((NULL != lrfds) && (ev[i].events & EPOLLIN))
       {
           lrfds->insert(ev[i].data.fd);
           ++ total;
        }
        if ((NULL != lwfds) && (ev[i].events & EPOLLOUT))
        {
           lwfds->insert(ev[i].data.fd);
           ++ total;
        }
     }
     #else
     //currently "select" is used for all non-Linux platforms.
     //faster approaches can be applied for specific systems in the future.

     //"select" has a limitation on the number of sockets

     fd_set readfds2;
     fd_set writefds2;
     FD_ZERO(&readfds2);
     FD_ZERO(&writefds2);

     for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)
     {
        if (lrfds)
           FD_SET(*i, &readfds2);
        if (lwfds)
           FD_SET(*i, &writefds2);
     }

     timeval tv;
     tv.tv_sec = 0;
     tv.tv_usec = 0;
     if (::select(0, &readfds2, &writefds2, NULL, &tv) > 0)
     {
        for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)
        {
           if (lrfds && FD_ISSET(*i, &readfds2))
           {
              lrfds->insert(*i);
              ++ total;
           }
           if (lwfds && FD_ISSET(*i, &writefds2))
           {
              lwfds->insert(*i);
              ++ total;
           }
        }
     }
     #endif
  }

  CGuard::leaveCS(m_EPollLock);

  if (total > 0)    //如果已经获得事件,直接返回就可以了
     return total;

  if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * 1000LL))    //没有事件发生并且已经超时,抛出异常
     throw CUDTException(6, 3, 0);

  CTimer::waitForEvent();   //如果无事发生就等待一会

}

return 0;
}

- 释放某一个EPoll:`int CEPoll::release(const int eid)`

int CEPoll::release(const int eid)
{
CGuard pg(m_EPollLock);

map

#ifdef LINUX
// release local/system epoll descriptor
::close(i->second.m_iLocalID); //直接关闭eid
#endif

m_mPolls.erase(i); //从map中移除Epoll Desc

return 0;
}

- 更新事件:`int CEPoll::update_events(const UDTSOCKET& uid, std::set<int>& eids, int events, bool enable)`

int CEPoll::update_events(const UDTSOCKET& uid, std::set

map

vector

//如果没有找到合适的EPoll,移除这些丢失的EPoll
for (vector

return 0;
}

- 独立于EPoll的函数:`void update_epoll_sets(const UDTSOCKET& uid, const set<UDTSOCKET>& watch, set<UDTSOCKET>& result, bool enable)`

void update_epoll_sets(const UDTSOCKET& uid, const set

猜你喜欢

转载自www.cnblogs.com/ukernel/p/9191055.html