UDT源码剖析(四)之Socket函数

UDTSOCKET socket(int af, int type, int protocol)

UDT SOCKET的创建顺序:UDTSOCKET UDT::socket(int af,int type,int protocol) -> UDTSOCKET CUDT::socket(int af,int type,int protocol) -> UDTSOCKET CUDTUnited::newSocket(int af, int type)
  • Order 0:UDTSOCKET CUDTUnited::newSocket(int af, int type):创建UDT SOCKET
    ```
    // Create a new UDT socket of the given address family and type and
    // register it in the global socket map. Returns the new socket ID.
    // Throws CUDTException(5,3) on a bad type, (3,2) on allocation failure.
    UDTSOCKET CUDTUnited::newSocket(int af, int type)
    {
       // only stream and datagram sockets are supported
       if ((type != SOCK_STREAM) && (type != SOCK_DGRAM))
          throw CUDTException(5, 3, 0);

       CUDTSocket* ns = NULL;

       try
       {
          ns = new CUDTSocket;      // API-level socket object
          ns->m_pUDT = new CUDT;    // protocol entity it wraps
          if (AF_INET == af)        // pre-allocate the local address; port 0 = not bound yet
          {
             ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in);
             ((sockaddr_in*)(ns->m_pSelfAddr))->sin_port = 0;
          }
          else
          {
             ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in6);
             ((sockaddr_in6*)(ns->m_pSelfAddr))->sin6_port = 0;
          }
       }
       catch (...)
       {
          delete ns;
          throw CUDTException(3, 2, 0);
       }

       CGuard::enterCS(m_IDLock);
       // socket IDs start from a randomized seed and are decremented per socket
       ns->m_SocketID = -- m_SocketID;
       CGuard::leaveCS(m_IDLock);

       ns->m_Status = INIT;          // freshly created, not yet bound
       ns->m_ListenSocket = 0;       // not spawned by any listener
       ns->m_pUDT->m_SocketID = ns->m_SocketID;    // register the ID inside CUDT
       ns->m_pUDT->m_iSockType = (SOCK_STREAM == type) ? UDT_STREAM : UDT_DGRAM;
       ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af;
       ns->m_pUDT->m_pCache = m_pCache;    // CUDT shares the CCache owned by CUDTUnited

       // protect the m_Sockets structure.
       CGuard::enterCS(m_ControlLock);
       try
       {
          m_Sockets[ns->m_SocketID] = ns;    // publish the socket in the global map
       }
       catch (...)
       {
          // failure and rollback; the lock is released once, below
          // (releasing it here as well would unlock m_ControlLock twice)
          delete ns;
          ns = NULL;
       }
       CGuard::leaveCS(m_ControlLock);

       if (NULL == ns)
          throw CUDTException(3, 2, 0);

       return ns->m_SocketID;
    }

##int bind(UDTSOCKET u, const struct sockaddr* name, int namelen)
#####bind的关联顺序:`int UDT::bind(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDT::bind(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen) -> void CUDT::open() -> void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)`
- Order 0:`int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen)`

int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen):将UDT SOCKET与某一个地址相关联
{
CUDTSocket* s = locate(u); //获取这个UDT SOCKET的CUDTSocket*
if (NULL == s)
throw CUDTException(5, 4, 0);

CGuard cg(s->m_ControlLock);

// cannot bind a socket more than once
if (INIT != s->m_Status)
throw CUDTException(5, 0, 0);

// check the size of SOCKADDR structure
if (AF_INET == s->m_iIPversion)
{
if (namelen != sizeof(sockaddr_in))
throw CUDTException(5, 3, 0);
}
else
{
if (namelen != sizeof(sockaddr_in6))
throw CUDTException(5, 3, 0);
}

s->m_pUDT->open(); //调用CUDT*中的open()并修改CUDT实例的状态
updateMux(s, name);
s->m_Status = OPENED; //调整CUDTSocket的状态为Opened

// copy address information of local node
s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);

return 0;
}

- Order 1:`void CUDT::open()`:继续填充CUDT中的选项

// Initialize the runtime state of this CUDT entity before it can be
// bound, listen, or connect. Resets statistics, timers and the
// send/receive list nodes, then marks the entity as opened.
void CUDT::open()
{
   CGuard cg(m_ConnectionLock);

   // derive packet/payload sizes; 28 = IP(20) + UDP(8) header overhead
   m_iPktSize = m_iMSS - 28; //packet size = MSS - 28
   m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize; //payload size = MSS - 28 - 16

   m_iEXPCount = 1; // expiration counter starts at 1
   m_iBandwidth = 1; // initial estimated bandwidth: 1 packet/SYN
   m_iDeliveryRate = 16; // initial peer receiving rate: 16 packets/SYN
   m_iAckSeqNo = 0; // no ACK received yet
   m_ullLastAckTime = 0; // time of last ACK: none

   m_StartTime = CTimer::getTime(); // connection start time = now
   m_llSentTotal = m_llRecvTotal = m_iSndLossTotal = m_iRcvLossTotal = m_iRetransTotal = m_iSentACKTotal = m_iRecvACKTotal = m_iSentNAKTotal = m_iRecvNAKTotal = 0; // reset cumulative statistics
   m_LastSampleTime = CTimer::getTime(); // last performance sample time = now
   m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0; // reset per-interval trace statistics
   m_llSndDuration = m_llSndDurationTotal = 0;

   if (NULL == m_pSNode) // lazily allocate this entity's node in the send list
      m_pSNode = new CSNode;
   m_pSNode->m_pUDT = this; // back-pointer from the send-list node to this CUDT
   m_pSNode->m_llTimeStamp = 1; // heap comparison key, initialized to 1
   m_pSNode->m_iHeapLoc = -1; // -1 = not currently in the send heap

   if (NULL == m_pRNode) // lazily allocate this entity's node in the receive list
      m_pRNode = new CRNode;
   m_pRNode->m_pUDT = this; // back-pointer from the recv-list node to this CUDT
   m_pRNode->m_llTimeStamp = 1; // list timestamp, initialized to 1
   m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL; // not linked into the list yet
   m_pRNode->m_bOnList = false;

   m_iRTT = 10 * m_iSYNInterval; // initial RTT = 10 SYN intervals
   m_iRTTVar = m_iRTT >> 1; // initial RTT variance = RTT / 2
   m_ullCPUFrequency = CTimer::getCPUFrequency(); // CPU ticks per microsecond (presumably — used to convert µs below)

   // set up the timers
   m_ullSYNInt = m_iSYNInterval * m_ullCPUFrequency; // SYN interval converted from µs to CPU ticks

   // minimum NAK and EXP intervals: 300,000 µs (0.3 s), converted to CPU ticks
   m_ullMinNakInt = 300000 * m_ullCPUFrequency;
   m_ullMinExpInt = 300000 * m_ullCPUFrequency;

   // ACK interval starts equal to the SYN interval; NAK starts at its minimum

   m_ullACKInt = m_ullSYNInt;
   m_ullNAKInt = m_ullMinNakInt;

   uint64_t currtime; // current time in CPU ticks
   CTimer::rdtsc(currtime);
   m_ullLastRspTime = currtime; // last response from the peer = now
   m_ullNextACKTime = currtime + m_ullSYNInt; // schedule the next ACK and NAK
   m_ullNextNAKTime = currtime + m_ullNAKInt;

   m_iPktCount = 0; // packets received since last ACK: 0
   m_iLightACKCount = 1; // light-ACK counter starts at 1

   m_ullTargetTime = 0; // scheduled send time of the next packet: none
   m_ullTimeDiff = 0; // accumulated inter-packet time adjustment: 0

   // Now UDT is opened.
   m_bOpened = true;
}

- Order 1:`void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)`

// Attach socket s to a UDP multiplexer. If the socket allows address reuse
// and a multiplexer already exists on the requested port (same IP version
// and MSS), share it; otherwise create a new multiplexer, open its UDP
// channel (on udpsock if provided, else on addr / a random port), and
// register it in m_mMultiplexer.
void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
{
   CGuard cg(m_ControlLock);

   if ((s->m_pUDT->m_bReuseAddr) && (NULL != addr))
   {
      // extract the port the caller wants, so an existing multiplexer can be reused
      int port = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)addr)->sin_port) : ntohs(((sockaddr_in6*)addr)->sin6_port);

      // search the existing multiplexers for a compatible, reusable one on that port
      for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++ i)
      {
         if ((i->second.m_iIPversion == s->m_pUDT->m_iIPversion) && (i->second.m_iMSS == s->m_pUDT->m_iMSS) && i->second.m_bReusable)
         {
            if (i->second.m_iPort == port)
            {
               // reuse the existing multiplexer
               ++ i->second.m_iRefCount;                          // one more socket on this multiplexer
               s->m_pUDT->m_pSndQueue = i->second.m_pSndQueue;    // share its send queue
               s->m_pUDT->m_pRcvQueue = i->second.m_pRcvQueue;    // share its receive queue
               s->m_iMuxID = i->second.m_iID;                     // remember which multiplexer, for later lookup
               return;
            }
         }
      }
   }

   // no reusable multiplexer found: create a new one and attach the socket to it
   CMultiplexer m;
   m.m_iMSS = s->m_pUDT->m_iMSS;
   m.m_iIPversion = s->m_pUDT->m_iIPversion;
   m.m_iRefCount = 1;
   m.m_bReusable = s->m_pUDT->m_bReuseAddr;
   m.m_iID = s->m_SocketID;

   m.m_pChannel = new CChannel(s->m_pUDT->m_iIPversion); // the UDP channel is owned by the multiplexer
   m.m_pChannel->setSndBufSize(s->m_pUDT->m_iUDPSndBufSize);
   m.m_pChannel->setRcvBufSize(s->m_pUDT->m_iUDPRcvBufSize);

   try
   {
      if (NULL != udpsock)
         m.m_pChannel->open(*udpsock);   // use the caller-supplied, already-bound UDP socket
      else
         m.m_pChannel->open(addr);       // bind to addr; if addr is NULL a random port is chosen
   }
   catch (CUDTException& e)
   {
      m.m_pChannel->close();
      delete m.m_pChannel;
      throw e;
   }

   // read back the port that was actually bound and record it in the multiplexer
   sockaddr* sa = (AF_INET == s->m_pUDT->m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
   m.m_pChannel->getSockAddr(sa);
   m.m_iPort = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)sa)->sin_port) : ntohs(((sockaddr_in6*)sa)->sin6_port);
   if (AF_INET == s->m_pUDT->m_iIPversion) delete (sockaddr_in*)sa; else delete (sockaddr_in6*)sa;

   m.m_pTimer = new CTimer;

   // the send queue and receive queue share one timer
   m.m_pSndQueue = new CSndQueue;
   m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
   m.m_pRcvQueue = new CRcvQueue;
   m.m_pRcvQueue->init(32, s->m_pUDT->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);

   m_mMultiplexer[m.m_iID] = m;

   // wire the new multiplexer into the CUDT entity
   s->m_pUDT->m_pSndQueue = m.m_pSndQueue;
   s->m_pUDT->m_pRcvQueue = m.m_pRcvQueue;
   s->m_iMuxID = m.m_iID;
}

##int bind2(UDTSOCKET u, UDPSOCKET udpsock)
#####bind2的关联顺序:`int UDT::bind2(UDTSOCKET u, UDPSOCKET udpsock) -> int CUDT::bind(UDTSOCKET u, UDPSOCKET udpsock) -> int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock) -> void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)`
注意:与`bind`的情况基本一样,唯一的不同是UDP端口的选择:如果没有提供UDP端口,会进行随机的选择。
- Order 0:`int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock)`:将UDT Socket与UDP Socket关联起来

// Associate a UDT socket with an existing, already-bound UDP socket
// (the UDT-level bind2()). The UDP socket's own local address is
// recovered with getsockname() and used for the multiplexer lookup.
int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock)
{
   CUDTSocket* s = locate(u);    // look up the CUDTSocket instance
   if (NULL == s)
      throw CUDTException(5, 4, 0);

   CGuard cg(s->m_ControlLock);

   // cannot bind a socket more than once
   if (INIT != s->m_Status)
      throw CUDTException(5, 0, 0);

   sockaddr_in name4;
   sockaddr_in6 name6;
   sockaddr* name;
   socklen_t namelen;

   if (AF_INET == s->m_iIPversion)
   {
      namelen = sizeof(sockaddr_in);
      name = (sockaddr*)&name4;
   }
   else
   {
      namelen = sizeof(sockaddr_in6);
      name = (sockaddr*)&name6;
   }

   // recover the local address the UDP socket is bound to
   if (-1 == ::getsockname(udpsock, name, &namelen))
      throw CUDTException(5, 3);

   s->m_pUDT->open();
   updateMux(s, name, &udpsock);   // attach the multiplexer on top of the given UDP socket
   s->m_Status = OPENED;

   // copy address information of local node
   s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);

   return 0;
}

- 没有提供UDP参数:`void CChannel::open(const sockaddr* addr)`

// Create the underlying UDP socket and bind it: to addr when one is
// supplied, otherwise to a local address with an ephemeral port chosen
// via getaddrinfo(NULL, "0", ...). Throws CUDTException(1,0)/(1,3) on
// socket()/bind() failure.
void CChannel::open(const sockaddr* addr)
{
   // create the UDP socket
   m_iSocket = ::socket(m_iIPversion, SOCK_DGRAM, 0);

#ifdef WINDOWS
   if (INVALID_SOCKET == m_iSocket)
#else
   if (m_iSocket < 0)
#endif
      throw CUDTException(1, 0, NET_ERROR);

   // if an address was supplied, bind to it; otherwise resolve a local
   // wildcard address with port "0" and bind to that (ephemeral port)
   if (NULL != addr)
   {
      socklen_t namelen = m_iSockAddrSize;

      // bind the UDP socket to the requested address
      if (0 != ::bind(m_iSocket, addr, namelen))
         throw CUDTException(1, 3, NET_ERROR);
   }
   else
   {
      //sendto or WSASendTo will also automatically bind the socket
      addrinfo hints;
      addrinfo* res;

      memset(&hints, 0, sizeof(struct addrinfo));

      hints.ai_flags = AI_PASSIVE;
      hints.ai_family = m_iIPversion;
      hints.ai_socktype = SOCK_DGRAM;

      if (0 != ::getaddrinfo(NULL, "0", &hints, &res))
         throw CUDTException(1, 3, NET_ERROR);

      if (0 != ::bind(m_iSocket, res->ai_addr, res->ai_addrlen))
         throw CUDTException(1, 3, NET_ERROR);

      ::freeaddrinfo(res);
   }

   setUDPSockOpt(); // set UDP send/receive buffer sizes and switch to non-blocking mode
}

- 提供UDP参数:`void CChannel::open(UDPSOCKET udpsock)`

// Attach the channel to an externally created (and already bound) UDP
// socket instead of creating one; only the socket options are applied.
void CChannel::open(UDPSOCKET udpsock)
{
   m_iSocket = udpsock;
   setUDPSockOpt(); // apply buffer sizes and non-blocking mode
}

##int listen(UDTSOCKET u, int backlog)
#####listen的启动顺序:`void UDT::listen(UDTSOCKET u,int backlog) -> int CUDT::listen(UDTSOCKET u, int backlog) -> int CUDTUnited::listen(const UDTSOCKET u, int backlog) -> void CUDT::listen()`
- Order 0:`int CUDTUnited::listen(const UDTSOCKET u, int backlog)`:将创建的UDTSOCKET作为Listener Socket

int CUDTUnited::listen(const UDTSOCKET u, int backlog)
{
CUDTSocket* s = locate(u); //从全局的map中寻找CUDTSocket实例
if (NULL == s)
throw CUDTException(5, 4, 0);

CGuard cg(s->m_ControlLock);

// 如果当前的CUDTSocket已经处于Listening状态,直接返回
if (LISTENING == s->m_Status)
return 0;

// 如果当前的CUDTSocket还没有打开,抛出异常
if (OPENED != s->m_Status)
throw CUDTException(5, 5, 0);

// Listener不支持交会连接模式
if (s->m_pUDT->m_bRendezvous)
throw CUDTException(5, 7, 0);

//如果队列长度小于backlog,就直接返回
if (backlog <= 0)
throw CUDTException(5, 3, 0);

s->m_uiBackLog = backlog;

try
{
s->m_pQueuedSockets = new set

s->m_pUDT->listen();

s->m_Status = LISTENING; //调整状态为LISTENING

return 0;
}

- Order 1:`void CUDT::listen()`:调整CUDT实例状态

// Switch this CUDT entity into listening state by registering it as the
// listener of its receive queue. Throws if not opened, or if a
// connection attempt is already in progress.
void CUDT::listen()
{
   CGuard cg(m_ConnectionLock);

   // must have been opened (bound) first
   if (!m_bOpened)
      throw CUDTException(5, 0, 0);

   // a connecting/connected entity cannot become a listener
   if (m_bConnecting || m_bConnected)
      throw CUDTException(5, 2, 0);

   // already listening: nothing to do
   if (m_bListening)
      return;

   // register as the (single) listener on the shared receive queue;
   // fails if another socket on this multiplexer is already listening
   if (m_pRcvQueue->setListener(this) < 0)
      throw CUDTException(5, 11, 0);

   m_bListening = true;
}

##UDTSOCKET accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen)
#####accept的处理顺序:`UDTSOCKET UDT::accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen) -> UDTSOCKET CUDT::accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen) -> UDTSOCKET CUDTUnited::accept(const UDTSOCKET listener, sockaddr* addr, int* addrlen)`
- Order 0:`UDTSOCKET CUDTUnited::accept(const UDTSOCKET listener, sockaddr* addr, int* addrlen)`

// Accept one pending connection on a listening socket. Blocks (on the
// listener's condition variable) when the socket is in synchronous
// receiving mode; otherwise returns immediately. On success fills
// addr/addrlen with the peer address and returns the new socket ID.
UDTSOCKET CUDTUnited::accept(const UDTSOCKET listener, sockaddr* addr, int* addrlen)
{
   // addr without addrlen cannot be filled in safely
   if ((NULL != addr) && (NULL == addrlen))
      throw CUDTException(5, 3, 0);

   CUDTSocket* ls = locate(listener);    // find the listener's CUDTSocket instance

   if (ls == NULL)
      throw CUDTException(5, 4, 0);

   // only a listening socket can accept
   if (LISTENING != ls->m_Status)
      throw CUDTException(5, 6, 0);

   // a listener cannot be in rendezvous mode
   if (ls->m_pUDT->m_bRendezvous)
      throw CUDTException(5, 7, 0);

   UDTSOCKET u = CUDT::INVALID_SOCK;
   bool accepted = false;

   // !!only one conection can be set up each time!!
#ifndef WINDOWS
   while (!accepted)
   {
      pthread_mutex_lock(&(ls->m_AcceptLock));

      // re-check the listener's state under the lock
      if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
      {
         // This socket has been closed.
         accepted = true;
      }
      else if (ls->m_pQueuedSockets->size() > 0)
      {
         // a connection is queued: move it to the accepted set and return it
         u = *(ls->m_pQueuedSockets->begin());
         ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
         ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
         accepted = true;
      }
      else if (!ls->m_pUDT->m_bSynRecving)
      {
         // non-blocking mode: give up immediately, u stays INVALID_SOCK
         accepted = true;
      }

      // nothing available yet: wait until a new connection (or close) signals us
      if (!accepted && (LISTENING == ls->m_Status))
         pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock));

      // no more queued connections: clear the listener's readable epoll event
      if (ls->m_pQueuedSockets->empty())
         m_EPoll.update_events(listener, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);

      pthread_mutex_unlock(&(ls->m_AcceptLock));
   }
#else
   while (!accepted)
   {
      WaitForSingleObject(ls->m_AcceptLock, INFINITE);

      if (ls->m_pQueuedSockets->size() > 0)
      {
         u = *(ls->m_pQueuedSockets->begin());
         ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
         ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());

         accepted = true;
      }
      else if (!ls->m_pUDT->m_bSynRecving)
         accepted = true;

      ReleaseMutex(ls->m_AcceptLock);

      // logical && (the source had a bitwise & here, same result on bools but unidiomatic)
      if (!accepted && (LISTENING == ls->m_Status))
         WaitForSingleObject(ls->m_AcceptCond, INFINITE);

      if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
      {
         // Send signal to other threads that are waiting to accept.
         SetEvent(ls->m_AcceptCond);
         accepted = true;
      }

      if (ls->m_pQueuedSockets->empty())
         m_EPoll.update_events(listener, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);
   }
#endif

   // no valid socket was obtained
   if (u == CUDT::INVALID_SOCK)
   {
      // non-blocking receiving, no connection available
      if (!ls->m_pUDT->m_bSynRecving)
         throw CUDTException(6, 2, 0);

      // listening socket is closed
      throw CUDTException(5, 6, 0);
   }

   // a valid socket was accepted: report the peer address to the caller
   if ((addr != NULL) && (addrlen != NULL))
   {
      if (AF_INET == locate(u)->m_iIPversion)
         *addrlen = sizeof(sockaddr_in);
      else
         *addrlen = sizeof(sockaddr_in6);

      // copy address information of peer node
      memcpy(addr, locate(u)->m_pPeerAddr, *addrlen);
   }

   return u;
}

##int connect(UDTSOCKET u, const struct sockaddr* name, int namelen)
#####connect的处理顺序:`int UDT::connect(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDT::connect(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen) -> void CUDT::connect(const sockaddr* serv_addr)`
- Order 0:`int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen)`

// Connect a UDT socket to the peer at name. An INIT socket is opened and
// attached to a multiplexer first (non-rendezvous only); the actual
// handshake is performed by CUDT::connect(). On failure the status is
// rolled back to OPENED and the exception is rethrown.
int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen)
{
   CUDTSocket* s = locate(u);    // look up the CUDTSocket instance
   if (NULL == s)
      throw CUDTException(5, 4, 0);

   CGuard cg(s->m_ControlLock);

   // the address length must match the socket's IP version
   if (AF_INET == s->m_iIPversion)
   {
      if (namelen != sizeof(sockaddr_in))
         throw CUDTException(5, 3, 0);
   }
   else
   {
      if (namelen != sizeof(sockaddr_in6))
         throw CUDTException(5, 3, 0);
   }

   // an INIT socket is opened implicitly; rendezvous requires an explicit bind first
   if (INIT == s->m_Status)
   {
      if (!s->m_pUDT->m_bRendezvous)
      {
         s->m_pUDT->open();      // initialize the CUDT entity
         updateMux(s);           // attach it to a multiplexer (random local port)
         s->m_Status = OPENED;
      }
      else
         throw CUDTException(5, 8, 0);
   }
   else if (OPENED != s->m_Status)
      throw CUDTException(5, 2, 0);

   // the handshake is in progress from here on
   s->m_Status = CONNECTING;
   try
   {
      s->m_pUDT->connect(name);   // perform the actual connection handshake
   }
   catch (CUDTException& e)
   {
      s->m_Status = OPENED;       // roll back so the caller may retry
      throw e;
   }

   // record the peer address
   delete s->m_pPeerAddr;
   if (AF_INET == s->m_iIPversion)
   {
      s->m_pPeerAddr = (sockaddr*)(new sockaddr_in);
      memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in));
   }
   else
   {
      s->m_pPeerAddr = (sockaddr*)(new sockaddr_in6);
      memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in6));
   }

   return 0;
}

- Order 1:`void CUDT::connect(const sockaddr* serv_addr)`:调整CUDT实例的资源

void CUDT::connect(const sockaddr* serv_addr)
{
CGuard cg(m_ConnectionLock);

if (!m_bOpened) //如果目前的状态不正确,直接返回
throw CUDTException(5, 0, 0);

if (m_bListening) //Listener不能调用connect
throw CUDTException(5, 2, 0);

if (m_bConnecting || m_bConnected) //只调整了CUDT SOCKET的Status为CONNECTING,还没有调整CUDT实例的状态
throw CUDTException(5, 2, 0);

// 记录对方地址
delete m_pPeerAddr;
m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr)new sockaddr_in : (sockaddr)new sockaddr_in6;
memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));

//这个作用是在交会连接队列中等待HandleShake Packet,等待事件是1S。在我们的实现中可以不考虑
uint64_t ttl = 3000000;
if (m_bRendezvous)
ttl *= 10;
ttl += CTimer::getTime();
m_pRcvQueue->registerConnector(m_SocketID, this, m_iIPversion, serv_addr, ttl);

//因为是主动发起连接,填充想要发送的握手包
m_ConnReq.m_iVersion = m_iVersion;
m_ConnReq.m_iType = m_iSockType;
m_ConnReq.m_iMSS = m_iMSS;
m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize;
m_ConnReq.m_iReqType = (!m_bRendezvous) ? 1 : 0;
m_ConnReq.m_iID = m_SocketID;
CIPAddress::ntop(serv_addr, m_ConnReq.m_piPeerIP, m_iIPversion);

//在发送握手包的时候随机化一个ISN
srand((unsigned int)CTimer::getTime());
m_iISN = m_ConnReq.m_iISN = (int32_t)(CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_MAX));

//根据这个ISN,初始化CUDT实例中的信息
m_iLastDecSeq = m_iISN - 1; //最后一次发送的序列号
m_iSndLastAck = m_iISN; //上一次收到的ACK
m_iSndLastDataAck = m_iISN; //最后一次用于更新发送缓冲区的ACK
m_iSndCurrSeqNo = m_iISN - 1; //已发送的最大的ACK
m_iSndLastAck2 = m_iISN; //最后送回的ACK2
m_ullSndLastAck2Time = CTimer::getTime(); //最后送回的ACK2的时间

//打包一个握手包,ID:0是握手包的标志
CPacket request;
char* reqdata = new char [m_iPayloadSize];
request.pack(0, NULL, reqdata, m_iPayloadSize);
request.m_iID = 0;

int hs_size = m_iPayloadSize;
m_ConnReq.serialize(reqdata, hs_size); //在向已经打包的Packet中添加握手的数据
request.setLength(hs_size); //更新握手包的长度
m_pSndQueue->sendto(serv_addr, request); //调用发送队列,发送握手包
m_llLastReqTime = CTimer::getTime(); //修改最后发送数据的时间

m_bConnecting = true; //此时还没有收到回应的数据包,所以处于连接建立过程

// 如果是异步连接,直接返回,在收到回应包的时候对状态进行调整
if (!m_bSynRecving)
{
delete [] reqdata;
return;
}

// 同步连接,等待来自对方的回应,直到连接完成才能返回。不过在这块将回应包进行打包是什么意思??
CPacket response;
char* resdata = new char [m_iPayloadSize];
response.pack(0, NULL, resdata, m_iPayloadSize);

CUDTException e(0, 0);

//在这个循环中等待对方的回应
while (!m_bClosing)
{
//如果距离上一次发送请求的时间已经过去了250ms,再次发送请求
if (CTimer::getTime() - m_llLastReqTime > 250000)
{
m_ConnReq.serialize(reqdata, hs_size);
request.setLength(hs_size);
if (m_bRendezvous)
request.m_iID = m_ConnRes.m_iID;
m_pSndQueue->sendto(serv_addr, request);
m_llLastReqTime = CTimer::getTime();
}

  response.setLength(m_iPayloadSize);
  if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0)
  {
     if (connect(response) <= 0)    //用于三次握手的第二次,处理收到的响应SYN的数据包
        break;    //返回0是处理成功,<0是处理错误,>0是包丢失需要重新处理

     //新的请求或者回应在收到回复后立即发出 
     m_llLastReqTime = 0;
  }

  if (CTimer::getTime() > ttl)    //如果处理这个Connection花费了太多的时间,抛出异常
  {
     e = CUDTException(1, 1, 0);
     break;
  }

}

delete [] reqdata;
delete [] resdata;

//根据不同的情况处理异常
if (e.getErrorCode() == 0)
{
if (m_bClosing) // if the socket is closed before connection...
e = CUDTException(1);
else if (1002 == m_ConnRes.m_iReqType) // connection request rejected
e = CUDTException(1, 2, 0);
else if ((!m_bRendezvous) && (m_iISN != m_ConnRes.m_iISN)) // secuity check
e = CUDTException(1, 4, 0);
}

if (e.getErrorCode() != 0)
throw e;
}

- Order 2:`int CUDT::connect(const CPacket& response)`:发送三次握手的第二次Packet,相应收到的SYN。但是,没见发送ACK2 Packet啊...不过肯定会有,之后在处理Packet的模块进行详解..

// Process a handshake response from the peer. Returns 0 when the
// connection is completed, -1 on an invalid/failed response, and 1 when
// more handshake exchanges are required (a packet was lost). The
// handshake ordering here is intricate, so the code is left untouched.
int CUDT::connect(const CPacket& response)
{
   // 0 = success; -1 = error; 1 or 2 = connection in progress but a
   // packet was lost, more handshake packets are needed

   if (!m_bConnecting)    // not in the connecting state: reject the packet
      return -1;

   if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getType())) && (0 != m_ConnRes.m_iType))
   {
      // rendezvous mode: a data or keep-alive packet means the peer
      // already considers the connection established
      goto POST_CONNECT;
   }

   // only a control packet of handshake type is acceptable here
   if ((1 != response.getFlag()) || (0 != response.getType()))
      return -1;

   m_ConnRes.deserialize(response.m_pcData, response.getLength());    // decode the peer's handshake fields

   // rendezvous needs extra rounds; a regular connect falls through below
   if (m_bRendezvous)
   {
      // rendezvous connect require 3-way handshake
      if (1 == m_ConnRes.m_iReqType)
         return -1;

      if ((0 == m_ConnReq.m_iReqType) || (0 == m_ConnRes.m_iReqType))
      {
         m_ConnReq.m_iReqType = -1;
         // the request time must be updated so that the next handshake can be sent out immediately.
         m_llLastReqTime = 0;
         return 1;
      }
   }
   else
   {
      // set cookie
      if (1 == m_ConnRes.m_iReqType)    // first-phase response: take the cookie and send a second request
      {
         m_ConnReq.m_iReqType = -1;
         m_ConnReq.m_iCookie = m_ConnRes.m_iCookie;
         m_llLastReqTime = 0;
         return 1;
      }
   }

POST_CONNECT:
   // remove this socket from the connector (handshake) queue
   m_pRcvQueue->removeConnector(m_SocketID);

   // re-fill the connection parameters with the negotiated values
   m_iMSS = m_ConnRes.m_iMSS;
   m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;
   m_iPktSize = m_iMSS - 28;                               // 28 = IP + UDP header overhead
   m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
   m_iPeerISN = m_ConnRes.m_iISN;                          // the peer's initial sequence number
   m_iRcvLastAck = m_ConnRes.m_iISN;
   m_iRcvLastAckAck = m_ConnRes.m_iISN;
   m_iRcvCurrSeqNo = m_ConnRes.m_iISN - 1;
   m_PeerID = m_ConnRes.m_iID;
   memcpy(m_piSelfIP, m_ConnRes.m_piPeerIP, 16);

   // the connection is about to complete: allocate all data structures
   try
   {
      m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);
      m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);
      m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
      m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
      m_pACKWindow = new CACKWindow(1024);
      m_pRcvTimeWindow = new CPktTimeWindow(16, 64);
      m_pSndTimeWindow = new CPktTimeWindow();
   }
   catch (...)
   {
      throw CUDTException(3, 2, 0);
   }

   // seed RTT/bandwidth from the cache if this peer was seen before
   CInfoBlock ib;
   ib.m_iIPversion = m_iIPversion;
   CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
   if (m_pCache->lookup(&ib) >= 0)
   {
      m_iRTT = ib.m_iRTT;
      m_iBandwidth = ib.m_iBandwidth;
   }

   // create and initialize the congestion control instance for this connection
   m_pCC = m_pCCFactory->create();
   m_pCC->m_UDT = m_SocketID;
   m_pCC->setMSS(m_iMSS);
   m_pCC->setMaxCWndSize(m_iFlowWindowSize);
   m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);
   m_pCC->setRcvRate(m_iDeliveryRate);
   m_pCC->setRTT(m_iRTT);
   m_pCC->setBandwidth(m_iBandwidth);
   m_pCC->init();

   // take the initial packet-sending period and congestion window from CC
   m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
   m_dCongestionWindow = m_pCC->m_dCWndSize;

   // the entity is now officially connected
   m_bConnecting = false;
   m_bConnected = true;

   // register this socket with the receive queue so data can be received
   m_pRNode->m_bOnList = true;
   m_pRcvQueue->setNewEntry(this);

   // the socket is now writable: raise the epoll OUT event
   s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);

   // move the API-level socket status to CONNECTED
   s_UDTUnited.connect_complete(m_SocketID);

   return 0;
}

##int flush(UDTSOCKET u)
#####flush的处理流程:`int UDT::flush(UDTSOCKET u) -> int CUDT::flush(UDTSOCKET u) -> int CUDTUnited::flush(const UDTSOCKET u) -> void CUDT::flush()`
- Order 0:`int CUDTUnited::flush(const UDTSOCKET u)`:交由GC线程回收发送缓存区与接收缓冲区的数据,因为设置了Linger,所以需要判断是否需要一定的延迟

// API-level flush: locate the socket and delegate to CUDT::flush(),
// which waits (subject to the linger setting) for the send buffer to
// drain. Throws (5,4) when the socket ID is unknown.
int CUDTUnited::flush(const UDTSOCKET u)
{
   CUDTSocket* s = locate(u);
   if (NULL == s)
      throw CUDTException(5, 4, 0);

   s->m_pUDT->flush();

   return 0;
}

- Order 1:`void CUDT::flush()`

// Wait for the send buffer to drain, for at most l_linger seconds.
// In asynchronous sending mode this only records the linger deadline
// and returns, leaving the actual cleanup to the GC thread.
void CUDT::flush()
{
   uint64_t entertime = CTimer::getTime();

   // spin (1 ms sleeps) while connected, unbroken, data remains, and the linger window is open
   while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0) && (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL))
   {
      // linger has been checked by previous close() call and has expired
      if (m_ullLingerExpiration >= entertime)
         break;

      if (!m_bSynSending)
      {
         // if this socket enables asynchronous sending, return immediately and let GC to close it later
         if (0 == m_ullLingerExpiration)
            m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000ULL;

         return;
      }

      #ifndef WINDOWS
         timespec ts;
         ts.tv_sec = 0;
         ts.tv_nsec = 1000000;   // sleep 1 ms between checks
         nanosleep(&ts, NULL);
      #else
         Sleep(1);
      #endif
   }
}

## int close(UDTSOCKET u)
#####close的处理流程:` int UDT::close(UDTSOCKET u) ->  int CUDT::close(UDTSOCKET u) -> int CUDTUnited::close(const UDTSOCKET u) -> void CUDT::close()`
- Order 0:`int CUDTUnited::close(const UDTSOCKET u)`

int CUDTUnited::close(const UDTSOCKET u)
{
CUDTSocket* s = locate(u); //获取CUDTSocket实例
if (NULL == s)
throw CUDTException(5, 4, 0);

CGuard socket_cg(s->m_ControlLock);

if (s->m_Status == LISTENING) //如果是Listener
{
if (s->m_pUDT->m_bBroken) //如果目前CUDT实例的状态已经损害,直接退出,等待GC线程回收资源
return 0;

  s->m_TimeStamp = CTimer::getTime();    //更新UDT SOCKET关闭时间
  s->m_pUDT->m_bBroken = true;    //然后还是调整CUDT的状态为损坏

  // broadcast all "accept" waiting
  #ifndef WINDOWS
     pthread_mutex_lock(&(s->m_AcceptLock));
     pthread_cond_broadcast(&(s->m_AcceptCond));    //唤醒所有等待accept()的线程
     pthread_mutex_unlock(&(s->m_AcceptLock));
  #else
     SetEvent(s->m_AcceptCond);
  #endif

  return 0;    //调整Listener之后就可以直接退出了

}

s->m_pUDT->close(); //调用连接CUDT中的close()调整CUDT的状态

// synchronize with garbage collection.
CGuard manager_cg(m_ControlLock);

// since "s" is located before m_ControlLock, locate it again in case it became invalid
map

s->m_Status = CLOSED; //调整CUDTSocket的状态为CLOSED

// UDT SOCKET在关闭的时候不会立刻被移除,以防止其他的回调函数访问无效地址,定时器启动,资源会在1S之后被删除
s->m_TimeStamp = CTimer::getTime(); //调整UDTSocket关闭的事件

m_Sockets.erase(s->m_SocketID); //从全局的map中删除
m_ClosedSockets.insert(pair

CTimer::triggerEvent();

return 0;
}

- Order 1:`void CUDT::close()`:调整CUDT实例的状态

void CUDT::close()
{
if (!m_bOpened) //如果CUDT已经关闭,直接返回
return;

if (0 != m_Linger.l_onoff) //如果设置了稍后关闭,那么在稍后的时候再进行处理
{
flush();
}

//从接收数据的队列中移除数据
if (m_bConnected)
m_pSndQueue->m_pSndUList->remove(this);

// 清理与这个UDTSocket实例相关的事件
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_ERR, true);

// 将这个UDT SOCKET ID从所有注册的Epoll中移除
try
{
for (set

if (!m_bOpened) //再次检查状态
return;

// 目前正处于关闭过程中
m_bClosing = true;

CGuard cg(m_ConnectionLock);

// 如果发送者和接受者还在等待数据,发送信号通知他们退出,pthread_cond_signal
releaseSynch();

if (m_bListening) //如果是Listener
{
m_bListening = false; //调整Listener的状态
m_pRcvQueue->removeListener(this); //从接收队列中移除这个Listener,不再负责接收的事务处理
}
else if (m_bConnecting) //如果正处于CONNETING
{
m_pRcvQueue->removeConnector(m_SocketID); //从交汇连接队列中移除这个UDT SOCKET
}

if (m_bConnected) //如果连接已经完成
{
if (!m_bShutdown) //如果对方还没有发送ShutDown
sendCtrl(5); //name我们就向对方发送ShutDown

  m_pCC->close();    //关闭拥塞控制

  // 在Cache中存储并更新这个连接的信息
  CInfoBlock ib;
  ib.m_iIPversion = m_iIPversion;
  CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
  ib.m_iRTT = m_iRTT;
  ib.m_iBandwidth = m_iBandwidth;
  m_pCache->update(&ib);

  m_bConnected = false;    //调整状态为关闭

}

// 等待Send和Recv的停止
CGuard sendguard(m_SendLock); //如果在发送数据或者接收数据时,Buffer不够用的时候,一般会使用pthread_cond_wait睡眠一会
CGuard recvguard(m_RecvLock);

// CLOSED.
m_bOpened = false; //这个UDT SOCKET已经关闭
}

##int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false)
#####send的处理流程:`int UDT::sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false) -> int CUDT::sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false) -> int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)`
- Order 0:`int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)`:处理发送数据报的情况,仅适用于UDT_DGRAM类型

// Send one message of len bytes (UDT_DGRAM sockets only). msttl is the
// message time-to-live in ms (-1 = never expire); inorder requests
// in-order delivery. Blocks (up to m_iSndTimeOut) when the send buffer
// has insufficient space and synchronous sending is enabled. Returns the
// number of bytes queued, or 0 when nothing could be queued.
int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)
{
   if (UDT_STREAM == m_iSockType)    // messages are only valid on DGRAM sockets
      throw CUDTException(5, 9, 0);

   // reject the call when the connection is broken, closing, or absent
   if (m_bBroken || m_bClosing)
      throw CUDTException(2, 1, 0);
   else if (!m_bConnected)
      throw CUDTException(2, 2, 0);

   if (len <= 0)    // nothing to send
      return 0;

   // a single message may not exceed the whole send buffer
   if (len > m_iSndBufSize * m_iPayloadSize)
      throw CUDTException(5, 12, 0);

   CGuard sendguard(m_SendLock);

   if (m_pSndBuffer->getCurrBufSize() == 0)
   {
      // empty buffer: delay the EXP timer so an idle period is not
      // mistaken for a timeout
      uint64_t currtime;
      CTimer::rdtsc(currtime);
      m_ullLastRspTime = currtime;
   }

   // not enough free space in the send buffer for this message
   if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)
   {
      if (!m_bSynSending)    // asynchronous sending: report "no buffer" immediately
         throw CUDTException(6, 1, 0);
      else
      {
         // wait here during a blocking sending
         #ifndef WINDOWS
            pthread_mutex_lock(&m_SendBlockLock);
            if (m_iSndTimeOut < 0)    // no send timeout configured: wait indefinitely
            {
               while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len))
                  pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock);
            }
            else    // wait at most m_iSndTimeOut milliseconds
            {
               uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
               timespec locktime;

               locktime.tv_sec = exptime / 1000000;
               locktime.tv_nsec = (exptime % 1000000) * 1000;

               while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime() < exptime))
                  pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime);
            }
            pthread_mutex_unlock(&m_SendBlockLock);
         #else
            if (m_iSndTimeOut < 0)
            {
               while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len))
                  WaitForSingleObject(m_SendBlockCond, INFINITE);
            }
            else
            {
               uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;

               while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime() < exptime))
                  WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer::getTime()) / 1000));
            }
         #endif

         // check the connection status
         if (m_bBroken || m_bClosing)
            throw CUDTException(2, 1, 0);
         else if (!m_bConnected)
            throw CUDTException(2, 2, 0);
      }
   }

   // still no room after waiting
   if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)
   {
      if (m_iSndTimeOut >= 0)    // a timeout was set and expired
         throw CUDTException(6, 3, 0);

      return 0;    // no timeout configured: report that nothing was sent
   }

   // record when this sending burst started
   if (0 == m_pSndBuffer->getCurrBufSize())
      m_llSndDurationCounter = CTimer::getTime();

   // copy the user's message into the send buffer
   m_pSndBuffer->addBuffer(data, len, msttl, inorder);

   // insert this socket into the multiplexer's send list
   m_pSndQueue->m_pSndUList->update(this, false);

   // buffer is full now: clear the writable epoll event
   if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
   {
      s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
   }

   return len;
}

##int recvmsg(UDTSOCKET u, char* buf, int len)
#####recvmsg的处理流程:`int UDT::recvmsg(UDTSOCKET u, char* buf, int len) -> int CUDT::recvmsg(UDTSOCKET u, char* buf, int len) -> int CUDT::recvmsg(char* data, int len)`
- Order 1:`int CUDT::recvmsg(char* data, int len)`:处理UDP数据报的接收请求

// Receive one message into data (UDT_DGRAM sockets only). Blocks (up to
// m_iRcvTimeOut ms) when no message is available and synchronous
// receiving is enabled. Returns the number of bytes read; throws on a
// broken connection or on timeout when a receive timeout is configured.
int CUDT::recvmsg(char* data, int len)
{
   if (UDT_STREAM == m_iSockType)    // messages are only valid on DGRAM sockets
      throw CUDTException(5, 9, 0);

   // must be connected to receive
   if (!m_bConnected)
      throw CUDTException(2, 2, 0);

   // a non-positive buffer length reads nothing
   if (len <= 0)
      return 0;

   CGuard recvguard(m_RecvLock);

   if (m_bBroken || m_bClosing)    // connection is going away
   {
      int res = m_pRcvBuffer->readMsg(data, len);    // still drain whatever is buffered

      if (m_pRcvBuffer->getRcvMsgNum() <= 0)    // nothing left: clear the readable epoll event
      {
         // read is not available any more
         s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
      }

      if (0 == res)    // nothing could be read from a broken connection
         throw CUDTException(2, 1, 0);
      else
         return res;
   }

   if (!m_bSynRecving)    // non-blocking receive: return data or fail immediately
   {
      int res = m_pRcvBuffer->readMsg(data, len);
      if (0 == res)
         throw CUDTException(6, 2, 0);
      else
         return res;
   }

   int res = 0;    // blocking receive: wait until data arrives or the timeout fires
   bool timeout = false;

   do
   {
      #ifndef WINDOWS
         pthread_mutex_lock(&m_RecvDataLock);

         if (m_iRcvTimeOut < 0)    // no timeout configured: wait until data arrives
         {
            while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
               pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
         }
         else    // wait at most m_iRcvTimeOut milliseconds
         {
            uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL;
            timespec locktime;

            locktime.tv_sec = exptime / 1000000;
            locktime.tv_nsec = (exptime % 1000000) * 1000;

            if (pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime) == ETIMEDOUT)
               timeout = true;

            res = m_pRcvBuffer->readMsg(data, len);    // try again after the wait
         }
         pthread_mutex_unlock(&m_RecvDataLock);
      #else
         if (m_iRcvTimeOut < 0)
         {
            while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
               WaitForSingleObject(m_RecvDataCond, INFINITE);
         }
         else
         {
            if (WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut)) == WAIT_TIMEOUT)
               timeout = true;

            res = m_pRcvBuffer->readMsg(data, len);
         }
      #endif

      if (m_bBroken || m_bClosing)    // connection failed while we were waiting
         throw CUDTException(2, 1, 0);
      else if (!m_bConnected)
         throw CUDTException(2, 2, 0);

   } while ((0 == res) && !timeout);    // keep waiting until data arrives or we time out

   // no more messages buffered: clear the readable epoll event
   if (m_pRcvBuffer->getRcvMsgNum() <= 0)
   {
      // read is not available any more
      s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
   }

   if ((res <= 0) && (m_iRcvTimeOut >= 0))    // timed out without reading anything
      throw CUDTException(6, 3, 0);

   return res;
}
```

猜你喜欢

转载自www.cnblogs.com/ukernel/p/9191048.html