UDT源码剖析(十一)之SendQueue And RecvQueue

SendQueue与RecvQueue在代码中与UDP SOCKET相关。在用户将想要发送的数据提交给Buffer之后,由Buffer将数据打包,根据拥塞控制提供的时间计算,在合适的时间提交给SendQueue进行发送。在接收到数据包之后,通过事件驱动的模式通知用户从RecvQueue中拿取数据包。
删除了交会(rendezvous)连接模式的部分代码(CRcvQueue 的接收线程中仍保留了 RendezvousQueue 的相关处理)。
SendQueue与RecvQueue依赖于几个通用的数据结构,先列出来哈,可能会与前面的描述有重复,怕来回查找麻烦,索性全部列出来:

CMultiplexer:每个UDP端口对应一个此对象,资源的实际持有者

// One multiplexer per UDP port: it owns the send queue, receive queue,
// UDP channel and timer that all UDT sockets bound to this port share.
struct CMultiplexer
{
   CSndQueue* m_pSndQueue;  // The sending queue
   CRcvQueue* m_pRcvQueue;  // The receiving queue
   CChannel* m_pChannel;    // The UDP channel for sending and receiving
   CTimer* m_pTimer;        // The timer

   int m_iPort;         // The UDP port number of this multiplexer
   int m_iIPversion;        // IP version
   int m_iMSS;          // Maximum Segment Size
   int m_iRefCount;     // number of UDT instances sharing this multiplexer
   bool m_bReusable;        // whether this multiplexer can be shared

   int m_iID;           // multiplexer ID
};

CUnitQueue

// A receive-buffer cell: one packet plus its usage state.
struct CUnit
{
   CPacket m_Packet;        // packet
   int m_iFlag;                 // 0: free, 1: occupied, 2: msg read but not freed yet, 3: msg dropped
};
// Dynamically growing pool of CUnit receive cells, organized as a circular
// linked list of fixed-size entries (CQEntry); each entry backs its units
// with one contiguous data buffer.
class CUnitQueue
{
private:
   struct CQEntry
   {
      CUnit* m_pUnit;       // unit queue
      char* m_pBuffer;      // data buffer
      int m_iSize;      // size of each queue

      CQEntry* m_pNext;
   }
   *m_pQEntry,          // pointer to the first entry
   *m_pCurrQueue,       // pointer to the entry currently being scanned
   *m_pLastQueue;       // pointer to the last entry

