#ifndef _E5295BC9_8074_44E9_A754_2B8ADCCB3004
#define _E5295BC9_8074_44E9_A754_2B8ADCCB3004
#include "depends/incfile.h"
#include "CFunc.h"
#include "xCores.h"
#include "SysUtil.h"
namespace x2lib
{
/*************************************************************************
** Desc : 一个强大的数据缓冲器,用户通过继承实现ApiSend/ApiRecv接口,可实现多种方式的数据缓冲,如Socket(Tcp/udp),Pipe等,
** 不必关心数据发送时的数据格式,数据大小问题,也不必关心接收时数据的完整性,丢包坏包问题,
** 另外通过Listener::OnEnSend可以在发送前一刻进行自定义加密,在Listener::OnDeRecv在接收最初进行自定义解密。
** 其他特点:提供了丰富的事件机制,同步异步接收机制等。
** 发送时使用队列缓冲,接收时通过用户指定内存缓冲。
** 注意:整体架构已完成,细节还在完善中 hicker@2020-12-18 09:11:26
** Author : hicker@2017-7-29 17:50:22
*************************************************************************/
class DataCacher
{
public:
/*************************************************************************
** Desc : 拆分包结构。一般不超过64k,自带nSrcID/nDstID的好处是,当系统包含一个中转服务时,中转服务程序可以不用等待得
** 到完整包再开始转发。(转发逻辑需要用户实现,DataCacher并不因nSrcID/nDstID自动改变数据流向);
** nByID0/nByID1可以增强数据包传输能力,比如可用于实现类似CDN的业务;
** Author : hicker@2017-7-29 17:50:22
*************************************************************************/
struct TranPack
{
struct tagHead
{
int vCode[4]; // 拆分包的起始标记
int nPkID; // 完整包id,一组TranPack共享同一个id
int iNodeI; // 指示当前节点的索引[0~4]
int iNodeX[4]; // 可以用来保存数据链路中的节点id,例如可将X[0]设为nSrcID,X[3]设为nDstID,X[1],X[2]设为转发节点ID
int iSeek; // 当前TranPack::Body在完整包中的偏移
int nBytes; // 拆分包总大小Head+Body
int nTotal; // 完整包的大小(用户调用PushBack时传入的nData,同时也等于所有拆分包Body的总和)
}Head;
char Body; // 取地址即可得到pData的地址
static const int MIN = sizeof(tagHead); // 包头的大小(Head)
};
private:
// 标记TranPack的传输信息(总大小,已发送/接收大小等),仅在发送时使用
struct PackHolder
{
int iTran; // 当前TranPack已发送/接收的大小,包含TranPack::Head
int iSeek; // 当前TranPack的总大小,同TranPack::Head::iSeek,不可省略,因为Pack会参与加密
int nBytes; // 当前TranPack的总大小,同TranPack::Head::nBytes,不可省略,因为Pack会参与加密
int nTotal; // 当前完整包的总大小,同TranPack::Head::nTotal,不可省略,因为Pack会参与加密
TranPack Pack; // 会在OnEnSend/OnDeRecv参与加解密
};
public:
// 标记完整包的传输信息(总大小,已传输大小,数据存储位置等),仅在接收时使用
struct DataHolder
{
public:
DataHolder()
{
memset(this, 0, sizeof(DataHolder));
};
/*************************************************************************
** Desc : 构建
** Param : [in] iNodeI
** : [in] iNodeX
** : [in] pData 大于等于0时有效,且不会进行数据的拷贝,仅进行赋值操作
** : [in] nData 大于等于0时有效
** Return :
** Author : hicker@2017-7-29
*************************************************************************/
DataHolder(int iNodeI, const int iNodeX[4], void* pData = (void*)-1, int nData = -1)
{
Build(iNodeI, iNodeX, pData, nData);
};
/*************************************************************************
** Desc : 构建
** Param : [in] iNodeI
** : [in] iNodeX
** : [in] pData 大于等于0时有效,且不会进行数据的拷贝,仅进行赋值操作
** : [in] nData 大于等于0时有效
** Return :
** Author : hicker@2017-7-29
*************************************************************************/
void Build(int iNodeI, const int iNodeX[4], void* pData = (void*)-1, int nData = -1)
{
this->nPkID = 0;
this->iNodeI = iNodeI;
memcpy(this->iNodeX, (void*)iNodeX, sizeof(TranPack::Head.iNodeX));
if ((int)pData != -1) { this->pData = pData; }
else { this->pData = 0; };
if (nData != -1) { this->nData = nData; }
else { this->nData = 0; };
this->iTran = 0;
}
int nPkID; // 同TranPack
int iNodeI; // 同TranPack
int iNodeX[4]; // 同TranPack
void* pData; // 指向数据内存地址
int nData; // pData的大小,不包含TranPack::Head
private:
int iTran; // 当前已传输到pData的大小,包含TranPack::Head
friend class DataCacher;
static const int TX_MAX = 1024 * 1024; // 当作为已发送数据的缓存时,允许的nData最大值
};
private:
//struct QueHolder
//{
// std::queue<PackHolder*> queph;
// std::queue<DataHolder*> quedh;
//};
enum
{
QUE_TXFREE, QUE_TXING
};
public:
/*************************************************************************
** Desc : 监听器
** Author : hicker@2017-7-29 17:50:22
*************************************************************************/
class Listener
{
public:
/*************************************************************************
** Desc : 发送前的原始数据。用户自定义加密算法(仅支持逐字节加密)
** Param : [in] vData 由TrySend传入的参数
** : [in] pData
** : [in] nData
** Return :
** Author : hicker@2017-8-21 17:35:00
*************************************************************************/
virtual void OnEnSend(void* vData, void *pData, int nData)
{
SysUtil::Encrypt(pData, pData, nData, WEAK_KEY);
};
/*************************************************************************
** Desc : 发送后(拆分包)。未加密
** Param : [in] vData 由TrySend传入的参数
** : [in] pTranPack
** Return :
** Author : hicker@2017-8-21 17:35:00
*************************************************************************/
virtual void OnSended(void* vData, TranPack* pTranPack) {};
/*************************************************************************
** Desc : 发送后(完整包)。注意pData和nData为PushBack传入的值,如果已被用户释放则为野指针
** Param : [in] vData 由TrySend传入的参数
** : [in] pDataHolder 注意pData和nData为PushBack传入的值,如果已被用户释放则为野指针
** Return :
** Author : hicker@2017-8-21 17:35:00
*************************************************************************/
virtual void OnDataSended(void* vData, DataHolder* pDataHolder) {};
/*************************************************************************
** Desc : 接收后(拆分包)。
** Param : [in] vData 由TryRecv传入的参数
** : [in] pTranPack 当TranPack::iSeek==0时,用户可设置ppMemNeed
** : [in] nMemNeed 所需的内存大小
** : [out] ppMemNeed 仅当需要用户指定时为非nullptr,用来缓存欲接收的数据,最终可以通过PopFront获取,若置为nullptr则表示丢弃该包
** Return :
** Author : hicker@2017-8-21 17:35:00
*************************************************************************/
virtual void OnRecved(void* vData, TranPack* pTranPack, int nMemNeed, void** ppMemNeed) = 0;
/*************************************************************************
** Desc : 接收后(完整包)。比PopFront及时,但不建议使用(与TryRecv在同一线程,失去了缓冲的意义)
** Param : [in] vData 由TryRecv传入的参数
** : [in] pDataHolder 当用户在OnRecved设置了ppMemNeed才有效,否则为nullptr
** Return : 若返回true则会从接收队列中移除此数据包(PopFront将获取不到该数据)
** Author : hicker@2017-8-21 17:35:00
*************************************************************************/
virtual bool OnDataRecved(void* vData, DataHolder* pDataHolder) { return false; };
/*************************************************************************
** Desc : 接收后的原始数据。用户自定义解密算法(仅支持逐字节加密)
** Param : [in] vData 由TryRecv传入的参数
** : [in] pData
** : [in] nData
** Return :
** Author : hicker@2017-8-21 17:35:00
*************************************************************************/
virtual void OnDeRecv(void* vData, void *pData, int nData)
{
SysUtil::Decrypt(pData, pData, nData, WEAK_KEY);
};
};
protected:
/*************************************************************************
** Desc : 发送接口,由用户实现。比如可以用Socket[send/recv],Pipe[WriteFile/ReadFile]等实现
** Param : [in] vData 由TrySend传入的参数
** [in] pData
** [in] nData
** [in] pnByte
** Return : 是否连接正常,比如socket是否已关闭
** Author : hicker@2017-8-21 11:54:36
*************************************************************************/
virtual bool ApiSend(void* vData, void* pData, int nData, int* pnByte) = 0;
#if 0 // 示例(Socket)
{
int iRet = send(fd, (char*)pData, nData, 0);
*pnByte = iRet > 0 ? iRet : 0;
#ifdef __WIN32__
if (iRet < 1 && WSAEWOULDBLOCK != WSAGetLastError())
#elif defined __LINUX__
if ((iRet == -1 && (errno == EBADF || errno == ECONNRESET)) // 主动关闭
|| (iRet == 0 && errno == EWOULDBLOCK) // 被动关闭
|| (iRet < 0 && (errno != EWOULDBLOCK) && (errno != EINPROGRESS))) // 正在接收
#endif
{
return false;
}
return true;
}
#endif
/*************************************************************************
** Desc : 接收接口,由用户实现。比如可以用Socket[send/recv],Pipe[WriteFile/ReadFile]等实现
** Param : [in] vData 由TryRecv传入的参数
** [in] pData
** [in] nData
** [in] pnByte
** Return : 是否连接正常,比如socket是否已关闭
** Author : hicker@2017-8-21 11:54:36
*************************************************************************/
virtual bool ApiRecv(void* vData, void* pData, int nData, int* pnByte) = 0;
#if 0
{
// iRet=0 indicate socket closed,iRet<0 indicate send error
int iRet = recv(fd, (char*)pData, nData, 0);
*pnByte = iRet > 0 ? iRet : 0;
#ifdef __WIN32__
if (iRet < 1 && WSAEWOULDBLOCK != WSAGetLastError())
#elif defined __LINUX__
if ((iRet == -1 && (errno == EBADF || errno == ECONNRESET)) // 主动关闭
|| (iRet == 0 /*&& errno == EWOULDBLOCK*/) // 被动关闭
|| (iRet < 0 && (errno != EWOULDBLOCK) && (errno != EINPROGRESS))) // 正在接收
#endif
{
return false;
}
return true;
}
#endif
public:
DataCacher(Listener* pListener, int vCode[4], int nPackSize = 1024 * 32, int nPackCount = 100)
{
m_pDataHolderRxed = nullptr;
m_pListener = pListener;
m_nPackSize = nPackSize;
m_nPackCount = nPackCount;
m_DataHolderTxed.pData = malloc(DataHolder::TX_MAX);
m_pPackTxed = (TranPack*)malloc(m_nPackSize);
memcpy(m_vCode, vCode, sizeof(m_vCode));
m_nphMemPool = m_nPackSize * m_nPackCount;
m_pphMemPool = (char*)calloc(1, m_nphMemPool);
m_ndhMemPool = m_nPackSize * sizeof(DataHolder);
m_pdhMemPool = (char*)calloc(1, m_ndhMemPool);
for (int i = 0; i < m_nPackCount; i++)
{
m_mapQueTx[QUE_TXFREE].push((PackHolder*)(m_pphMemPool + i * m_nPackSize));
}
m_pMtxQueRxed = new xCores::Mutex();
m_pSigQueRxed = new xCores::Signal(0, 1);
m_pMtxQueTxFree = new xCores::Mutex();
m_pMtxQueTxing = new xCores::Mutex();
m_pMtxQueFake = new xCores::Mutex();
m_pPackRxed = (TranPack*)malloc(m_nPackSize);
m_iPackRxed = 0;
};
virtual ~DataCacher()
{
free(m_DataHolderTxed.pData);
free(m_pPackTxed);
free(m_pphMemPool);
free(m_pdhMemPool);
delete m_pMtxQueRxed;
delete m_pSigQueRxed;
delete m_pMtxQueTxFree;
delete m_pMtxQueTxing;
delete m_pMtxQueFake;
free(m_pPackRxed);
};
/*************************************************************************
** Desc : 向缓冲区写入要发送的数据,频繁失败意味着缓冲区已满
** Param : [in] iNodeI
** : [in] iNodeX[4]
** : [in] pData
** : [in] nData 要发送的数据大小
** : [in] isWait 缓冲区满的情况下是否进行循环等待(50ms)重试
** Return : -1表示伪造包(由PushFake使用),0表示QUE_TXFREE已满(连接状态不佳),其他表示生成的包ID
** Author : hicker@2017-8-21 17:35:00
*************************************************************************/
unsigned long PushBack(int iNodeI, const int iNodeX[4], const void *pData, int nData, bool isWait = true)
{
static unsigned long __id__ = 0; // -1表示伪造包(由PushFake使用),0表示QUE_TXFREE已满(连接状态不佳),其他表示生成的包ID
if (__id__ == (unsigned long)(-2)) __id__ = 0;
int nTempPack, nTempBody;
int nDivider = m_nPackSize - TranPack::MIN;
int nQuotient = nData / nDivider;
int nRemainder = nData % nDivider;
for (int i = 0; i <= nQuotient; i++)
{
m_pMtxQueTxFree->Lock();
while (m_mapQueTx[QUE_TXFREE].empty())
{
m_pMtxQueTxFree->Unlock();
if (!isWait) return 0;
std::this_thread::sleep_for(std::chrono::milliseconds(50));
m_pMtxQueTxFree->Lock();
}
PackHolder* pPackHolder = m_mapQueTx[QUE_TXFREE].front();
if (i == 0) { ++__id__; }
m_mapQueTx[QUE_TXFREE].pop();
m_pMtxQueTxFree->Unlock();
if (i < nQuotient)
{
nTempPack = m_nPackSize;
nTempBody = m_nPackSize - TranPack::MIN;
}
else
{
nTempPack = TranPack::MIN + nRemainder;
nTempBody = nRemainder;
}
pPackHolder->iTran = 0;
pPackHolder->iSeek = i * nDivider;
pPackHolder->nBytes = nTempPack;
pPackHolder->nTotal = nData;
memcpy(pPackHolder->Pack.Head.vCode, m_vCode, sizeof(m_vCode));
pPackHolder->Pack.Head.nPkID = __id__;
pPackHolder->Pack.Head.iNodeI = iNodeI;
memcpy(pPackHolder->Pack.Head.iNodeX, (void*)iNodeX, sizeof(TranPack::Head.iNodeX));
pPackHolder->Pack.Head.nBytes = pPackHolder->nBytes;
pPackHolder->Pack.Head.nTotal = pPackHolder->nTotal;
pPackHolder->Pack.Head.iSeek = pPackHolder->iSeek;
memcpy(&pPackHolder->Pack.Body, (char*)pData + pPackHolder->iSeek, nTempBody);
m_pMtxQueTxing->Lock();
m_mapQueTx[QUE_TXING].push(pPackHolder);
m_pMtxQueTxing->Unlock();
}
return __id__;
};
/*************************************************************************
** Desc : 向缓冲区写入要发送的数据,频繁失败意味着缓冲区已满
** Param : [in] pDataHolder,会忽略nPkID
** : [in] isWait 缓冲区满的情况下是否进行循环等待(50ms)重试
** Return : -1表示伪造包(由PushFake使用),0表示QUE_TXFREE已满(连接状态不佳),其他表示生成的包ID
** Author : hicker@2017-8-21 17:35:00
*************************************************************************/
unsigned long PushBack(DataHolder* pDataHolder, bool isWait = true)
{
return PushBack(pDataHolder->iNodeI, pDataHolder->iNodeX, pDataHolder->pData, pDataHolder->nData,isWait);
};
/*************************************************************************
** Desc : 发送一个伪造包,跳过发送队列,直接在TrySend线程触发Listener::OnDataSended,接着在TryRecv线程触发Listener::OnDataRecved;
** : 随后根据Listener::OnDataRecved返回值确定是否进入PopFront队列
** Param : [in] pDataHolder,会将nPkID强制修改为-1,将iTran强制改为0
** Return : 固定为-1
** Author : hicker@2017-8-21 17:35:00
*************************************************************************/
unsigned long InsertFake(int iNodeI, const int iNodeX[4], void *pData, int nData)
{
DataHolder* pDataHolder = new DataHolder(iNodeI, iNodeX, pData, nData);
pDataHolder->nPkID = -1;
pDataHolder->iTran = 0;
m_pMtxQueFake->Lock();
m_vecFake.push_back(pDataHolder);
m_pMtxQueFake->Unlock();
return pDataHolder->nPkID;
}
/*************************************************************************
** Desc : 阻塞式从缓冲区获取已接收的数据,可设置超时
** : [in] ulWaitms 等待时长,设置为-1时会完全阻塞,直到有新数据进来(无视连接状态)
** Return : 获取到的数据,也即用户在OnRecved指定的(*ppMemNeed),如果是堆内存用完记得释放哦
** Author : hicker@2017-8-21 17:35:00
*************************************************************************/
DataHolder* PopFront(unsigned long ulWaitms = 0)
{
if (m_pSigQueRxed->Wait(ulWaitms))
return nullptr;
m_pMtxQueRxed->Lock();
DataHolder* pDataHolder = m_queRxed.front();
m_queRxed.pop();
m_pMtxQueRxed->Unlock();
return pDataHolder;
};
public:
/*************************************************************************
** Desc : 调用ApiSend发送数据
** Param : [in] vData 会直接传给ApiSend
** Return : 0表示正常,-1表示断开连接(ApiSend返回false),-2表示发送队列为空
** Author : hicker@2017-8-21 17:35:00
*************************************************************************/
// 发送顺序为:只从QUE取数据发送,当得到一个QUE_FREE后,由PushBack向QUE_FREE添加数据
int TrySend(void* vData/*unsigned long ulWaitms = 0*/)
{
// 处理伪数据
DataHolder *pDataHolder = nullptr;
m_pMtxQueFake->Lock();
for (auto& it : m_vecFake)
{
if (it->iTran == 0)
{
it->iTran = it->nData + TranPack::MIN; // 假裝已发送完成
pDataHolder = it;
break;
}
}
m_pMtxQueFake->Unlock();
if (pDataHolder)
{
m_pListener->OnDataSended(vData, pDataHolder);
return 0; // 当存在伪数据时,无视连接状态,一律返回正常
}
// 处理真实数据
m_pMtxQueTxing->Lock();
if (m_mapQueTx[QUE_TXING].size() < 1)
{
m_pMtxQueTxing->Unlock();
return -2;
}
PackHolder* pPackHolder = m_mapQueTx[QUE_TXING].front();
m_pMtxQueTxing->Unlock();
if (pPackHolder->iTran == 0)
{ // 一个新的TranPack开始发送,备份到m_DataHolderTxed
memcpy(m_pPackTxed, &pPackHolder->Pack, pPackHolder->nBytes); // 缓存,之后在OnSended使用
m_DataHolderTxed.Build(pPackHolder->Pack.Head.iNodeI, pPackHolder->Pack.Head.iNodeX, m_DataHolderTxed.pData, (pPackHolder->nTotal > DataHolder::TX_MAX ? DataHolder::TX_MAX : pPackHolder->nTotal));
memcpy((char*)m_DataHolderTxed.pData + pPackHolder->iSeek, &pPackHolder->Pack.Body, pPackHolder->nBytes - TranPack::MIN); // 缓存,之后在OnDataSended使用
// 通知用户可以进行加密
if (m_pListener) m_pListener->OnEnSend(vData, &pPackHolder->Pack, pPackHolder->nBytes);
}
int nFill = 0;
if (!ApiSend(vData, &pPackHolder->Pack + pPackHolder->iTran, pPackHolder->nBytes - pPackHolder->iTran, &nFill))
return -1;
pPackHolder->iTran += nFill;
m_DataHolderTxed.iTran += nFill;
if (pPackHolder->iTran == pPackHolder->nBytes)
{ // 一个TranPack发送完成
if (m_pListener)
{
m_pListener->OnSended(vData, m_pPackTxed);
// 一个完整包发送完成
if (pPackHolder->nTotal == pPackHolder->iSeek + (pPackHolder->nBytes - TranPack::MIN)) { m_pListener->OnDataSended(vData, &m_DataHolderTxed); }
}
m_pMtxQueTxing->Lock();
m_mapQueTx[QUE_TXING].pop();
m_pMtxQueTxing->Unlock();
m_pMtxQueTxFree->Lock();
m_mapQueTx[QUE_TXFREE].push(pPackHolder);
m_pMtxQueTxFree->Unlock();
}
return 0;
};
/*************************************************************************
** Desc : 调用ApiRecv接收数据
** Param : [in] vData 会直接传给ApiRecv
** Return : 0正常,-1表示断开连接,-2表示暂无数据
** Author : hicker@2017-8-21 17:35:18
*************************************************************************/
int TryRecv(void* vData/*int iOverflowMode*/)
{
// 处理伪数据
DataHolder *pDataHolder = nullptr;
m_pMtxQueFake->Lock();
for(std::vector<DataHolder*>::iterator it = m_vecFake.begin();it!=m_vecFake.end();++it)
{
if ((*it)->iTran == (*it)->nData + TranPack::MIN)
{
pDataHolder = (*it);
m_vecFake.erase(it);
break;
}
}
m_pMtxQueFake->Unlock();
if (pDataHolder)
{
if (!m_pListener->OnDataRecved(vData, pDataHolder))
{
m_pMtxQueRxed->Lock();
m_queRxed.push(pDataHolder);
m_pMtxQueRxed->Unlock();
m_pSigQueRxed->Notify(1);
}
return 0; // 当存在伪数据时,无视连接状态,一律返回正常
}
// 处理真实数据
int nRecved = 0;
if (!ApiRecv(vData, (char*)m_pPackRxed + m_iPackRxed, m_nPackSize - m_iPackRxed, &nRecved))
return -1;
if (nRecved == 0)
return -2;
m_iPackRxed += nRecved;
if (m_pListener) m_pListener->OnDeRecv(vData, m_pPackRxed, nRecved);
char* pvCode = (char*)MemStrX(m_pPackRxed, m_vCode, m_iPackRxed, sizeof(m_vCode));
if (m_iPackRxed < TranPack::MIN)
{ // 缓存的数据长度不足以找到TranPack::Head
return 0;
}
if (pvCode == nullptr && m_iPackRxed == m_nPackSize)
{ // 已缓存到定义的最大包长度仍未找到vCode,说明收到了不明数据或包被损坏,舍去首个字节,继续下一次接收
--m_iPackRxed;
memmove(m_pPackRxed, (char*)m_pPackRxed + 1, m_iPackRxed);
return 0;
}
if (pvCode != (char*)m_pPackRxed)
{ // 若vCode没有位于首字节,只有一种可能(基于后续流程保证每收到一个完整包,通知完使用者后,将余下数据移动到m_pPackRxing起始地址),即vCode之前的为垃圾数据或坏包。
memmove(m_pPackRxed, pvCode, m_iPackRxed - (pvCode - (char*)m_pPackRxed));
}
// 经过以上排错,m_pPackRxing的起始地址一定是vCode,且m_iPackRxing至少达到TranPack::MIN
while (m_iPackRxed >= TranPack::MIN && m_pPackRxed->Head.nBytes <= m_iPackRxed)
{ // 至少得到一个完整包
if (m_pListener)
{ // 通知Listener,仅在完整包的首个拆分包到来时,为用户提供提交内存的机会
if (m_pPackRxed->Head.iSeek == 0) m_pListener->OnRecved(vData, m_pPackRxed, m_pPackRxed->Head.nTotal + sizeof(DataHolder), (void**)&m_pDataHolderRxed);
else m_pListener->OnRecved(vData, m_pPackRxed, 0, nullptr);
if (m_pDataHolderRxed != nullptr)
{
if (m_pPackRxed->Head.iSeek == 0)
{
m_pDataHolderRxed->Build(m_pPackRxed->Head.iNodeI, m_pPackRxed->Head.iNodeX, (char*)m_pDataHolderRxed + sizeof(DataHolder), m_pPackRxed->Head.nTotal);
m_pDataHolderRxed->nPkID = m_pPackRxed->Head.nPkID;
m_pDataHolderRxed->iTran = TranPack::MIN;
}
memcpy((char*)m_pDataHolderRxed->pData + m_pDataHolderRxed->iTran, &m_pPackRxed->Body, m_pPackRxed->Head.nBytes);
m_pDataHolderRxed->iTran += m_pPackRxed->Head.nBytes - TranPack::MIN;
}
if (m_pPackRxed->Head.iSeek + (m_pPackRxed->Head.nBytes - TranPack::MIN) == m_pPackRxed->Head.nTotal)
{ // 得到完整包
if (m_pListener->OnDataRecved(vData, m_pDataHolderRxed))
{
m_pDataHolderRxed = nullptr; // 遵循被OnDataRecved后就不再被PopFront处理原则:使得不添加到m_queRxed
}
}
}
if (m_pDataHolderRxed != nullptr)
{
if (m_pDataHolderRxed->iTran == m_pPackRxed->Head.nTotal + TranPack::MIN)
{ // 通知PopFront
m_pMtxQueRxed->Lock();
m_queRxed.push(m_pDataHolderRxed);
m_pMtxQueRxed->Unlock();
m_pSigQueRxed->Notify(1);
m_pDataHolderRxed = nullptr;
}
}
// 迁移剩余数据到m_pPackRxing首地址
m_iPackRxed -= m_pPackRxed->Head.nBytes;
if (m_iPackRxed > 0) { memmove(m_pPackRxed, (char*)m_pPackRxed + m_pPackRxed->Head.nBytes, m_iPackRxed); }
}
return 0;
};
//bool PackHead(void *pData, int nData) { return true; };
//bool PackTail(void *pData, int nData) { return true; };
private:
Listener *m_pListener;
static const unsigned char WEAK_KEY = 0x12;
int m_nPackSize;
int m_nPackCount;
int m_vCode[4];
int m_nphMemPool; // PackHolder内存池
char *m_pphMemPool;
int m_ndhMemPool; // DataHolder内存池
char *m_pdhMemPool;
TranPack* m_pPackTxed;
TranPack* m_pPackRxed; // 大小为m_nPackSize
int m_iPackRxed;
DataHolder* m_pDataHolderRxed; // 已接收到的完整包
DataHolder m_DataHolderTxed; // 已发送的包,最大缓存10*m_nPackSize
std::queue<DataHolder*> m_queRxed;
xCores::Mutex* m_pMtxQueRxed;
xCores::Signal* m_pSigQueRxed;
// linux error: '>>' should be '> >' within a nested template argument list
std::map<int, std::queue<PackHolder*>> m_mapQueTx; // QUE_TXFREE, QUE_TXING,
//struct QueHolder
//{
// std::queue<PackHolder*> queph;
// std::queue<DataHolder*> quedh;
//};
xCores::Mutex* m_pMtxQueTxFree;
xCores::Mutex* m_pMtxQueTxing;
std::vector<DataHolder*> m_vecFake; // 通过InsertFake添加的数据
xCores::Mutex* m_pMtxQueFake;
};
}
#endif
DataCacher——一个数据缓冲器,支持多种底层传输协议(Socket、管道等)
猜你喜欢
转载自blog.csdn.net/xk641018299/article/details/111352690
今日推荐
周排行