   CUnit* m_pAvailUnit;         // most recently accessed (candidate free) unit
   int m_iSize;         // total number of packets the pool can hold
   int m_iCount;        // number of packets currently in use
   int m_iMSS;          // unit buffer size
   int m_iIPversion;        // IP version
};
  • 初始化:int CUnitQueue::init(int size, int mss, int version)
    ```
    CUnitQueue::CUnitQueue():
    m_pQEntry(NULL),
    m_pCurrQueue(NULL),
    m_pLastQueue(NULL),
    m_iSize(0),
    m_iCount(0),
    m_iMSS(),
    m_iIPversion()
    {
    }

// Allocate the first entry of the pool: "size" units, each borrowing an
// "mss"-byte slice of one contiguous buffer. Returns 0 on success, -1 on
// allocation failure.
int CUnitQueue::init(int size, int mss, int version)
{
CQEntry* tempq = NULL;
CUnit* tempu = NULL;
char* tempb = NULL;

try
{
tempq = new CQEntry; // the entry that owns the memory
tempu = new CUnit [size];
tempb = new char [size * mss];
}
catch (...)
{
delete tempq; // roll back whatever succeeded before the exception
delete [] tempu;
delete [] tempb;

  return -1;

}

for (int i = 0; i < size; ++ i)
{
tempu[i].m_iFlag = 0; // 0 marks the unit as free
tempu[i].m_Packet.m_pcData = tempb + i * mss; // each packet borrows (does not own) its slice of the shared buffer
}
tempq->m_pUnit = tempu; // wire up the entry
tempq->m_pBuffer = tempb;
tempq->m_iSize = size;

m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq; // single entry so far
m_pQEntry->m_pNext = m_pQEntry; // circular list: the only entry points to itself

m_pAvailUnit = m_pCurrQueue->m_pUnit; // first candidate free unit

m_iSize = size; // total capacity; nothing is occupied yet
m_iMSS = mss;
m_iIPversion = version;

return 0;
}

- 销毁:`CUnitQueue::~CUnitQueue()`

// Free every entry in the circular list together with its unit array and
// its shared data buffer.
CUnitQueue::~CUnitQueue()
{
CQEntry* p = m_pQEntry; // start from the first entry

while (p != NULL) // release each entry's resources
{
delete [] p->m_pUnit;
delete [] p->m_pBuffer; // one contiguous buffer per entry; allocated as a whole, freed as a whole

  CQEntry* q = p;
  if (p == m_pLastQueue)
     p = NULL;
  else
     p = p->m_pNext;
  delete q;

}
}

- 增加队列长度:`int CUnitQueue::increase()`

// Grow the pool by one more entry (same size as the first) when more than
// 90% of the units are occupied. Returns 0 if a new entry was added, -1
// otherwise (still enough room, or allocation failed).
int CUnitQueue::increase()
{
int real_count = 0;
CQEntry* p = m_pQEntry; // start from the first entry
while (p != NULL)
{
CUnit* u = p->m_pUnit; // first unit of this entry
for (CUnit* end = u + p->m_iSize; u != end; ++ u)
if (u->m_iFlag != 0) // any unit that is not free
++ real_count; // counts as in use

  if (p == m_pLastQueue)
     p = NULL;
  else
     p = p->m_pNext;

}
m_iCount = real_count; // refresh the usage counter
if (double(m_iCount) / m_iSize < 0.9) // occupied / total < 0.9
return -1; // still enough free units, no growth needed

CQEntry* tempq = NULL; // build a new entry to link at the tail
CUnit* tempu = NULL;
char* tempb = NULL;

// all queues have the same size
int size = m_pQEntry->m_iSize;

try
{
tempq = new CQEntry;
tempu = new CUnit [size];
tempb = new char [size * m_iMSS];
}
catch (...)
{
delete tempq;
delete [] tempu;
delete [] tempb;

  return -1;

}

for (int i = 0; i < size; ++ i)
{
tempu[i].m_iFlag = 0;
tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;
}
tempq->m_pUnit = tempu;
tempq->m_pBuffer = tempb;
tempq->m_iSize = size;

m_pLastQueue->m_pNext = tempq; // link at the tail
m_pLastQueue = tempq;
m_pLastQueue->m_pNext = m_pQEntry; // keep the list circular

m_iSize += size; // enlarge total capacity

return 0;
}

- 获取下一个可用的CUnit:`CUnit* CUnitQueue::getNextAvailUnit()`

// Return a free unit to receive the next packet into, or NULL if the pool
// is exhausted. Triggers increase() when usage passes 90%.
CUnit* CUnitQueue::getNextAvailUnit()
{
if (m_iCount * 10 > m_iSize * 9) // more than 90% in use: try to grow first
increase();

if (m_iCount >= m_iSize) // completely full even after trying to grow
return NULL;

CQEntry* entrance = m_pCurrQueue; // remember where the scan started

do
{
// NOTE(review): the sentinel is the LAST unit of the entry, so that unit is
// never returned by this loop; only the head check below can return a unit
// at the entry's first slot - this mirrors the original UDT code
for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1; m_pAvailUnit != sentinel; ++ m_pAvailUnit)
if (m_pAvailUnit->m_iFlag == 0) // first free unit in the current entry
return m_pAvailUnit;

  if (m_pCurrQueue->m_pUnit->m_iFlag == 0)  // fall back to the entry's first unit
  {
     m_pAvailUnit = m_pCurrQueue->m_pUnit;  // update the cached candidate and return it
     return m_pAvailUnit;
  }

  m_pCurrQueue = m_pCurrQueue->m_pNext; // move to the next entry in the ring
  m_pAvailUnit = m_pCurrQueue->m_pUnit;

} while (m_pCurrQueue != entrance);

increase(); // everything occupied: grow so the next call may succeed

return NULL;
}


##CSndUList

// Heap node that schedules a CUDT instance inside CSndUList.
struct CSNode
{
CUDT* m_pUDT; // pointer to the owning CUDT instance
uint64_t m_llTimeStamp; // scheduling timestamp used for heap ordering

int m_iHeapLoc; // index in the heap array; -1 means not currently on the heap
};

// Min-heap of sockets ordered by their next scheduled send time.
class CSndUList
{
private:
CSNode** m_pHeap; // the heap array (pointers to nodes owned by CUDT instances)
int m_iArrayLength; // physical length of the heap array
int m_iLastEntry; // index of the last heap element; -1 when the heap is empty
udt_pthread_mutex_t m_ListLock;
udt_pthread_mutex_t* m_pWindowLock;
udt_pthread_cond_t* m_pWindowCond;
CTimer* m_pTimer;
};

- 初始化:`CSndUList::CSndUList()`

// Construct an empty heap with an initial capacity of 4096 entries.
CSndUList::CSndUList():
m_pHeap(NULL),
m_iArrayLength(4096), // preset heap capacity
m_iLastEntry(-1),
m_ListLock(),
m_pWindowLock(NULL),
m_pWindowCond(NULL),
m_pTimer(NULL)
{
m_pHeap = new CSNode*[m_iArrayLength]; // array of node pointers

#ifndef WINDOWS
pthread_mutex_init(&m_ListLock, NULL);
#else
m_ListLock = CreateMutex(NULL, false, NULL);
#endif
}

- 销毁:`CSndUList::~CSndUList()`

// Release the pointer array; the nodes it references are owned elsewhere.
CSndUList::~CSndUList()
{
delete [] m_pHeap; // only the array is freed, not the CSNode objects

#ifndef WINDOWS
pthread_mutex_destroy(&m_ListLock);
#else
CloseHandle(m_ListLock);
#endif
}

- 向堆中添加CUDT实例:`void CSndUList::insert(int64_t ts, const CUDT* u)`

// Schedule socket "u" to be processed at time "ts", doubling the heap
// array first if it is full.
void CSndUList::insert(int64_t ts, const CUDT* u)
{
CGuard listguard(m_ListLock); // scoped lock

// increase the heap array size if necessary
if (m_iLastEntry == m_iArrayLength - 1) // array is full: double it
{
CSNode** temp = NULL;

  try   // only pointers are copied, so the realloc is cheap
  {         
     temp = new CSNode*[m_iArrayLength * 2];    // double the capacity, then copy the old pointers over
  }
  catch(...)
  {
     return;    // NOTE(review): the insertion is silently dropped on allocation failure
  }

  memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength);
  m_iArrayLength *= 2;
  delete [] m_pHeap;    // release the old array
  m_pHeap = temp;

}

insert_(ts, u); // the actual insert
}

- 向堆中添加CUDT实例:`void CSndUList::insert_(int64_t ts, const CUDT* u) `

// Internal insert (m_ListLock already held): push the socket's node onto
// the min-heap keyed by timestamp and sift it up.
void CSndUList::insert_(int64_t ts, const CUDT* u) //真实的insert
{
CSNode* n = u->m_pSNode; // the heap node embedded in the CUDT instance

// do not insert repeated node
// m_iHeapLoc == -1 means not on the heap and insertable; >= 0 means already scheduled
if (n->m_iHeapLoc >= 0)
return;

m_iLastEntry ++; // slot where the new node goes
m_pHeap[m_iLastEntry] = n; // place the node at the heap's tail
n->m_llTimeStamp = ts; // record its scheduled time

int q = m_iLastEntry;
int p = q;
while (p != 0) // sift up: the smaller timestamp bubbles toward the root
{
p = (q - 1) >> 1;
if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp) // parent is later: swap
{
CSNode* t = m_pHeap[p];
m_pHeap[p] = m_pHeap[q];
m_pHeap[q] = t;
t->m_iHeapLoc = q;
q = p;
}
else
break;
}

n->m_iHeapLoc = q; // final heap index; any value >= 0 marks the node as on the heap

// an earlier event has been inserted, wake up sending worker
if (n->m_iHeapLoc == 0) // new root: its time may be earlier than what the worker sleeps on
m_pTimer->interrupt();

if (0 == m_iLastEntry) // heap was empty: wake the worker blocked on the window condition
{
#ifndef WINDOWS
pthread_mutex_lock(m_pWindowLock);
pthread_cond_signal(m_pWindowCond);
pthread_mutex_unlock(m_pWindowLock);
#else
SetEvent(*m_pWindowCond);
#endif
}
}

- 更新CUDT的发送时间戳:`void CSndUList::update(const CUDT* u, bool reschedule) `

// Reschedule socket "u" to be processed as soon as possible (ts = 1).
// If it is already scheduled and "reschedule" is false, leave it alone.
void CSndUList::update(const CUDT* u, bool reschedule)
{
CGuard listguard(m_ListLock);

CSNode* n = u->m_pSNode; // the node embedded in the CUDT instance

if (n->m_iHeapLoc >= 0) // already on the heap: may need moving
{
if (!reschedule) // caller does not want to move it forward
return;

  if (n->m_iHeapLoc == 0)   // already at the root: just bump its time and wake the worker
  {
     n->m_llTimeStamp = 1;  
     m_pTimer->interrupt();
     return;
  }

  remove_(u);   // take it off the heap so it can be re-inserted at the front

}

insert_(1, u); // re-insert with the earliest possible timestamp
}

- 取得发送地址中下一个packet和addr,并将CUDT重新加入堆中:`int CSndUList::pop(sockaddr*& addr, CPacket& pkt)`

// Pop the socket whose scheduled time has arrived, ask it to pack one
// packet, and re-insert it with its next processing time. Returns 1 when
// a packet and destination address are ready, -1 otherwise.
int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
{
CGuard listguard(m_ListLock);

if (-1 == m_iLastEntry) // empty heap: nothing to send
return -1;

// no pop until the next schedulled time
uint64_t ts;
CTimer::rdtsc(ts); // current time
if (ts < m_pHeap[0]->m_llTimeStamp) // the earliest event is still in the future
return -1;

CUDT* u = m_pHeap[0]->m_pUDT; // take the root node off the heap
remove_(u); // only unlinks it (m_iHeapLoc = -1); nothing is freed

if (!u->m_bConnected || u->m_bBroken) // skip dead sockets
return -1;

// pack a packet from the socket
if (u->packData(pkt, ts) <= 0) // packData also yields the next processing time in ts
return -1;

addr = u->m_pPeerAddr;

// insert a new entry, ts is the next processing time
if (ts > 0)
insert_(ts, u); // reschedule the socket for its next send

return 1;
}

- 获取下一次发送事件的时间:`uint64_t CSndUList::getNextProcTime()`

// Return the scheduled time of the earliest event on the heap,
// or 0 when nothing is scheduled.
uint64_t CSndUList::getNextProcTime()
{
CGuard heapguard(m_ListLock);

// the root of the min-heap holds the earliest timestamp
return (-1 == m_iLastEntry) ? 0 : m_pHeap[0]->m_llTimeStamp;
}

- 将CUDT从堆中删除:`void CSndUList::remove_(const CUDT* u)`

// Internal remove (m_ListLock already held): take socket "u" off the heap
// by replacing it with the last element and sifting that element down.
void CSndUList::remove_(const CUDT* u)
{
CSNode* n = u->m_pSNode; // the node embedded in the CUDT instance

if (n->m_iHeapLoc >= 0) // only if the node is actually on the heap
{
// remove the node from heap
m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];
m_iLastEntry --;
m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc;

  int q = n->m_iHeapLoc;
  int p = q * 2 + 1;
  while (p <= m_iLastEntry)
  {
     if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp))
        p ++;

     if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp)
     {
        CSNode* t = m_pHeap[p];
        m_pHeap[p] = m_pHeap[q];
        m_pHeap[p]->m_iHeapLoc = p;
        m_pHeap[q] = t;
        m_pHeap[q]->m_iHeapLoc = q;

        q = p;
        p = q * 2 + 1;
     }
     else
        break;
  }

  n->m_iHeapLoc = -1;   // mark the node as off the heap; the node itself is not freed

}

// the only event has been deleted, wake up immediately
if (0 == m_iLastEntry) // heap is now empty: interrupt any pending sleep
m_pTimer->interrupt();
}

##CSndQueue:Send Queue

// Sending queue bound to one UDP channel; a dedicated worker thread pops
// sockets from the CSndUList heap and sends their packets on schedule.
class CSndQueue
{
private:
static void* worker(void* param); // sending worker thread entry
udt_pthread_t m_WorkerThread;

private:
CSndUList* m_pSndUList; // heap of sockets ordered by next send time
CChannel* m_pChannel; // the UDP channel for data sending
CTimer* m_pTimer; // timing facility (shared with the recv queue)

udt_pthread_mutex_t m_WindowLock;
udt_pthread_cond_t m_WindowCond;

volatile bool m_bClosing; // set to tell the worker thread to exit
udt_pthread_cond_t m_ExitCond;
};

- 初始化:`void CSndQueue::init(CChannel* c, CTimer* t)`

// Construct the queue and its synchronization primitives; the worker
// thread itself is started later in init().
CSndQueue::CSndQueue():
m_WorkerThread(),
m_pSndUList(NULL),
m_pChannel(NULL),
m_pTimer(NULL),
m_WindowLock(),
m_WindowCond(),
m_bClosing(false),
m_ExitCond()
{
#ifndef WINDOWS
pthread_cond_init(&m_WindowCond, NULL);
pthread_mutex_init(&m_WindowLock, NULL);
#else
m_WindowLock = CreateMutex(NULL, false, NULL);
m_WindowCond = CreateEvent(NULL, false, false, NULL);
m_ExitCond = CreateEvent(NULL, false, false, NULL);
#endif
}

// Bind the queue to its channel and timer (the timer is shared with the
// recv queue of the same UDP socket) and start the sending worker thread.
void CSndQueue::init(CChannel* c, CTimer* t)
{
m_pChannel = c;
m_pTimer = t;
m_pSndUList = new CSndUList;
m_pSndUList->m_pWindowLock = &m_WindowLock; // hand the list our wait/wake primitives
m_pSndUList->m_pWindowCond = &m_WindowCond;
m_pSndUList->m_pTimer = m_pTimer;

#ifndef WINDOWS
if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))
{ // failed to start the worker thread
m_WorkerThread = 0;
throw CUDTException(3, 1);
}
#else
DWORD threadID;
m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &threadID);
if (NULL == m_WorkerThread)
throw CUDTException(3, 1);
#endif
}

- 工作线程:

// Sending worker thread: sleep until the next scheduled send time, pop the
// due socket's packet and push it out the UDP channel; block on the window
// condition while there is nothing to send at all.
void* CSndQueue::worker(void* param)
{
CSndQueue* self = (CSndQueue*)param; // the queue this thread serves

while (!self->m_bClosing) // run until the owner flips the closing flag
{
uint64_t ts = self->m_pSndUList->getNextProcTime(); // next scheduled send time (0 if none)

  if (ts > 0)   // there is a scheduled event
  {
     // wait until next processing time of the first socket on the list
     uint64_t currtime;
     CTimer::rdtsc(currtime);
     if (currtime < ts) // too early: sleep until the scheduled time
        self->m_pTimer->sleepto(ts);

     // it is time to send the next pkt
     sockaddr* addr;
     CPacket pkt;   
     if (self->m_pSndUList->pop(addr, pkt) < 0) // fetch packet and destination from the list
        continue;

     self->m_pChannel->sendto(addr, pkt);   // hand it to the UDP channel
  }
  else
  {
      // nothing scheduled: sleep here until a socket is inserted
     // wait here if there is no sockets with data to be sent
     #ifndef WINDOWS
        pthread_mutex_lock(&self->m_WindowLock);
        if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
           pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock);
        pthread_mutex_unlock(&self->m_WindowLock);
     #else
        WaitForSingleObject(self->m_WindowCond, INFINITE);
     #endif
  }

}

#ifndef WINDOWS
return NULL;
#else
SetEvent(self->m_ExitCond);
return 0;
#endif
}

- 发送数据包:`int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)`

// Send one packet directly on the UDP channel, bypassing the scheduling
// heap; returns the packet's payload length.
int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)
{
// straight passthrough to the channel
m_pChannel->sendto(addr, packet);

return packet.getLength();
}

*******************************************************************************************************************************************
##CRcvUList

// Doubly-linked-list node that tracks a CUDT instance inside CRcvUList.
struct CRNode
{
CUDT* m_pUDT; // Pointer to the owning CUDT instance
uint64_t m_llTimeStamp; // Time Stamp of the last activity

CRNode* m_pPrev; // previous link
CRNode* m_pNext; // next link

bool m_bOnList; // whether this node is currently on the list
};

class CRcvUList // doubly linked list of sockets waiting to receive data
{
public:
CRNode* m_pUList; // the head node
private:
CRNode* m_pLast; // the last node
};

- 向双向链表中插入CUDT实例:`void CRcvUList::insert(const CUDT* u)`

void CRcvUList::insert(const CUDT* u)
{
CRNode* n = u->m_pRNode; //反向获取在双向链表中的表现形式
CTimer::rdtsc(n->m_llTimeStamp); //获取当前时间,gettimeofday()

if (NULL == m_pUList) //如果双向链表为空
{
// empty list, insert as the single node
n->m_pPrev = n->m_pNext = NULL;
m_pLast = m_pUList = n;

  return;

}

// always insert at the end for RcvUList
n->m_pPrev = m_pLast; //插入双向链表的末尾
n->m_pNext = NULL;
m_pLast->m_pNext = n;
m_pLast = n;
}

- 向双向链表中移除CUDT实例:`void CRcvUList::remove(const CUDT* u)`

// Unlink socket "u" from the list. The node itself is not freed; only the
// chain links (and head/tail pointers) are adjusted.
void CRcvUList::remove(const CUDT* u)
{
CRNode* n = u->m_pRNode; // the list node embedded in the CUDT instance

if (!n->m_bOnList) // not on the list: nothing to do
return;

if (NULL == n->m_pPrev) // removing the head node
{
// n is the first node
m_pUList = n->m_pNext;
if (NULL == m_pUList)
m_pLast = NULL;
else
m_pUList->m_pPrev = NULL;
}
else // interior or tail node: bypass it in the chain
{
n->m_pPrev->m_pNext = n->m_pNext;
if (NULL == n->m_pNext)
{
// n is the last node
m_pLast = n->m_pPrev;
}
else
n->m_pNext->m_pPrev = n->m_pPrev;
}

n->m_pNext = n->m_pPrev = NULL;
}

- 更新CUDT在双向链表中的位置:`void CRcvUList::update(const CUDT* u)`

// Refresh socket "u"'s timestamp and move its node to the tail, keeping
// the list ordered by last-activity time.
void CRcvUList::update(const CUDT* u)
{
CRNode* n = u->m_pRNode; // the list node embedded in the CUDT instance

if (!n->m_bOnList)
return;

CTimer::rdtsc(n->m_llTimeStamp); // stamp with the current time

// if n is the last node, do not need to change
if (NULL == n->m_pNext) // already at the tail: nothing to move
return;

if (NULL == n->m_pPrev) // n is the head node
{
m_pUList = n->m_pNext; // promote its successor to head
m_pUList->m_pPrev = NULL;
}
else
{
n->m_pPrev->m_pNext = n->m_pNext; // unlink n from the middle
n->m_pNext->m_pPrev = n->m_pPrev;
}

n->m_pPrev = m_pLast; // re-append n at the tail
n->m_pNext = NULL;
m_pLast->m_pNext = n;
m_pLast = n;
}

##CHash

// Chained hash table mapping socket ID -> CUDT instance; used by the
// receiving worker thread to look up sockets quickly.
class CHash
{
private:
struct CBucket
{
int32_t m_iID; // Socket ID
CUDT* m_pUDT; // Socket instance

  CBucket* m_pNext;     // next bucket

} **m_pBucket; // list of buckets (the hash table)

int m_iHashSize; // size of hash table
};

- 初始化:`void CHash::init(int size)`

// Construct an empty table; bucket storage is allocated in init().
CHash::CHash():
m_pBucket(NULL),
m_iHashSize(0)
{
}

// Allocate a table of "size" bucket chains, all initially empty.
void CHash::init(int size)
{
m_pBucket = new CBucket* [size]; // one chain head per slot

int slot = 0;
while (slot < size)
m_pBucket[slot ++] = NULL; // every chain starts empty

m_iHashSize = size;
}

- 销毁:`CHash::~CHash()`

// Walk every chain and free its buckets, then release the table itself.
CHash::~CHash()
{
for (int slot = 0; slot < m_iHashSize; ++ slot)
{
for (CBucket* b = m_pBucket[slot]; NULL != b; )
{
CBucket* next = b->m_pNext; // save the link before the node dies
delete b;
b = next;
}
}

delete [] m_pBucket;
}

- 查找:`CUDT* CHash::lookup(int32_t id)`

// Find the CUDT instance registered under socket ID "id", or NULL.
CUDT* CHash::lookup(int32_t id)
{
// simple hash function (% hash table size); suitable for socket descriptors
for (CBucket* b = m_pBucket[id % m_iHashSize]; NULL != b; b = b->m_pNext)
{
if (b->m_iID == id)
return b->m_pUDT;
}

return NULL;
}

- 插入:`void CHash::insert(int32_t id, CUDT* u) `

void CHash::insert(int32_t id, CUDT* u) //新加入的Bucket加入到最接近数组的底部
{
CBucket* b = m_pBucket[id % m_iHashSize];

CBucket* n = new CBucket;
n->m_iID = id;
n->m_pUDT = u;
n->m_pNext = b;

m_pBucket[id % m_iHashSize] = n;
}

- 删除:`void CHash::remove(int32_t id)`

void CHash::remove(int32_t id)
{
CBucket* b = m_pBucket[id % m_iHashSize]; //找到Bucket
CBucket* p = NULL;

while (NULL != b) //在桶中的链表中删除一个结点
{
if (id == b->m_iID)
{
if (NULL == p)
m_pBucket[id % m_iHashSize] = b->m_pNext;
else
p->m_pNext = b->m_pNext;

     delete b;

     return;
  }

  p = b;
  b = b->m_pNext;

}
}

##CRcvQueue

class CRcvQueue
{
private:
static void* worker(void* param); //接收线程
udt_pthread_t m_WorkerThread;

private:
CUnitQueue m_UnitQueue; // The received packet queue(就是那个类似于Hash的组织)

CRcvUList* m_pRcvUList; // 这个List中的UDT实例准备从Queue中读取数据
CHash* m_pHash; // HASH可以加速在List中寻找UDT实例
CChannel* m_pChannel; // UDP channel for receving packets
CTimer* m_pTimer; // 与发送队列共享Timer

int m_iPayloadSize; // packet中的有效载荷

volatile bool m_bClosing; // 接收线程是否启动
udt_pthread_cond_t m_ExitCond;

private:
udt_pthread_mutex_t m_LSLock;
CUDT* m_pListener; // pointer to the (unique, if any) listening UDT entity
CRendezvousQueue* m_pRendezvousQueue; // 汇合模式中的UDT SOCKET列表

std::vector

std::map<int32_t, std::queue

- 初始化:`void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel* cc, CTimer* t)`

// Construct the queue and its synchronization primitives; the real setup
// (unit queue, hash table, worker thread) happens in init().
CRcvQueue::CRcvQueue():
m_WorkerThread(),
m_UnitQueue(),
m_pRcvUList(NULL),
m_pHash(NULL),
m_pChannel(NULL),
m_pTimer(NULL),
m_iPayloadSize(),
m_bClosing(false),
m_ExitCond(),
m_LSLock(),
m_pListener(NULL),
m_pRendezvousQueue(NULL),
m_vNewEntry(),
m_IDLock(),
m_mBuffer(),
m_PassLock(),
m_PassCond()
{
#ifndef WINDOWS
pthread_mutex_init(&m_PassLock, NULL);
pthread_cond_init(&m_PassCond, NULL);
pthread_mutex_init(&m_LSLock, NULL);
pthread_mutex_init(&m_IDLock, NULL);
#else
m_PassLock = CreateMutex(NULL, false, NULL);
m_PassCond = CreateEvent(NULL, false, false, NULL);
m_LSLock = CreateMutex(NULL, false, NULL);
m_IDLock = CreateMutex(NULL, false, NULL);
m_ExitCond = CreateEvent(NULL, false, false, NULL);
#endif
}

// Size the unit pool and hash table, bind the channel/timer (shared with
// the send queue of the same UDP socket) and start the receiving thread.
void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel* cc, CTimer* t)
{
m_iPayloadSize = payload;

m_UnitQueue.init(qsize, payload, version);

m_pHash = new CHash;
m_pHash->init(hsize);

m_pChannel = cc;
m_pTimer = t;

m_pRcvUList = new CRcvUList;
m_pRendezvousQueue = new CRendezvousQueue;

#ifndef WINDOWS
if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this))
{ // failed to start the worker thread
m_WorkerThread = 0;
throw CUDTException(3, 1);
}
#else
DWORD threadID;
m_WorkerThread = CreateThread(NULL, 0, CRcvQueue::worker, this, 0, &threadID);
if (NULL == m_WorkerThread)
throw CUDTException(3, 1);
#endif
}

- 销毁:`CRcvQueue::~CRcvQueue()`

// Stop the worker thread, destroy the synchronization primitives, free the
// helper structures, and release every undelivered packet still sitting in
// the temporary buffer. (The cleanup loop was truncated in the extracted
// text; reconstructed here.)
CRcvQueue::~CRcvQueue()
{
m_bClosing = true; // tell the worker thread to exit its loop

#ifndef WINDOWS
if (0 != m_WorkerThread) // wait for the worker thread to finish
pthread_join(m_WorkerThread, NULL);
pthread_mutex_destroy(&m_PassLock);
pthread_cond_destroy(&m_PassCond);
pthread_mutex_destroy(&m_LSLock);
pthread_mutex_destroy(&m_IDLock);
#else
if (NULL != m_WorkerThread)
WaitForSingleObject(m_ExitCond, INFINITE);
CloseHandle(m_WorkerThread);
CloseHandle(m_PassLock);
CloseHandle(m_PassCond);
CloseHandle(m_LSLock);
CloseHandle(m_IDLock);
CloseHandle(m_ExitCond);
#endif

delete m_pRcvUList;
delete m_pHash;
delete m_pRendezvousQueue;

// release all undelivered packets left in the temporary buffer
for (map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++ i)
{
while (!i->second.empty())
{
CPacket* pkt = i->second.front();
delete [] pkt->m_pcData; // the packet owns its payload buffer here
delete pkt;
i->second.pop();
}
}
}

- 接收工作线程:`void* CRcvQueue::worker(void* param)`

{
CRcvQueue* self = (CRcvQueue*)param; //反向获取接收队列对象

//准备好ID,地址之类的东西
sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr) new sockaddr_in : (sockaddr) new sockaddr_in6;
CUDT* u = NULL;
int32_t id;

while (!self->m_bClosing)
{
#ifdef NO_BUSY_WAITING
self->m_pTimer->tick();
#endif

  // check waiting list, if new socket, insert it to the list
  while (self->ifNewEntry())    //如果有新的CUDT*
  {
     CUDT* ne = self->getNewEntry();    //将新的CUDT*加入到合适的队列中
     if (NULL != ne)
     {
        self->m_pRcvUList->insert(ne);
        self->m_pHash->insert(ne->m_SocketID, ne);
     }
  }

  // 为了接收packet,获取一个可用的slot
  CUnit* unit = self->m_UnitQueue.getNextAvailUnit();
  if (NULL == unit) //如果获取失败
  {
     // 没有足够的空间,读取这个packet后直接drop掉
     CPacket temp;
     temp.m_pcData = new char[self->m_iPayloadSize];
     temp.setLength(self->m_iPayloadSize);
     self->m_pChannel->recvfrom(addr, temp);
     delete [] temp.m_pcData;
     goto TIMER_CHECK;
  }

  unit->m_Packet.setLength(self->m_iPayloadSize);//设置packet的有效载荷

  //recv一个packet.如果返回-1,就相当于什么都没有收到 
  if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)
     goto TIMER_CHECK;

  id = unit->m_Packet.m_iID;    //获取收到的packet的ID

  // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
  if (0 == id)//Connect request
  {
     if (NULL != self->m_pListener)
        self->m_pListener->listen(addr, unit->m_Packet);//通过这个CUDT*处理这个packet
     else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
     {  //同样是处理new connect
        // asynchronous connect: call connect here
        // otherwise wait for the UDT socket to retrieve this packet
        if (!u->m_bSynRecving)
           u->connect(unit->m_Packet);
        else
           self->storePkt(id, unit->m_Packet.clone());
     }
  }
  else if (id > 0)  //发送往一个socket的数据包
  {
     if (NULL != (u = self->m_pHash->lookup(id)))//找到UDT对应的CUDT*
     {
        if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion))
        {//对比地址
           if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
           {//如果这个CUDT*的状态正常
              if (0 == unit->m_Packet.getFlag())//如果这是一个数据包
                 u->processData(unit);  //处理数据
              else//如果这是一个控制包
                 u->processCtrl(unit->m_Packet);//处理控制信息

              u->checkTimers();//检查定时器
              self->m_pRcvUList->update(u);//将这个处理过的CUDT*插入List末尾
           }
        }
     }
     else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
     {//如果是一个新的连接状态
        if (!u->m_bSynRecving)
           u->connect(unit->m_Packet);//建立稳定的连接状态
        else
           self->storePkt(id, unit->m_Packet.clone());//连接状态还没有稳定,先存储数据,稍后处理
     }
  }

  //当drop packet时或者没有free CUnit时,跳转到这块

TIMER_CHECK:
// take care of the timing event for all UDT sockets

  uint64_t currtime;
  CTimer::rdtsc(currtime);  //获取当前的时间

  CRNode* ul = self->m_pRcvUList->m_pUList;//获取头部的CUDT*
  uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();
  while ((NULL != ul) && (ul->m_llTimeStamp < ctime))
  {
     CUDT* udt = ul->m_pUDT;    //获取CUDT*

     if (udt->m_bConnected && !udt->m_bBroken && !udt->m_bClosing)
     {//如果这个CUDT*的状态正常,更新到Recv List的后面就行了
        udt->checkTimers(); 
        self->m_pRcvUList->update(udt);
     }
     else
     {//如果这个CUDT*的状态出现差错,直接删除
        // the socket must be removed from Hash table first, then RcvUList
        self->m_pHash->remove(udt->m_SocketID);
        self->m_pRcvUList->remove(udt);
        udt->m_pRNode->m_bOnList = false;
     }

     ul = self->m_pRcvUList->m_pUList;
  }

  //还没有进入正常的连接状态的CUDT*,发送探测包
  // Check connection requests status for all sockets in the RendezvousQueue.
  self->m_pRendezvousQueue->updateConnStatus();

}

//收尾工作
if (AF_INET == self->m_UnitQueue.m_iIPversion)
delete (sockaddr_in)addr;
else
delete (sockaddr_in6
)addr;

#ifndef WINDOWS
return NULL;
#else
SetEvent(self->m_ExitCond);
return 0;
#endif
}

- 从Queue中获取一个Packet:`int CRcvQueue::recvfrom(int32_t id, CPacket& packet)`

int CRcvQueue::recvfrom(int32_t id, CPacket& packet)
{
CGuard bufferlock(m_PassLock);

map<int32_t, std::queue

if (i == m_mBuffer.end()) //如果描述这个UDT的Packet Queue为空
{
#ifndef WINDOWS
uint64_t now = CTimer::getTime();
timespec timeout;

     timeout.tv_sec = now / 1000000 + 1;
     timeout.tv_nsec = (now % 1000000) * 1000;

     pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout);//就睡一会
  #else
     ReleaseMutex(m_PassLock);
     WaitForSingleObject(m_PassCond, 1000);
     WaitForSingleObject(m_PassLock, INFINITE);
  #endif

  i = m_mBuffer.find(id);   //被唤醒后还没有packet可读的话,设置packet,并返回-1
  if (i == m_mBuffer.end())
  {
     packet.setLength(-1);
     return -1;
  }

}

// retrieve the earliest packet
CPacket* newpkt = i->second.front(); //获取一个包

if (packet.getLength() < newpkt->getLength())//如果两个包的数据量有差错,返回-1
{
packet.setLength(-1);
return -1;
}

// copy packet content
// 将首部的packet拷贝出来
memcpy(packet.m_nHeader, newpkt->m_nHeader, CPacket::m_iPktHdrSize);
memcpy(packet.m_pcData, newpkt->m_pcData, newpkt->getLength());
packet.setLength(newpkt->getLength());

delete [] newpkt->m_pcData;
delete newpkt;

// remove this message from queue,
// if no more messages left for this socket, release its data structure
i->second.pop();
if (i->second.empty()) //如果队列为空的话,删除这个队列。对应上文,如果在m_buffer中没有找到Queue,就意味着没有Packet可读
m_mBuffer.erase(i);

return packet.getLength();
}

- 设置与取消Listener:`int CRcvQueue::setListener(CUDT* u)`,`void CRcvQueue::removeListener(const CUDT* u)`

// Install "u" as the listening socket for this queue. Only one listener
// may exist at a time; returns 0 on success, -1 if one is already set.
int CRcvQueue::setListener(CUDT* u)
{
CGuard guard(m_LSLock);

if (m_pListener != NULL)
return -1; // a listener is already installed

m_pListener = u;
return 0;
}

// Clear the listener slot, but only if "u" is the socket currently set.
void CRcvQueue::removeListener(const CUDT* u)
{
CGuard guard(m_LSLock);

if (m_pListener == u)
m_pListener = NULL;
}

- 从队列中删除UDT SOCKET以及为其维护的信息:`void CRcvQueue::removeConnector(const UDTSOCKET& id)`

void CRcvQueue::removeConnector(const UDTSOCKET& id) //删除和控制包有关的所有信息,这个队列中维护着和连接相关的信息
{
m_pRendezvousQueue->remove(id);

CGuard bufferlock(m_PassLock);

map<int32_t, std::queue

- 处理连接尚未完全建立的情况:`void CRcvQueue::setNewEntry(CUDT* u) `,`bool CRcvQueue::ifNewEntry()  `,`CUDT* CRcvQueue::getNewEntry() `

// Register a freshly created socket; the worker thread will move it into
// the receive list and hash table on its next iteration.
void CRcvQueue::setNewEntry(CUDT* u)
{
CGuard guard(m_IDLock);

m_vNewEntry.push_back(u);
}

// Quick check for pending new sockets. NOTE(review): this read is not
// protected by m_IDLock (as in the original); getNewEntry() re-checks
// under the lock before consuming.
bool CRcvQueue::ifNewEntry()
{
return m_vNewEntry.empty() == false;
}

// Pop one newly registered socket, or NULL when none is pending.
CUDT* CRcvQueue::getNewEntry()
{
CGuard listguard(m_IDLock);

if (m_vNewEntry.empty())
return NULL;

// BUG FIX: the extracted text cast the iterator itself to CUDT, which is
// invalid C++; the first element must be dereferenced instead.
CUDT* u = m_vNewEntry.front();
m_vNewEntry.erase(m_vNewEntry.begin());

return u;
}
```

猜你喜欢

转载自www.cnblogs.com/ukernel/p/9191068.html