#define CCONMAG_H
#include <string>
#include <vector>
#include <exception>
#include <typeinfo>
#include <deque>
using namespace std;
#include "pageset.h"
#include "taskqueueproc.h"
#include "baselog.h"
#include "connection.h"
#include "transctrllayer.h"
#include "comapi.h"
#include "pack.h"
#include "DateTime.h"
#include <assert.h>
using namespace BackCom;
namespace NetIO
{
/**
 * Abstract interface of a connection manager.
 *
 * It declares the operations offered to the application (start/stop,
 * sending packets, closing connections) and the notification callbacks the
 * manager invokes on the application (new connection, new packet,
 * connection closed, request cancelled).
 */
class CConMagBase
{
public:
    /** Bit flags; tested against the manager's log flag to select what is logged. */
    enum LogType
    {
        Log_RequestInfo  = 1,   // one-line summary for received packets
        Log_RequestPack  = 2,   // dump of received packet content
        Log_ResponseInfo = 4,   // one-line summary for packets being sent
        Log_ResponsePack = 8,   // dump of sent packet content
        Log_Debug        = 16,
    };

    /** Shake-hand (keep-alive) settings of the transport layer. */
    struct _Trans_Param
    {
        _Trans_Param()
        {
            // 0x7fffffff is the "no shake-hand timeout" sentinel checked by the I/O loop.
            m_intShakeHandInterval = 0x7fffffff;
            m_intShakeHandMode = 0;
        }
        // Max time without received data before the connection is closed
        // (presumably seconds -- TODO confirm against CComDateTimeSpan).
        int m_intShakeHandInterval;
        // Non-zero: shake-hand packets are answered by the manager itself.
        int m_intShakeHandMode;
    };

    /** Reason reported when a connection is closed. */
    enum Con_Status
    {
        Con_NoError
        ,Con_ClientClosed
        ,Con_ShakeHandTimeout
        ,Con_ShakeHandFail
        ,Con_OutputQueueFull
        ,Con_NetError
        ,Con_ByAdmin
        ,Con_RequestPacketInvalid
        ,Con_RecvPacketOverFlow
        ,Con_UnknownError
    };

    /** Priority of an outgoing packet. */
    enum Packet_Priority
    {
        Priority_Low=0
        ,Priority_Normal=1
        ,Priority_High=2
    };

public:
    /**
     * Virtual destructor: this class is used as a polymorphic base, so
     * destroying an implementation through a CConMagBase pointer must run
     * the derived destructor.
     */
    virtual ~CConMagBase() {}

    /** Starts the worker threads of the manager. */
    virtual bool StartThreads() = 0;
    /** Stops the worker threads of the manager. */
    virtual bool StopThreads() = 0;
    /** Sets the protocol object used to interpret packet headers. */
    virtual void SetPackHead(CPackProtocol * pHead) = 0;
    /** Queues a packet for the connection identified by intSerialNo. */
    virtual bool SendPacket(const unsigned long long int intSerialNo
                            ,const char * pchPacket
                            ,const int intPacketSize
                            ) = 0;
    /** Same as SendPacket() with an explicit priority. */
    virtual bool SendPriorityPacket(const unsigned long long int intSerialNo
                                    ,const char * pchPacket
                                    ,const int intPacketSize
                                    ,const Packet_Priority enuPriority
                                    ) = 0;
    /** Schedules the connection to be closed, optionally after intDelay. */
    virtual bool AddClosingConn(const unsigned long long int intSerialNo
                                ,const Con_Status enuCloseReason = Con_UnknownError
                                ,const int intDelay = 0
                                ) = 0;
    /** Schedules every connection to be closed, optionally after intDelay. */
    virtual bool CloseAllConns(const Con_Status enuCloseReason = Con_UnknownError
                               ,const int intDelay = 0
                               ) = 0;

public:
    /** Called after a connection has been removed; enuReason tells why. */
    virtual void ConnCloseNotify(const unsigned long long int intSerialNo
                                 ,const Con_Status enuReason
                                 ) = 0;
    /**
     * Called for every complete request packet received.
     * The return value is passed back to the task queue as the task result.
     */
    virtual bool NewPacketNotify(const unsigned long long int intSerialNo
                                 ,const string & strID
                                 ,const char * pchPacket
                                 ,const int intPacketSize
                                 ) = 0;
    /** Called after a new connection has been taken over by the manager. */
    virtual void NewConnNotify(const unsigned long long int intSerialNo
                               ) = 0;
    /** Called when a pending request of a connection has been cancelled. */
    virtual void RequestCanceledNotify(const unsigned long long int intSerialNo
                                       ,const int intReqPacketNo
                                       ) = 0;
};
/**
 * Connection manager that serves a set of sockets from one poll thread.
 *
 * The thread body is Execute() (started through CThread in StartThreads()).
 * Received packets are handed to a task queue (m_tqTasks); outgoing packets
 * are queued by the _SendPacket() helper and written by the poll thread.
 * The notification callbacks inherited from CConMagBase stay pure virtual
 * here and must be supplied by a derived class.
 */
class CConMag : private CThread,protected CBaseLog, public CConMagBase
{
public:
    typedef CPageSet::_Page _Page;

    /** Payload submitted to the task queue for one received packet. */
    struct _TaskInfo
    {
        vector<char> m_packet;  // copy of the complete packet
        CConMag *m_pMag;        // manager that received the packet
        string m_strID;         // ID string of the originating connection
    };

    /** Task queue keyed by (connection serial number, packet number). */
    class CConTask: public CTaskQueueProc<unsigned long long int,unsigned long long int ,CConMag::_TaskInfo>
    {
    public:
        bool DoTask(const unsigned long long int & group
                    ,const unsigned long long int & sub_id
                    ,const CConMag::_TaskInfo & info
                    );
        void UndoTask(const unsigned long long int & group
                      ,const unsigned long long int & sub_id
                      ,const CConMag::_TaskInfo & info
                      );
    };

    /** One outgoing packet waiting to be attached to its connection. */
    struct _AddPacket
    {
        unsigned long long int m_intSerialNo;   // target connection
        Packet_Priority m_pri;                  // queueing priority
        _Page * m_packet;                       // paged buffer holding the data
        // Every member gets a defined value so that a default-constructed
        // entry never carries indeterminate data.
        _AddPacket()
            :m_intSerialNo(0)
            ,m_pri(Priority_Normal)
            ,m_packet(NULL)
        {
        }
    };

    /** A pending close request; ordered by serial number for use in a set. */
    struct _CloseInfo
    {
        unsigned long long int m_intSerialNo;
        Con_Status m_enuCloseReason;
        bool operator<(const _CloseInfo & v) const { return m_intSerialNo < v.m_intSerialNo; };
    };

private:
    /** Connection object plus its public key and pause state. */
    struct _ConnNode
    {
        _ConnNode(CPageSet & poolOutput
                  ,const unsigned long long int intSerialNo
                  ,const string & strID
                  ,const string & strPublicKey
                  )
            : m_conn(poolOutput,intSerialNo,strID)
            ,m_strPublicKey(strPublicKey)
            ,m_blnPaused(false)
        {
        };
        CConnection m_conn;
        string m_strPublicKey;
        bool m_blnPaused;                // true while request processing is paused
        CComDateTime m_pauseStartTime;   // set when the pause started
    };

private:
    /**
     * Per-connection I/O state: socket, receive buffer and the output buffer
     * currently being written. The destructor releases the output buffer and
     * closes the socket, therefore the type must not be copied.
     */
    struct _ConnExt
    {
        _ConnNode m_node;
        int m_intSocket;
        vector<char> m_vectInputBuf;   // receive buffer
        int m_intInputFinished;        // bytes currently held in m_vectInputBuf
        int m_intInputPackSize;        // size of the packet being received, 0 if unknown
        _Page * m_pOutputPagedBuf;     // packet currently being sent, or NULL
        int m_intOutputFinished;       // bytes of that packet already sent
        int m_intOutputPackSize;       // total size of that packet
        CComDateTime m_lastRecvTime;   // time of the last received packet
        _ConnExt(CPageSet & poolOutput
                 ,const unsigned long long int intSerialNo
                 ,const string & strID
                 ,const string & strPublicKey
                 )
            : m_node(poolOutput, intSerialNo, strID, strPublicKey)
        {
            m_intSocket = 0;
            m_intInputFinished = 0;
            m_intInputPackSize = 0;
            m_pOutputPagedBuf = NULL;
            m_intOutputFinished = 0;
            m_intOutputPackSize = 0;
            m_lastRecvTime = CComDateTime::GetCurrentTime();
        };
        ~_ConnExt()
        {
            Clear();
        }
        /** Resets the I/O state, frees the pending output buffer and closes the socket. */
        void Clear()
        {
            m_intInputFinished = 0;
            m_intInputPackSize = 0;
            if( m_pOutputPagedBuf != NULL )
            {
                m_node.m_conn.FreeOutputNode(m_pOutputPagedBuf);
            }
            m_pOutputPagedBuf = NULL;
            m_intOutputFinished = 0;
            m_intOutputPackSize = 0;
            m_lastRecvTime = CComDateTime::GetCurrentTime();
            m_node.m_conn.ReleasePage();
            if( m_intSocket > 0 )
            {
                CComApi::Close(m_intSocket);
                m_intSocket = -1;
            }
        }
    private:
        // Not copyable: a copy would close the socket and free the output
        // buffer a second time. Declared but intentionally not defined.
        _ConnExt(const _ConnExt &);
        _ConnExt & operator=(const _ConnExt &);
    };

    typedef CConTask task_queue_type;
    typedef multimap<time_t,_CloseInfo > closing_type;           // due time -> close request
    typedef map<unsigned long long int,_ConnExt*> all_type;      // serial number -> connection

public:
    CConMag();
    ~CConMag();
    /** Stores the configuration; see the member variables of the same names. */
    bool Init(const _Trans_Param & udtTransLayerParam
              ,CPackProtocol * pHead
              ,CLog * pLog = NULL
              ,const int intMaxConnection = 8192
              ,const int intTaskQueueThreadNum = 10
              ,const int intMinSendSize = 32*1024
              ,const int intPageSize = 4 * 1024
              ,const int intRecvBufInitSize = 4 * 1024
              ,const int intRecvBufMaxSize = 1024*1024*4
              ,const int intPollTimeout = 10
              ,const int intMaxWaitingTask = 100
              ,const int intMaxOutQueuePages = 1000
              ,const int intPauseTimeout = 300
              );
    bool StartThreads();
    bool StopThreads();
    void SetPackHead(CPackProtocol * pHead);
    /** Hands an accepted socket over to the manager. */
    bool AddConn(const unsigned long long int intSerialNo
                 ,const int intSocket
                 ,const string strID = ""
                 ,const string strPublicKey = ""
                 );
    bool SendPacket(const unsigned long long int intSerialNo
                    ,const char * pchPacket
                    ,const int intPacketSize
                    );
    bool SendPriorityPacket(const unsigned long long int intSerialNo
                            ,const char * pchPacket
                            ,const int intPacketSize
                            ,const Packet_Priority enuPriority
                            );
    bool AddClosingConn(const unsigned long long int intSerialNo
                        ,const Con_Status enuCloseReason = Con_UnknownError
                        ,const int intDelay = 0
                        );
    bool CloseAllConns(const Con_Status enuCloseReason = Con_UnknownError
                       ,const int intDelay = 0
                       );
    /** Returns the free-slot count last computed by the poll thread. */
    int GetFreeConCount();
    void SetLogConfig(const int intFlag
                      ,const int intMaxContentLength
                      );

private:
    int GetConnections() const;
    int GetMaxConnections() const;
    bool PauseProcessRequest(const unsigned long long int intSerialNo
                             );
    bool ResumeProcessRequest(const unsigned long long int intSerialNo
                              );
    void StaticStart();
    void StaticStop ();
    /** Thread body: the poll loop. */
    virtual void Execute(void *ptr);
    Con_Status DoRecv(_ConnExt * pExtConn);
    Con_Status DoSend(_ConnExt * pExtConn);
    int _DoSend(_ConnExt * pExtConn);
    bool _SendPacket(const unsigned long long int intSerialNo
                     ,const char * pchPacket
                     ,const int intPacketSize
                     ,const Packet_Priority enuPriority
                     );
    //bool Log(CLog::Level enuLevel, const char * pFunc, const char *pFormat,...);

public:
    /** Delivers one queued request packet to NewPacketNotify(). */
    bool DoTask(const unsigned long long int & intSerialNo
                ,const unsigned int & intReqPacketNo
                ,const _TaskInfo & info
                );
    void UndoTask(const unsigned long long int & intSerialNo
                  ,const unsigned int & intReqPacketNo
                  ,const _TaskInfo & info
                  );

public:
    // Configuration. Declared const; Init() writes them through const_cast.
    const int m_intLogFlag;                  // combination of LogType bits
    const int m_intLogMaxContentLength;      // limit passed to DumpBuf for packet dumps
    const bool m_blnDoStatic;                // copied to each new connection
    const _Trans_Param m_udtTransLayerParam;
    const int m_intMaxConnection;
    const int m_intPollTimeout;
    const int m_intMaxWaitingTask;           // reading stops while a connection has this many queued tasks
    const int m_intMaxOutQueuePages;         // output queue size at which a connection is paused
    const int m_intPauseTimeout;             // a connection paused longer than this is closed
    const int m_intRecvBufInitSize;
    const int m_intRecvBufMaxSize;           // largest accepted packet
    const int m_intMinSendSize;              // DoSend keeps sending until this many bytes were written

private:
    CPageSet m_pageset;
    task_queue_type m_tqTasks;
    CLock m_lockClosing;                 // guards m_mapClosing
    closing_type m_mapClosing;
    CLock m_lockNew;                     // guards m_vectNew
    vector<_ConnExt*> m_vectNew;         // added connections not yet merged into m_mapAll
    CLock m_lockFree;                    // guards m_intFreeCount
    int m_intFreeCount;
    CLock m_lockAll;                     // guards m_mapAll
    all_type m_mapAll;                   // active connections (owned)
    CPackProtocol * m_pHead;
    CLock m_lockPacket;                  // guards m_listAddPacket
    deque<_AddPacket> m_listAddPacket;   // outgoing packets not yet attached to a connection
};
/**
 * Front-end that owns a list of CConMag objects (m_vectObjs) and exposes
 * the CConMagBase interface for all of them.
 *
 * ConnCloseNotify() and NewPacketNotify() remain pure virtual and must be
 * implemented by the application class deriving from this group.
 */
class CConMagGroup : public CConMagBase, protected CBaseLog
{
private:
    /**
     * CConMag specialisation used inside the group: every notification is
     * forwarded unchanged to the owning group.
     */
    class CConMagSub : public CConMag
    {
    private:
        CConMagGroup * m_pGroup;    // owning group; target of all forwards

    public:
        CConMagSub(CConMagGroup * pGroup)
            : m_pGroup(pGroup)
        {
        }

        virtual void ConnCloseNotify(const unsigned long long int intSerialNo,
                                     const Con_Status enuReason)
        {
            m_pGroup->ConnCloseNotify(intSerialNo, enuReason);
        }

        virtual bool NewPacketNotify(const unsigned long long int intSerialNo,
                                     const string & strID,
                                     const char * pchPacket,
                                     const int intPacketSize)
        {
            return m_pGroup->NewPacketNotify(intSerialNo, strID, pchPacket, intPacketSize);
        }

        virtual void NewConnNotify(const unsigned long long int intSerialNo)
        {
            m_pGroup->NewConnNotify(intSerialNo);
        }

        virtual void RequestCanceledNotify(const unsigned long long int intSerialNo,
                                           const int intReqPacketNo)
        {
            m_pGroup->RequestCanceledNotify(intSerialNo, intReqPacketNo);
        }
    };

    vector<CConMag *> m_vectObjs;           // the managers of this group
    CLock m_lock;
    unsigned long long int m_intSerialNo;   // serial number counter, starts at 0

public:
    CConMagGroup()
        : m_intSerialNo(0)
    {
    }

    /**
     * Configures the group. Apart from cGroupCount and intMaxGroupConnection
     * the parameters carry the same names as those of CConMag::Init().
     * NOTE(review): cGroupCount is presumably the number of CConMag objects
     * created -- confirm in the implementation.
     */
    bool Init(const _Trans_Param & udtTransLayerParam,
              CPackProtocol * pHead,
              const unsigned char cGroupCount = 2,
              const int intMaxGroupConnection = 8192,
              CLog * pLog = CLog::s_pLog,
              const int intTaskQueueThreadNum = 10,
              const int intMinSendSize = 32*1024,
              const int intPageSize = 4 * 1024,
              const int intRecvBufInitSize = 4 * 1024,
              const int intRecvBufMaxSize = 1024*1024*4,
              const int intPollTimeout = 10,
              const int intMaxWaitingTask = 100,
              const int intMaxOutQueuePages = 100);

    bool StartThreads();
    bool StopThreads();
    void SetPackHead(CPackProtocol * pHead);

    /** Adds a socket; intSerialNo is a non-const reference (in/out). */
    bool AddConn(unsigned long long int & intSerialNo,
                 const int intSocket,
                 const string strID = "",
                 const string strPublicKey = "");

    bool SendPacket(const unsigned long long int intSerialNo,
                    const char * pchPacket,
                    const int intPacketSize);

    bool SendPriorityPacket(const unsigned long long int intSerialNo,
                            const char * pchPacket,
                            const int intPacketSize,
                            const Packet_Priority enuPriority);

    bool AddClosingConn(const unsigned long long int intSerialNo,
                        const Con_Status enuCloseReason = Con_UnknownError,
                        const int intDelay = 0);

    bool CloseAllConns(const Con_Status enuCloseReason = Con_UnknownError,
                       const int intDelay = 0);

    int GetActiveConCount();

    void SetLogConfig(const int intFlag,
                      const int intMaxContentLength);

public:
    // Application callbacks; still abstract at this level.
    virtual void ConnCloseNotify(const unsigned long long int intSerialNo,
                                 const Con_Status enuReason) = 0;

    virtual bool NewPacketNotify(const unsigned long long int intSerialNo,
                                 const string & strID,
                                 const char * pchPacket,
                                 const int intPacketSize) = 0;

private:
    // Handled by the group itself; defined outside this declaration.
    virtual void NewConnNotify(const unsigned long long int intSerialNo);

    virtual void RequestCanceledNotify(const unsigned long long int intSerialNo,
                                       const int intReqPacketNo);

    unsigned long long int GetNewSerialNo();
    unsigned char GetGroupID(const unsigned long long int intSerialNo);
    unsigned long long int CreateGroupSerialNo(const unsigned long long int intSerialNo, const unsigned char cGroupID);
    //bool Log(CLog::Level enuLevel, const char * pFunc, const char *pFormat,...);
};
};
#endif
#include "conmag.h"
using namespace NetIO;
/**
 * Stores the runtime configuration of the manager.
 *
 * Initialises the logging base and the task queue, then copies the tuning
 * values into the corresponding members and remembers the protocol object.
 * intPageSize is accepted but not used by this function.
 *
 * @return always true.
 */
bool CConMag::Init(const _Trans_Param & udtTransLayerParam,
                   CPackProtocol * pHead,
                   CLog * pLog,
                   const int intMaxConnection,
                   const int intTaskQueueThreadNum,
                   const int intMinSendSize,
                   const int intPageSize,
                   const int intRecvBufInitSize,
                   const int intRecvBufMaxSize,
                   const int intPollTimeout,
                   const int intMaxWaitingTask,
                   const int intMaxOutQueuePages,
                   const int intPauseTimeout)
{
    // Logging first, then the worker queue that also logs through pLog.
    CBaseLog::Init(pLog);
    m_tqTasks.Init(intTaskQueueThreadNum, pLog);

    // The configuration members are declared const in the class, so the
    // values are written through const_cast.
    const_cast<_Trans_Param&>(m_udtTransLayerParam) = udtTransLayerParam;

    const_cast<int&>(m_intMaxConnection)    = intMaxConnection;
    const_cast<int&>(m_intMinSendSize)      = intMinSendSize;
    const_cast<int&>(m_intRecvBufInitSize)  = intRecvBufInitSize;
    const_cast<int&>(m_intRecvBufMaxSize)   = intRecvBufMaxSize;
    const_cast<int&>(m_intPollTimeout)      = intPollTimeout;
    const_cast<int&>(m_intMaxWaitingTask)   = intMaxWaitingTask;
    const_cast<int&>(m_intMaxOutQueuePages) = intMaxOutQueuePages;
    const_cast<int&>(m_intPauseTimeout)     = intPauseTimeout;

    m_pHead = pHead;
    return true;
}
/**
 * Reads once from the connection's socket and processes every complete
 * packet now present in the receive buffer.
 *
 * Shake-hand packets are answered directly when shake-hand mode is enabled;
 * all other packets are copied into a _TaskInfo and submitted to the task
 * queue. Data belonging to a following packet is moved to the buffer head.
 *
 * @param pExtConn connection to read from.
 * @return Con_NoError on success (including "packet still incomplete"),
 *         Con_NetError when the receive call fails,
 *         Con_ClientClosed when zero bytes were received,
 *         Con_RecvPacketOverFlow when the announced packet size is negative
 *         or larger than m_intRecvBufMaxSize,
 *         Con_UnknownError when the announced size is below the header size.
 */
CConMag::Con_Status
CConMag::DoRecv(_ConnExt * pExtConn)
{
    // Lazily create the receive buffer.
    if( pExtConn->m_vectInputBuf.size() == 0 )
    {
        pExtConn->m_intInputFinished = 0;
        pExtConn->m_intInputPackSize = 0;
        pExtConn->m_vectInputBuf.resize(m_intRecvBufInitSize);
    }
    int intBufSize = pExtConn->m_vectInputBuf.size();
    // In: free space in the buffer. Out: number of bytes actually received.
    int length = intBufSize-pExtConn->m_intInputFinished;
    if( CComApi::Recv(pExtConn->m_intSocket,&*pExtConn->m_vectInputBuf.begin()+pExtConn->m_intInputFinished,length,0) == false )// receive only once?
    {
        Log(CLog::Error,__func__,"Recv(pExtConn->m_intSocket,&*pExtConn->m_vectInputBuf.begin()+pExtConn->m_intInputFinished,length,0) == false{pExtConn->m_intSocket=%d,pExtConn->m_intInputFinished=%d,length=%d,id='%s'}",pExtConn->m_intSocket,pExtConn->m_intInputFinished,length,pExtConn->m_node.m_conn.m_strID.c_str());
        return Con_NetError;
    }
    else if( length == 0 )
    {
        Log(CLog::Info,__func__,"Recv length == 0,Socket is closed by client!{pExtConn->m_intSocket=%d,length=%d,id='%s'}",pExtConn->m_intSocket,length,pExtConn->m_node.m_conn.m_strID.c_str());
        return Con_ClientClosed;
    }
    pExtConn->m_intInputFinished += length;

    int intHeadSize = m_pHead->GetHeadSize(&*pExtConn->m_vectInputBuf.begin(), pExtConn->m_intInputFinished);
    while(intHeadSize > 0 && pExtConn->m_intInputFinished >= intHeadSize )
    {
        // Packet size not known yet: read it from the header and validate it.
        if( pExtConn->m_intInputPackSize == 0 )
        {
            pExtConn->m_intInputPackSize = m_pHead->GetPackSize(&*pExtConn->m_vectInputBuf.begin(), pExtConn->m_intInputFinished);
            if(pExtConn->m_intInputPackSize < 0 || pExtConn->m_intInputPackSize > m_intRecvBufMaxSize)
            {
                Log(CLog::Error,__func__,"½ÓÊÕÊý¾Ý°ü´óСԽ½ç{pExtConn->m_intInputPackSize=%d,m_intRecvBufMaxSize=%d,id='%s'}",pExtConn->m_intInputPackSize,m_intRecvBufMaxSize,pExtConn->m_node.m_conn.m_strID.c_str());
                DumpBuf(CLog::Error,__func__,"½ÓÊÕÊý¾Ý°ü´óСԽ½ç",&*pExtConn->m_vectInputBuf.begin(),pExtConn->m_intInputFinished,64);
                return Con_RecvPacketOverFlow;
            }
            // Grow the buffer so the whole packet fits.
            if(pExtConn->m_intInputPackSize > (int)pExtConn->m_vectInputBuf.size())
            {
                pExtConn->m_vectInputBuf.resize(pExtConn->m_intInputPackSize);
            }
        }
        if( pExtConn->m_intInputPackSize < intHeadSize )
        {
            Log(CLog::Error,__func__,"Packet less than packet head size[pExtConn->m_intInputPackSize < _PackHeadSize]!{pExtConn->m_intInputPackSize=%d,_PackHeadSize=%d,id='%s'}",pExtConn->m_intInputPackSize,intHeadSize,pExtConn->m_node.m_conn.m_strID.c_str());
            DumpBuf(CLog::Error,__func__,"Error packet head content",&*pExtConn->m_vectInputBuf.begin(),pExtConn->m_intInputFinished,64);
            return Con_UnknownError;
        }
        // Packet not complete yet: wait for the next readable event.
        if( pExtConn->m_intInputFinished < pExtConn->m_intInputPackSize )
        {
            break;
        }

        char *pchPack = &*pExtConn->m_vectInputBuf.begin();
        int intPackSize = pExtConn->m_intInputPackSize;

        // Optional statistics: remember arrival time and the packet head.
        if( pExtConn->m_node.m_conn.m_blnDoStatic == true )
        {
            CAutoLock guard(pExtConn->m_node.m_conn.m_lockStaticInfo);
            pExtConn->m_node.m_conn.m_vectStaticInfo.push_back(CConnection::_StaticInfo());
            CConnection::_StaticInfo & info = pExtConn->m_node.m_conn.m_vectStaticInfo.back();
            info.m_dtmTime = CDateTime::DateTimeCnvt(CComDateTime::GetCurrentTime());
            timeval sNow;
            gettimeofday(&sNow,NULL);
            info.m_intMicroseconds = sNow.tv_usec;
            info.m_blnIsRecv = true;
            memcpy(info.m_pchHead,pchPack,min((int)sizeof(info.m_pchHead),intPackSize));
        }

        unsigned long long int lltPacketNo = m_pHead->GetPackNo(pchPack);
        short shType = m_pHead->GetPackType(pchPack);
        if( (m_intLogFlag & Log_RequestInfo) == Log_RequestInfo )
            Log(CLog::Info,__func__,"Receive a packet!{intPackSize=%d,shType=0x%x,id='%s'}",pExtConn->m_intInputPackSize,shType,pExtConn->m_node.m_conn.m_strID.c_str());
        if( (m_intLogFlag & Log_RequestPack) == Log_RequestPack )
            DumpBuf(CLog::Info,__func__,"Packet Content",pchPack,intPackSize,m_intLogMaxContentLength);
        pExtConn->m_lastRecvTime = CComDateTime::GetCurrentTime();

        if(m_udtTransLayerParam.m_intShakeHandMode && m_pHead->IsShakeHandPacket(pchPack, intPackSize))
        {
            // Shake-hand packets are answered here and never reach the task queue.
            vector<char> vectResp;
            vectResp = m_pHead->GetShakeHandResponse(pchPack, intPackSize);
            if(vectResp.size())
            {
                pExtConn->m_node.m_conn.AddOutputPack(&*vectResp.begin(), vectResp.size(), Priority_High);// reply packet
            }
        }
        else
        {
            _TaskInfo info;
            info.m_packet.insert(info.m_packet.end(), pchPack, pchPack + intPackSize);
            info.m_pMag = this;
            info.m_strID = pExtConn->m_node.m_conn.m_strID;
            // The serial number is an unsigned long long and must be printed
            // with %llu; %d would mismatch the variadic argument list.
            if( (m_intLogFlag & Log_RequestInfo) == Log_RequestInfo )
                Log(CLog::Info,__func__,"Ìá½»ÇëÇóÈÎÎñ£¡{SerialNo=%llu,info=(m_pPagedBuf=%p,m_pMag=%p),id='%s'}",pExtConn->m_node.m_conn.m_intSerialNo,&*info.m_packet.begin(),info.m_pMag,pExtConn->m_node.m_conn.m_strID.c_str());
            m_tqTasks.SubmitTask(pExtConn->m_node.m_conn.m_intSerialNo,lltPacketNo,info);
        }

        if( pExtConn->m_intInputFinished > pExtConn->m_intInputPackSize )
        {
            // More than one packet was received: shift the rest to the front.
            if( (m_intLogFlag & Log_RequestInfo) == Log_RequestInfo )
                Log(CLog::Info,__func__,"Recv more than one pack,need move left data to buffer head[pExtConn->m_intInputFinished > pExtConn->m_intInputPackSize]!{pExtConn->m_intInputFinished=%d,pExtConn->m_intInputPackSize=%d,id='%s')}",pExtConn->m_intInputFinished,pExtConn->m_intInputPackSize,pExtConn->m_node.m_conn.m_strID.c_str());
            memmove(pchPack,pchPack+pExtConn->m_intInputPackSize,pExtConn->m_intInputFinished-pExtConn->m_intInputPackSize);
            pExtConn->m_intInputFinished -= pExtConn->m_intInputPackSize;
            pExtConn->m_intInputPackSize = 0;
            // The buffer now starts with a different packet; ask the protocol
            // for its header size again instead of reusing the previous value.
            intHeadSize = m_pHead->GetHeadSize(&*pExtConn->m_vectInputBuf.begin(), pExtConn->m_intInputFinished);
        }
        else
        {
            // Buffer consumed exactly.
            pExtConn->m_intInputFinished = 0;
            pExtConn->m_intInputPackSize = 0;
            return Con_NoError;
        }
    }
    return Con_NoError;
}
/**
 * Writes queued output of one connection until at least m_intMinSendSize
 * bytes were sent, nothing is left to send, or a write was only partial.
 *
 * @return Con_NetError when _DoSend() reports -1, otherwise Con_NoError.
 */
CConMag::Con_Status CConMag::DoSend(_ConnExt * pExtConn)
{
    int intTotalSent = 0;
    while( intTotalSent < m_intMinSendSize )
    {
        const int intSent = _DoSend(pExtConn);
        if( intSent == -1 )
        {
            return Con_NetError;
        }
        // -2: the send was only partial; 0: no data available to send.
        if( intSent == -2 || intSent == 0 )
        {
            break;
        }
        intTotalSent += intSent;
    }
    //Log(CLog::Warn,__func__,"Send data sum size = %d", intSumSize);
    return Con_NoError;
}
/**
 * Sends at most one page fragment of the current output packet.
 *
 * If no packet is in progress the next one is fetched from the connection's
 * output queue. A fully written packet is returned to the connection via
 * FreeOutputNode().
 *
 * @return number of bytes written when the requested fragment was written
 *         completely, 0 when there is nothing to send, -2 when the write
 *         was only partial, -1 on a send or free error.
 */
int CConMag::_DoSend(_ConnExt * pExtConn)
{
    CConnection & conn = pExtConn->m_node.m_conn;

    // Fetch the next packet if none is currently being written.
    if( pExtConn->m_pOutputPagedBuf == NULL )
    {
        if( conn.GetOutputNode(pExtConn->m_pOutputPagedBuf) == false )
        {
            Log(CLog::Error,__func__,"pExtConn->m_node.m_conn.GetOutputNode(pExtConn->m_pOutputPagedBuf) == false{pExtConn=%p,pExtConn->m_pOutputPagedBuf=%p}",pExtConn,pExtConn->m_pOutputPagedBuf);
        }
        else if( pExtConn->m_pOutputPagedBuf != NULL )
        {
            pExtConn->m_intOutputFinished = 0;
            pExtConn->m_intOutputPackSize = pExtConn->m_pOutputPagedBuf->GetContentLength();
        }
    }
    if( pExtConn->m_pOutputPagedBuf == NULL )
    {
        return 0;
    }

    const int intPageBytes = pExtConn->m_pOutputPagedBuf->GetPageSize();
    const int intSock = pExtConn->m_intSocket;
    int & intDone = pExtConn->m_intOutputFinished;     // bytes of the packet already sent
    const int intTotal = pExtConn->m_intOutputPackSize;
    const char * const * ppPages = &*pExtConn->m_pOutputPagedBuf->GetPageBasePtrs();

    // Log once, at the start of every non-empty packet.
    if( (intDone == 0) && (intTotal > 0) )
    {
        if( (m_intLogFlag & Log_ResponseInfo) == Log_ResponseInfo )
            Log(CLog::Info,__func__,"Send packet begin!{intPacketSize=%d,id='%s'}",intTotal,conn.m_strID.c_str());
        if( (m_intLogFlag & Log_ResponsePack) == Log_ResponsePack )
            DumpBuf(CLog::Info,__func__,"Packet Content",ppPages[0],intTotal,min(m_intLogMaxContentLength,intPageBytes));
    }

    const int intCurPage = intDone / intPageBytes;
    const int intLastPage = (intTotal + intPageBytes - 1) / intPageBytes - 1;
    int intResult = 0;

    // Only the page containing the current position is written per call.
    if( intCurPage <= intLastPage )
    {
        const int intOffset = intDone % intPageBytes;
        const char * pChunk = ppPages[intCurPage] + intOffset;
        // Last page: the rest of the packet; otherwise: the rest of the page.
        const int intWanted = (intCurPage == intLastPage) ? (intTotal - intDone)
                                                          : (intPageBytes - intOffset);
        int intWritten = intWanted;     // in: bytes to send, out: bytes sent
        int intErrorCode = 0;
        if( CComApi::Send(intSock,pChunk,intWritten,MSG_NOSIGNAL,&intErrorCode) == false )
        {
            Log(CLog::Error,__func__,"Send(nSocket,pBuf,nCount,MSG_NOSIGNAL,&intErrorCode) == false{nSocket=%d,pBuf=%08p,nCount=%d,intErrorCode=%d,id='%s'}",intSock,pChunk,intWritten,intErrorCode,conn.m_strID.c_str());
            return -1;
        }
        //Log(CLog::Warn,__func__,"Send data size = %d", nCount);
        if( intWritten == intWanted )
        {
            intResult = intWritten;
        }
        else
        {
            intResult = -2;     // partial write
        }
        intDone += intWritten;
    }

    // Whole packet written: give the buffer back and reset the send state.
    if( intDone == intTotal )
    {
        if( conn.FreeOutputNode(pExtConn->m_pOutputPagedBuf) == false )
        {
            Log(CLog::Error,__func__,"pExtConn->m_node.m_conn.FreeOutputNode(pExtConn->m_pOutputPagedBuf) == false{pExtConn=%p,pExtConn->m_pOutputPagedBuf=%p,id='%s'}",pExtConn,pExtConn->m_pOutputPagedBuf,conn.m_strID.c_str());
            return -1;
        }
        pExtConn->m_pOutputPagedBuf = NULL;
        pExtConn->m_intOutputPackSize = 0;
        pExtConn->m_intOutputFinished = 0;
    }
    return intResult;
}
/** Returns the current value of m_intFreeCount, read under m_lockFree. */
int CConMag::GetFreeConCount()
{
    CAutoLock guard(m_lockFree);
    const int intFree = m_intFreeCount;
    return intFree;
}
/**
 * Registers an accepted socket with the manager.
 *
 * The connection is created immediately but only appended to m_vectNew;
 * the poll thread moves it into the active map later.
 *
 * A free slot is reserved by decrementing m_intFreeCount under its lock, so
 * several calls arriving before the poll thread recomputes the counter
 * cannot all be admitted on the strength of the same slot.
 *
 * @param intSerialNo  serial number identifying the connection.
 * @param intSocket    socket descriptor; owned by the manager on success.
 * @param strID        ID string of the connection (used in log output).
 * @param strPublicKey public key stored with the connection.
 * @return true when the connection was queued, false when no slot is free
 *         (the socket is not closed in that case).
 */
bool CConMag::AddConn(const unsigned long long int intSerialNo
                      ,const int intSocket
                      ,const string strID
                      ,const string strPublicKey
                      )
{
    _ConnExt *pExtConn = NULL;
    {
        CAutoLock guard(m_lockFree);
        if( m_intFreeCount > 0 )
        {
            pExtConn = new _ConnExt(m_pageset,intSerialNo,strID,strPublicKey);
            --m_intFreeCount;   // reserve the slot
        }
        else
        {
            Log(CLog::Warn,__func__,"Free is empty!{m_intFreeCount=%d,strID='%s'}",m_intFreeCount,strID.c_str());
        }
    }
    if( pExtConn == NULL )
    {
        CAutoLock guard(m_lockNew);
        int intNewSize = m_vectNew.size();
        Log(CLog::Error,__func__,"No free connection left,add oper failed,will return false!{pExtConn=%p,intNewSize=%d,strID='%s'}",pExtConn,intNewSize,strID.c_str());
        Log(CLog::Info,__func__,"connection info!{intSocket=%d,strID='%s'}",intSocket,strID.c_str());
        return false;
    }

    // The _ConnExt constructor already zeroes the I/O counters and buffers;
    // only the values that differ from its defaults are set here.
    pExtConn->m_node.m_conn.m_blnDoStatic = m_blnDoStatic;
    pExtConn->m_intSocket = intSocket;
    {
        CAutoLock guard(m_lockNew);
        m_vectNew.push_back(pExtConn);
    }
    Log(CLog::Info,__func__,"Add connection succeed!!{intSocket=%d,strID='%s'}",intSocket,strID.c_str());
    return true;
}
void CConMag::Execute(void *)
{
const int _const_shakehand_interval = m_udtTransLayerParam.m_intShakeHandInterval;
const int _const_timeout = m_intPollTimeout;
const int _const_max_waiting_task = m_intMaxWaitingTask;
const short shtReadEvents = POLLIN;
const short shtWriteEvents = POLLOUT;
const short shtErrorEvents = POLLERR | POLLHUP | POLLNVAL;
time_t tmLastLogActive = 0;
const int _const_timeout_log = 60;
time_t tmLastFreeMorePages = 0;
set<_CloseInfo> setNeedClose;
vector<char > vectShakeHandQueryPack;
time_t tmLastShakeHand = 0;
time_t tmLastChangeKey = 0;
map<unsigned long long int,_ConnExt*>::iterator it;
for(;GetStopFlag() == false;)
{
vector<pollfd> vectFds;
pollfd * pFds = NULL;
{
CAutoLock lock(m_lockAll);
vectFds.resize(m_mapAll.size());
if(vectFds.size())
{
pFds = &*vectFds.begin();
}
{
CAutoLock lock(m_lockFree);
m_intFreeCount = m_intMaxConnection - m_mapAll.size();
if(m_intFreeCount < 0)
{
m_intFreeCount = 0;
}
}
{
CAutoLock lock(m_lockPacket);
for(deque<_AddPacket>::iterator it = m_listAddPacket.begin(); it != m_listAddPacket.end(); it++)
{
map<unsigned long long int,_ConnExt*>::iterator itFind = m_mapAll.find(it->m_intSerialNo);
if(itFind != m_mapAll.end())
{
itFind->second->m_node.m_conn.AddOutputPack(it->m_packet,it->m_pri);//ÏȽ«·¢ËÍ°ü¼Óµ½»º´æ¶ÓÁÐ
}
else
{
delete it->m_packet;
}
}
m_listAddPacket.clear();
}
int index = 0;
for( index=0,it=m_mapAll.begin(); index < (int)m_mapAll.size(); ++it,index++)
{
_ConnExt* pExtConn = it->second;
pFds[index].fd = pExtConn->m_intSocket;
pFds[index].revents = 0;
if( pExtConn->m_node.m_blnPaused == true )
{
const int intPages = pExtConn->m_node.m_conn.GetOutputQueuePages();
if( intPages <= (m_intMaxOutQueuePages/2) ) //»Ö¸´
{
ResumeProcessRequest(pExtConn->m_node.m_conn.m_intSerialNo);
pExtConn->m_node.m_blnPaused = false;
Log(CLog::Warn,__func__,"Á¬½ÓµÄÊä³ö¶ÓÁÐÒѲ»ÔÙ´¦ÓÚÂúµÄ״̬£¨<='m_intMaxOutQueuePages/2'£©£¬¶Ôµ±Ç°Á¬½ÓÇëÇóµÄ´¦ÀíÒѾ»Ö¸´£¡{pExtConn->m_node.m_blnPaused=%d,intPages=%d,m_intMaxOutQueuePages=%d,id='%s'}"
,pExtConn->m_node.m_blnPaused,intPages,m_intMaxOutQueuePages,pExtConn->m_node.m_conn.m_strID.c_str());
}
}
else
{
const int intPages = pExtConn->m_node.m_conn.GetOutputQueuePages();
if(intPages >= m_intMaxOutQueuePages)
{
PauseProcessRequest(pExtConn->m_node.m_conn.m_intSerialNo);//ÔÝÍ£
pExtConn->m_node.m_blnPaused = true;
pExtConn->m_node.m_pauseStartTime = CComDateTime::GetCurrentTime();
Log(CLog::Warn,__func__,"Á¬½ÓµÄÊä³ö¶ÓÁÐÒÑÂú£¨>='m_intMaxOutQueuePages'£©£¬¶Ôµ±Ç°Á¬½ÓÇëÇóµÄ´¦ÀíÒѾÔÝÍ££¡{pNode->m_blnPaused=%d,intPages=%d,m_intMaxOutQueuePages=%d,id='%s'}"
,pExtConn->m_node.m_blnPaused,intPages,m_intMaxOutQueuePages,pExtConn->m_node.m_conn.m_strID.c_str());
}
}
if( pExtConn->m_pOutputPagedBuf == NULL )
{
if( pExtConn->m_node.m_conn.GetOutputNode(pExtConn->m_pOutputPagedBuf) == false )//´Óm_conn¶ÓÁÐÀïÈ¡³öÀ´
{
Log(CLog::Error,__func__,"pExtConn->m_node.m_conn.GetOutputNode(pExtConn->m_pOutputPagedBuf) == false{pExtConn=%p,pExtConn->m_pOutputPagedBuf=%p}",pExtConn,pExtConn->m_pOutputPagedBuf);
}
else if(pExtConn->m_pOutputPagedBuf != NULL )
{
pExtConn->m_intOutputFinished = 0;
pExtConn->m_intOutputPackSize = pExtConn->m_pOutputPagedBuf->GetContentLength();
}
}
int intTaskCount = m_tqTasks.GetGroupTaskCount(pExtConn->m_node.m_conn.m_intSerialNo);
bool blnNeedRead = (intTaskCount < _const_max_waiting_task) ;
bool blnNeedWrite = pExtConn->m_pOutputPagedBuf != NULL;
pFds[index].events = ((blnNeedRead?shtReadEvents:0) + (blnNeedWrite?shtWriteEvents:0));
if(blnNeedRead == false)
{
if( (m_intLogFlag & Log_RequestInfo) == Log_RequestInfo )
Log(CLog::Info,__func__,"blnNeedRead == false[(intTaskCount < _const_max_waiting_task) == false]{intTaskCount=%d , _const_max_waiting_task=%d,id='%s'}",intTaskCount , _const_max_waiting_task,pExtConn->m_node.m_conn.m_strID.c_str());
}
}
}
int nCount = 0;
if( CComApi::Poll(nCount,pFds,vectFds.size(),_const_timeout) == false )
{
Log(CLog::Error,__func__,"Poll(nCount,pFds,vectFds.size(),_const_timeout) == false{nCount=%d,pFds=%p,vectFds.size()=%d,_const_timeout=%d}",nCount,pFds,vectFds.size(),_const_timeout);
break;
}
if( true )
{
time_t tmNow = time(NULL);
if( (tmNow - tmLastLogActive) >= _const_timeout_log)
{
int intTotalThreadNum = 0;
int intSleepThreadNum = 0;
bool blnStop = 0;
long long int next_task_no = 0;
int intTasksCount = 0;
int intGroupsCount = 0;
m_tqTasks.GetInfo(intTotalThreadNum,intSleepThreadNum,blnStop,next_task_no,intTasksCount,intGroupsCount);
Log(CLog::Status,__func__,"I am active[CConMag]{task_count=%d, group_count=%d, undelete_page_count=%d, tmNow=%d,tmLastLogActive=%d,_const_timeout_log=%d,this=%08p,vectFds.size()=%d}"
, intTasksCount, intGroupsCount, CPageSet::GetUnDeletePageCount(), tmNow,tmLastLogActive,_const_timeout_log,this,vectFds.size());
tmLastLogActive = tmNow;
}
}
{
CAutoLock lock(m_lockAll);
int index = 0;
for( index=0,it=m_mapAll.begin(); (index < (int)m_mapAll.size())&&(nCount>0); ++it,index++)
{
_ConnExt* pExtConn = it->second;
Con_Status enuCon_Status = Con_NoError;
bool blnCounted = false;
if( (pFds[index].revents & shtErrorEvents) != 0 )// ´íÎ󣬰ѼÓÈëµ½¹Ø±Õ¶ÓÁÐ
{
enuCon_Status = Con_NetError;
blnCounted = true;
Log(CLog::Error,__func__,"Error event is set,will be closed[(pFds[index].revents & shtErrorEvents) != 0]!{index=%d,pFds[index].fd=%d,pFds[index].revents=0x%x,shtErrorEvents=0x%x,POLLERR=%d,POLLHUP=%d,POLLNVAL=%d}",index,pFds[index].fd,pFds[index].revents,shtErrorEvents,POLLERR,POLLHUP,POLLNVAL);
_CloseInfo info;
info.m_intSerialNo = pExtConn->m_node.m_conn.m_intSerialNo;
info.m_enuCloseReason = enuCon_Status;
setNeedClose.insert(info);
}
else
{
if( (pFds[index].revents & shtReadEvents) != 0 )//ÓпɶÁÊý¾Ý
{
blnCounted = true;
enuCon_Status = DoRecv(pExtConn);
if( enuCon_Status != Con_NoError )
{
if( enuCon_Status == Con_ClientClosed )
{
Log(CLog::Info,__func__,"Socket is closed by client!{enuCon_Status=%d,pExtConn=%d,id='%s'}",enuCon_Status,pExtConn,pExtConn->m_node.m_conn.m_strID.c_str());
}
else
{
Log(CLog::Error,__func__,"DoRecv(pExtConn) != Con_NoError {enuCon_Status=%d,pExtConn=%d,id='%s'}",enuCon_Status,pExtConn,pExtConn->m_node.m_conn.m_strID.c_str());
}
_CloseInfo info;
info.m_intSerialNo = pExtConn->m_node.m_conn.m_intSerialNo;
info.m_enuCloseReason = enuCon_Status;
setNeedClose.insert(info);
}
}
if( (pFds[index].revents & shtWriteEvents) != 0 )//¿Éд
{
blnCounted = true;
enuCon_Status = DoSend(pExtConn);
if( enuCon_Status != Con_NoError )
{
Log(CLog::Error,__func__,"DoSend(pExtConn) != Con_NoError{enuCon_Status=%d,pExtConn=%d,id='%s'}",enuCon_Status,pExtConn,pExtConn->m_node.m_conn.m_strID.c_str());
_CloseInfo info;
info.m_intSerialNo = pExtConn->m_node.m_conn.m_intSerialNo;
info.m_enuCloseReason = enuCon_Status;
setNeedClose.insert(info);
}
}
}
if( blnCounted == true )
{
nCount--;
}
}
}
if(m_udtTransLayerParam.m_intShakeHandInterval > 0 && m_udtTransLayerParam.m_intShakeHandInterval != 0x7fffffff)
{
CComDateTime cur = CComDateTime::GetCurrentTime();
CAutoLock lock(m_lockAll);
for( it=m_mapAll.begin(); it != m_mapAll.end(); ++it )
{
CComDateTimeSpan span(0,0,0,m_udtTransLayerParam.m_intShakeHandInterval);
if(cur - it->second->m_lastRecvTime > span)//δÊÕµ½ÐÄÌø°ü
{
Log(CLog::Warn,__func__,"Shake hand time out{time_out=%d, serialno=%d, fd=%d, key=%s}",
m_udtTransLayerParam.m_intShakeHandInterval, it->first, it->second->m_intSocket, it->second->m_node.m_strPublicKey.c_str());
_CloseInfo info;
info.m_intSerialNo = it->first;
info.m_enuCloseReason = Con_ShakeHandTimeout;
setNeedClose.insert(info);
}
}
}
{
CComDateTime cur = CComDateTime::GetCurrentTime();
CAutoLock lock(m_lockAll);
for( it=m_mapAll.begin(); it != m_mapAll.end(); ++it )
{
CComDateTimeSpan span(0,0,0,m_intPauseTimeout);
if(it->second->m_node.m_blnPaused && cur - it->second->m_node.m_pauseStartTime > span)//ÔÝÍ£³¬Ê±, »º´æ¶ÓÁÐÂúÁËÔÝÍ£
{
Log(CLog::Warn,__func__,"Pause time out{time_out=%d, serialno=%d, fd=%d, key=%s}",
m_intPauseTimeout, it->first, it->second->m_intSocket, it->second->m_node.m_strPublicKey.c_str());
_CloseInfo info;
info.m_intSerialNo = it->first;
info.m_enuCloseReason = Con_OutputQueueFull;
setNeedClose.insert(info);
}
}
}
time_t tmNow = ::time(NULL);
if( true )
{
CAutoLock guard(m_lockClosing);
for( closing_type::iterator it = m_mapClosing.begin(); it != m_mapClosing.end() ; )
{
closing_type::iterator it_erase = m_mapClosing.end();
if( it->first <= tmNow )
{
setNeedClose.insert(it->second);
it_erase = it;
}
else
{
if( setNeedClose.find(it->second) != setNeedClose.end() )
{
it_erase = it;
}
}
++it;
if( it_erase != m_mapClosing.end() )
{
m_mapClosing.erase(it_erase);
}
}
}
vector<pair<unsigned long long int,Con_Status> > vectClosedNotifys;
if(true)
{
CAutoLock lock(m_lockAll);
for( set<_CloseInfo>::iterator it = setNeedClose.begin(); it != setNeedClose.end() ; ++it )//ÓÖ´ÓsetÀïÒƵ½vectorÀ´Ómapɾµô
{
const _CloseInfo & close_info = *it;
map<unsigned long long int,_ConnExt*>::iterator it_find = m_mapAll.find(close_info.m_intSerialNo);
if( it_find == m_mapAll.end() )
{
Log(CLog::Error,__func__,"closing connection's serial_no not exist in 'm_mapAll'[it_find == m_mapAll.end()]{close_info=(%lld,%d)}",close_info.m_intSerialNo,close_info.m_enuCloseReason);
continue;
}
m_tqTasks.CancelGroup(close_info.m_intSerialNo);
_ConnExt* pExtConn = it_find->second;
vectClosedNotifys.push_back(pair<unsigned long long int,Con_Status>(close_info.m_intSerialNo,close_info.m_enuCloseReason));
if(pExtConn)
{
delete pExtConn;
}
m_mapAll.erase(it_find);
}
setNeedClose.clear();
}
vector<unsigned long long int> vectNewNotifys;
if( true )
{
CAutoLock lock(m_lockAll);
vector<_ConnExt*> vectNewTmp;
{
CAutoLock guard(m_lockNew);
vectNewTmp.swap(m_vectNew);//Ò»´ÎÄóö·Åµ½vectorÀï
}
vector<char> vectPacket;
for( vector<_ConnExt*>::iterator it = vectNewTmp.begin(); it != vectNewTmp.end() ; ++it )
{
vectNewNotifys.push_back((*it)->m_node.m_conn.m_intSerialNo);
m_mapAll[(*it)->m_node.m_conn.m_intSerialNo] = *it;
}
}
//¹Ø±Õsocket
for( vector<pair<unsigned long long int,Con_Status> >::iterator it = vectClosedNotifys.begin(); it != vectClosedNotifys.end() ; ++it )
{
ConnCloseNotify(it->first,it->second);
}
//֪ͨÐÂÁ¬½Ó
for( vector<unsigned long long int>::iterator it = vectNewNotifys.begin(); it != vectNewNotifys.end() ; ++it )
{
NewConnNotify(*it);
}
}
Log(CLog::Status,__func__,"Finished [GetStopFlag()==true]{GetStopFlag()=%d,this=%08p}",GetStopFlag(),this);
return;
}
bool CConMag::_SendPacket(const unsigned long long int intSerialNo
,const char * pchPacket
,const int intPacketSize
,const Packet_Priority enuPriority
)
{
const char * pchNewPacket = pchPacket;
int intNewPacketSize = intPacketSize;
CAutoLock lock(m_lockPacket);
_AddPacket newPacket;
newPacket.m_intSerialNo = intSerialNo;
newPacket.m_pri = enuPriority;
//newPacket.m_packet.insert(newPacket.m_packet.end(), pchNewPacket, pchNewPacket + intNewPacketSize);
newPacket.m_packet = m_pageset.CreatePagedBuf();
newPacket.m_packet->AppendData(pchPacket, intPacketSize);
m_listAddPacket.push_back(newPacket);
if( (m_intLogFlag & Log_ResponseInfo) == Log_ResponseInfo )
Log(CLog::Info,__func__,"´¦ÀíÍê³É£¬´òÓ¡Ïà¹ØÐÅÏ¢!{pchNewPacket=%p,intNewPacketSize=%d,enuPriority=%d,intSerialNo=%lld}"
,pchNewPacket,intNewPacketSize,enuPriority,intSerialNo);
return true;
}
// Undo hook for a task; this implementation intentionally does nothing.
void CConMag::UndoTask(const unsigned long long int & intSerialNo
                      ,const unsigned int & intReqPacketNo
                      ,const _TaskInfo & info
                      )
{
    // No per-task state to roll back.
}
// Runs one queued task by handing its packet to NewPacketNotify().
//   intSerialNo    - serial number of the connection the task belongs to
//   intReqPacketNo - request packet number (not used here)
//   info           - task payload: m_strID and the raw packet bytes in m_packet
// Returns whatever NewPacketNotify() returns.
bool CConMag::DoTask(const unsigned long long int & intSerialNo
                    ,const unsigned int & intReqPacketNo
                    ,const _TaskInfo & info
                    )
{
    const int intPacketSize = info.m_packet.size();
    // Dereferencing begin() of an empty container is undefined behaviour, so
    // an empty packet is passed on as a NULL buffer with length 0.
    const char * pPacketBuf = (intPacketSize > 0) ? &*info.m_packet.begin() : NULL;
    return NewPacketNotify(intSerialNo, info.m_strID, pPacketBuf, intPacketSize);
}
// Builds a manager with its default configuration; the pack-head protocol
// pointer stays NULL until SetPackHead() is called.
CConMag::CConMag()
    : CBaseLog()
    // All four request/response logging flags are on; Log_Debug is off.
    , m_intLogFlag(Log_RequestInfo | Log_RequestPack | Log_ResponseInfo | Log_ResponsePack)
    , m_intLogMaxContentLength(256)
    , m_blnDoStatic(false)
    , m_udtTransLayerParam()
    , m_intMaxConnection(0)
    , m_intPollTimeout(1)
    , m_intMaxWaitingTask(100)
    , m_intMaxOutQueuePages(1000)
    , m_intRecvBufInitSize(4*1024)        // 4 KiB
    , m_intRecvBufMaxSize(4*1024*1024)    // 4 MiB
    , m_tqTasks()
    , m_intMinSendSize(32*1024)           // 32 KiB
    , m_intPauseTimeout(300)
{
    m_pHead = NULL;
}
// Frees every connection wrapper still owned by the manager: first those
// registered in m_mapAll, then those still waiting in m_vectNew.
CConMag::~CConMag()
{
    map<unsigned long long int,_ConnExt*>::iterator itConn = m_mapAll.begin();
    while( itConn != m_mapAll.end() )
    {
        delete itConn->second;
        ++itConn;
    }

    // m_vectNew is shared with other code paths, so it is walked under its lock.
    {
        CAutoLock guard(m_lockNew);
        for( vector<_ConnExt*>::iterator itNew = m_vectNew.begin(); itNew != m_vectNew.end(); ++itNew )
        {
            delete *itNew;
        }
    }
}
// Starts the task-queue workers and then this object's own thread.
// The results of both start calls are ignored; always returns true.
bool CConMag::StartThreads()
{
    m_tqTasks.StartThreads();
    CThread::Start(NULL,true);

    return true;
}
// Shuts the manager down: schedules every connection for closing, stops and
// joins this object's thread, then stops the task-queue workers.
// Always returns true.
bool CConMag::StopThreads()
{
    CloseAllConns(Con_UnknownError);

    CThread::Stop();
    void * pThreadResult = NULL;   // the thread's exit value is not used
    CThread::Join(&pThreadResult);

    m_tqTasks.StopThreads();
    return true;
}
// Replaces the logging flag mask and the maximum logged content length,
// then logs the new settings.
// NOTE(review): both members are written through const_cast, so they are
// presumably declared const-qualified - confirm they are not truly const objects.
void CConMag::SetLogConfig(const int intFlag
                          ,const int intMaxContentLength
                          )
{
    int & refLogFlag = const_cast<int&>(m_intLogFlag);
    refLogFlag = intFlag;

    int & refMaxContent = const_cast<int&>(m_intLogMaxContentLength);
    refMaxContent = intMaxContentLength;

    Log(CLog::Info,__func__,"Finish succ!{m_intLogFlag=%d,m_intLogMaxContentLength=%d,intFlag=%d,intMaxContentLength=%d}",m_intLogFlag,m_intLogMaxContentLength,intFlag,intMaxContentLength);
}
// Queues a packet for the given connection at Priority_Normal.
bool CConMag::SendPacket(const unsigned long long int intSerialNo
                        ,const char * pchPacket
                        ,const int intPacketSize
                        )
{
    const bool blnQueued = _SendPacket(intSerialNo,pchPacket,intPacketSize,Priority_Normal);
    return blnQueued;
}
// Queues a packet for the given connection at the caller-chosen priority.
bool CConMag::SendPriorityPacket(const unsigned long long int intSerialNo
                                ,const char * pchPacket
                                ,const int intPacketSize
                                ,const Packet_Priority enuPriority
                                )
{
    const bool blnQueued = _SendPacket(intSerialNo,pchPacket,intPacketSize,enuPriority);
    return blnQueued;
}
bool CConMag::AddClosingConn(const unsigned long long int intSerialNo
,const Con_Status enuCloseReason
,const int intDelay
)
{
time_t tmClose = ::time(NULL) + intDelay;
_CloseInfo info;
info.m_intSerialNo = intSerialNo;
info.m_enuCloseReason = enuCloseReason;
{
CAutoLock guard(m_lockClosing);
m_mapClosing.insert(closing_type::value_type(tmClose,info));
}
Log(CLog::Info,__func__,"Succeed Info!{intSerialNo=%lld,enuCloseReason=%d,intDelay=%d}"
,intSerialNo,enuCloseReason,intDelay
);
return true;
}
bool CConMag::CloseAllConns(const Con_Status enuCloseReason
,const int intDelay
)
{
vector<unsigned long long int> vectAllConns;
{
CAutoLock guardAll(m_lockAll);
vectAllConns.reserve(m_mapAll.size());
for(all_type::const_iterator it = m_mapAll.begin() ; it != m_mapAll.end() ; ++it )
{
vectAllConns.push_back(it->first);
}
}
time_t tmClose = ::time(NULL) + intDelay;
_CloseInfo info;
info.m_enuCloseReason = enuCloseReason;
{
CAutoLock guard(m_lockClosing);
for( vector<unsigned long long int>::const_iterator it = vectAllConns.begin() ; it != vectAllConns.end() ; ++it )
{
info.m_intSerialNo = *it;
m_mapClosing.insert(closing_type::value_type(tmClose,info));
}
}
Log(CLog::Info,__func__,"Succeed Info!{enuCloseReason=%d,intDelay=%d}"
,enuCloseReason,intDelay
);
return true;
}
// Pauses the task group keyed by this connection's serial number and
// returns the task queue's result.
bool CConMag::PauseProcessRequest(const unsigned long long int intSerialNo
                                 )
{
    const bool blnPaused = m_tqTasks.PauseGroup(intSerialNo);
    return blnPaused;
}
// Resumes the task group keyed by this connection's serial number and
// returns the task queue's result.
bool CConMag::ResumeProcessRequest(const unsigned long long int intSerialNo
                                  )
{
    const bool blnResumed = m_tqTasks.ResumeGroup(intSerialNo);
    return blnResumed;
}
void CConMag::StaticStart()
{
const_cast<bool&>(m_blnDoStatic) = true;
}
void CConMag::StaticStop ()
{
const_cast<bool&>(m_blnDoStatic) = false;
CAutoLock guard(m_lockAll);
for(all_type::iterator it = m_mapAll.begin() ; it != m_mapAll.end() ; ++it )
{
_ConnExt* pExtConn = it->second;
pExtConn->m_node.m_conn.m_blnDoStatic = false;
}
}
// Task-queue adapter: forwards the task to the owning manager stored in
// info.m_pMag and returns its result.
bool CConMag::CConTask::DoTask(const unsigned long long int & group
                              ,const unsigned long long int & sub_id
                              ,const CConMag::_TaskInfo & info
                              )
{
    const bool blnDone = info.m_pMag->DoTask(group,sub_id,info);
    return blnDone;
}
// Task-queue adapter: forwards the undo request to the owning manager
// stored in info.m_pMag.
void CConMag::CConTask::UndoTask(const unsigned long long int & group
                                ,const unsigned long long int & sub_id
                                ,const CConMag::_TaskInfo & info
                                )
{
    CConMag * const pOwner = info.m_pMag;
    pOwner->UndoTask(group,sub_id,info);
}
// Returns the number of connections currently registered in m_mapAll,
// read under m_lockAll.
int CConMag::GetConnections() const
{
    CAutoLock guard(m_lockAll);
    const int intCount = m_mapAll.size();
    return intCount;
}
// Returns the configured connection limit (m_intMaxConnection).
// The statement previously lacked 'return', so control flowed off the end of
// a non-void function and the caller received an indeterminate value.
int CConMag::GetMaxConnections() const
{
    return m_intMaxConnection;
}
// Stores the pack-head protocol pointer. The pointer is only recorded here;
// nothing in this function takes ownership of it.
void CConMag::SetPackHead(CPackProtocol * pHead)
{
    this->m_pHead = pHead;
}
/*
bool CConMag::Log(CLog::Level enuLevel, const char * pFunc, const char *pFormat,...)
{
va_list arglist;
va_start(arglist, pFormat);
if( CLog::s_pLog != NULL )
{
return CLog::s_pLog->Log(enuLevel,"CConMag_N", pFunc, pFormat,arglist);
}
return false;
}
*/
bool CConMagGroup::Init(const _Trans_Param & udtTransLayerParam
,CPackProtocol * pHead
,const unsigned char cGroupCount
,const int intMaxConnection
,CLog * pLog
,const int intTaskQueueThreadNum
,const int intMinSendSize
,const int intPageSize
,const int intRecvBufInitSize
,const int intRecvBufMaxSize
,const int intPollTimeout
,const int intMaxWaitingTask
,const int intMaxOutQueuePages
)
{
CBaseLog::Init(pLog);
for(int i = 0; i < cGroupCount; i++)
{
CConMagSub * pCon = new CConMagSub(this);
pCon->Init(udtTransLayerParam, pHead, pLog, intMaxConnection, intTaskQueueThreadNum, intMinSendSize,
intPageSize, intRecvBufInitSize, intRecvBufMaxSize, intPollTimeout, intMaxWaitingTask, intMaxOutQueuePages);
m_vectObjs.push_back(pCon);
}
return true;
}
bool CConMagGroup::SendPacket(const unsigned long long int intSerialNo
,const char * pchPacket
,const int intPacketSize
)
{
if(pchPacket == NULL || intPacketSize == 0)
{
Log(CLog::Error,__func__,"pchPacket == NULL || intPacketSize == 0");
return false;
}
unsigned char cGroup = GetGroupID(intSerialNo);
if(cGroup > m_vectObjs.size())
{
Log(CLog::Error,__func__,"cGroup > m_vectObjs.size(){cGroup=%d}", cGroup);
return false;
}
return m_vectObjs[cGroup]->SendPacket(intSerialNo, pchPacket, intPacketSize);
}
bool CConMagGroup::SendPriorityPacket(const unsigned long long int intSerialNo
,const char * pchPacket
,const int intPacketSize
,const Packet_Priority enuPriority
)
{
if(pchPacket == NULL || intPacketSize == 0)
{
Log(CLog::Error,__func__,"pchPacket == NULL || intPacketSize == 0");
return false;
}
unsigned char cGroup = GetGroupID(intSerialNo);
if(cGroup > m_vectObjs.size())
{
Log(CLog::Error,__func__,"cGroup > m_vectObjs.size(){cGroup=%d}", cGroup);
return false;
}
return m_vectObjs[cGroup]->SendPriorityPacket(intSerialNo, pchPacket, intPacketSize, enuPriority);
}
bool CConMagGroup::AddConn(unsigned long long int & intSerialNo, const int intSocket,const string strID,const string strPublicKey)
{
unsigned long long int lltOrginSerialNo = GetNewSerialNo();
unsigned char cGroupID = 0;
int intMaxFreeCons = 0;
for(int i = 0; i < m_vectObjs.size(); i++)
{
int intTmp = m_vectObjs[i]->GetFreeConCount();
if(intTmp > intMaxFreeCons)
{
cGroupID = i;
intMaxFreeCons = intTmp;
}
}
if(0 == intMaxFreeCons)
{
Log(CLog::Error,__func__,"No free connection{cGroup=%d, m_vectObjs.size()=%d, intMaxFreeCons=%d}", cGroupID, m_vectObjs.size(), intMaxFreeCons);
CComApi::Close(intSocket);
return false;
}
intSerialNo = CreateGroupSerialNo(lltOrginSerialNo, cGroupID);
bool blnResult = m_vectObjs[cGroupID]->AddConn(intSerialNo, intSocket, strID, strPublicKey);
if(!blnResult)
{
Log(CLog::Error,__func__,"Add connection failed{cGroup=%d, m_vectObjs.size()=%d, intMaxFreeCons=%d}", cGroupID, m_vectObjs.size(), intMaxFreeCons);
CComApi::Close(intSocket);
}
return blnResult;
}
// Sums, over all sub-managers, the connection limit minus the reported free
// slot count.
int CConMagGroup::GetActiveConCount()
{
    int intActive = 0;
    for( size_t idx = 0; idx < m_vectObjs.size(); ++idx )
    {
        intActive += (m_vectObjs[idx]->m_intMaxConnection - m_vectObjs[idx]->GetFreeConCount());
    }
    return intActive;
}
bool CConMagGroup::AddClosingConn(const unsigned long long int intSerialNo
,const Con_Status enuCloseReason
,const int intDelay
)
{
unsigned char cGroup = GetGroupID(intSerialNo);
if(cGroup > m_vectObjs.size())
{
Log(CLog::Error,__func__,"AddClosingConn error{cGroup=%d, intSerialNo=%d, m_vectObjs.size()=%d}", cGroup, intSerialNo, m_vectObjs.size());
return false;
}
return m_vectObjs[cGroup]->AddClosingConn(intSerialNo, enuCloseReason, intDelay);
}
// Asks every sub-manager to schedule all of its connections for closing.
// Individual results are ignored; always returns true.
bool CConMagGroup::CloseAllConns(const Con_Status enuCloseReason
                                ,const int intDelay
                                )
{
    for( size_t idx = 0; idx < m_vectObjs.size(); ++idx )
    {
        m_vectObjs[idx]->CloseAllConns(enuCloseReason, intDelay);
    }
    return true;
}
// Applies the same logging flags and content-length limit to every sub-manager.
void CConMagGroup::SetLogConfig(const int intFlag
                               ,const int intMaxContentLength
                               )
{
    for( size_t idx = 0; idx < m_vectObjs.size(); ++idx )
    {
        m_vectObjs[idx]->SetLogConfig(intFlag, intMaxContentLength);
    }
}
// Starts the threads of every sub-manager in index order.
// Individual results are ignored; always returns true.
bool CConMagGroup::StartThreads()
{
    for( size_t idx = 0; idx < m_vectObjs.size(); ++idx )
    {
        m_vectObjs[idx]->StartThreads();
    }
    return true;
}
// Stops the threads of every sub-manager in index order.
// Individual results are ignored; always returns true.
bool CConMagGroup::StopThreads()
{
    for( size_t idx = 0; idx < m_vectObjs.size(); ++idx )
    {
        m_vectObjs[idx]->StopThreads();
    }
    return true;
}
// Hands the same pack-head protocol pointer to every sub-manager.
void CConMagGroup::SetPackHead(CPackProtocol * pHead)
{
    for( size_t idx = 0; idx < m_vectObjs.size(); ++idx )
    {
        m_vectObjs[idx]->SetPackHead(pHead);
    }
}
// Returns the next raw serial number (under m_lock). The counter is kept
// within the low 56 bits: once it reaches 0x00FFFFFFFFFFFFFF it restarts at 0.
unsigned long long int CConMagGroup::GetNewSerialNo()
{
    const unsigned long long int kSerialLimit = 0x00FFFFFFFFFFFFFF;

    CAutoLock guard(m_lock);
    if( !(m_intSerialNo < kSerialLimit) )
    {
        m_intSerialNo = 0;
        return m_intSerialNo;
    }
    m_intSerialNo++;
    return m_intSerialNo;
}
// Extracts the group id, i.e. the most significant byte (bits 56..63) of a
// group serial number.
unsigned char CConMagGroup::GetGroupID(const unsigned long long int intSerialNo)
{
    const unsigned long long int lltTopByte = intSerialNo >> 56;
    return static_cast<unsigned char>(lltTopByte);
}
// Builds a group serial number by placing cGroupID in bits 56..63 and OR-ing
// in intSerialNo unchanged (no masking is applied to intSerialNo).
unsigned long long int CConMagGroup::CreateGroupSerialNo(const unsigned long long int intSerialNo, const unsigned char cGroupID)
{
    const unsigned long long int lltGroupBits = static_cast<unsigned long long int>(cGroupID) << 56;
    return lltGroupBits | intSerialNo;
}
// New-connection notification; this implementation deliberately ignores it.
void CConMagGroup::NewConnNotify(const unsigned long long int intSerialNo
                                )
{
    // Nothing to do.
}
// Request-cancelled notification; this implementation deliberately ignores it.
void CConMagGroup::RequestCanceledNotify(const unsigned long long int intSerialNo
                                        ,const int intReqPacketNo
                                        )
{
    // Nothing to do.
}
/*
bool CConMagGroup::Log(CLog::Level enuLevel, const char * pFunc, const char *pFormat,...)
{
va_list arglist;
va_start(arglist, pFormat);
if( CLog::s_pLog != NULL )
{
return CLog::s_pLog->Log(enuLevel,"CConMagGroup", pFunc, pFormat,arglist);
}
return false;
}
*/
#ifndef CCONN_H
#define CCONN_H
#include <string>
#include <queue>
#include <vector>
using namespace std;
#include "pageset.h"
#include "DateTime.h"
using namespace BackCom;
namespace NetIO
{
// One connection's outbound side: a priority queue of paged output buffers,
// a running count of the pages those buffers use, and an optional log of
// per-packet statistics records.
class CConnection
{
public:
typedef CPageSet::_Page _Page;
// One queued output packet.
struct _Node
{
int m_intPriority; // packet priority (a larger value means higher priority)
int m_intNo; // insertion sequence number (a smaller value means higher priority)
//
_Page *m_pPagedBuf; // paged buffer holding the packet bytes
};
// One statistics record; filled in by _AddOutputPack() when m_blnDoStatic is set.
struct _StaticInfo
{
SDateTime m_dtmTime; // date/time the record was taken
int m_intMicroseconds; // microsecond part of the timestamp (tv_usec)
bool m_blnIsRecv; // true for a received packet, false for a sent one
char m_pchHead[26]; // leading bytes of the packet (at most 26 are copied)
};
private:
// Queue ordering: higher m_intPriority comes out first; among equal
// priorities the node with the smaller m_intNo comes out first.
struct comp
{
bool operator()(const _Node x,const _Node y) const
{
return ( x.m_intPriority == y.m_intPriority )?(x.m_intNo > y.m_intNo):(x.m_intPriority < y.m_intPriority);
};
};
//
typedef priority_queue<_Node,vector<_Node>,comp> output_type;
public:
CConnection(CPageSet & poolOutput
,const unsigned long long int intSerialNo // [in ] serial number of the connection being added
,const string & strID // [in ] connection identifier; printed in messages to indicate which connection is involved
);
~CConnection();
// Purpose: add a new packet to the output queue
bool AddOutputPack(const char * pchPacket // [in ] packet pointer
,const int intPacketSize // [in ] packet length
,const int intPriority // [in ] packet priority
);
bool AddOutputPack(_Page * pPage
,const int intPriority // [in ] packet priority
);
// Purpose: add a new packet to the output queue
bool AddOutputPack(const char * pchHead // [in ] packet pointer
,const int intHeadSize // [in ] packet length
,const char * pchBody // [in ] packet pointer
,const int intBodySize // [in ] packet length
,const int intPriority // [in ] packet priority
);
// Purpose: get the node object pointer of the next packet in the output queue (on success the packet is removed from the queue; the resources held by the node are not released)
bool GetOutputNode(_Page *& pPagedBuf // [out] receives the node object pointer of the next packet
);
// Purpose: release an output packet node object pointer and adjust the page count used by the output queue
bool FreeOutputNode(_Page * pPagedBuf
);
int GetOutputQueuePages() const{ return m_intQueuePagesOutput; };
bool Desc(string & strDesc)const;
void DumpConnInfo(string & strResult
)const;
void ReleasePage();
private:
// Purpose: add a new packet to the output queue
bool _AddOutputPack(_Page* pBuf // [in ]
,const int intPriority // [in ] packet priority
);
public:
const unsigned long long int m_intSerialNo; // serial number of the connection
const string m_strID; // identifier string of the connection
//
bool m_blnDoStatic; // when true, _AddOutputPack() appends a record to m_vectStaticInfo
CLock m_lockStaticInfo; // lock object protecting access to member m_vectStaticInfo
vector<_StaticInfo> m_vectStaticInfo; // I/O Static infos
private:
CPageSet & m_poolOutput; // paged memory pool used to obtain the paged buffer objects that hold output packets
//
CLock m_lock; // lock object protecting access to the members
//
output_type m_output; // output packet queue
int m_intQueuePagesOutput; // number of pages already used by the output packet queue
//vector<_Page*> m_vectFreeNodeOutput; // free output node queue
//
//int m_intTaskCount; // number of tasks of this connection submitted to the task queue but not yet finished
//
int m_intNextNo; // insertion sequence number for the next packet
};
}; // BackCom
#endif
#include "connection.h"
#include "transctrllayer.h"
#include "pack.h"
namespace NetIO
{
// Pops the highest-ranked packet buffer off the output queue.
//   pPagedBuf - [out] the popped buffer, or NULL when the queue is empty.
// The buffer itself is not released here. Always returns true.
bool CConnection::GetOutputNode(_Page *& pPagedBuf
                               )
{
    pPagedBuf = NULL;

    CAutoLock guard(m_lock);
    if( !m_output.empty() )
    {
        pPagedBuf = m_output.top().m_pPagedBuf;
        m_output.pop();
    }
    return true;
}
// Releases an output packet buffer and subtracts its pages from the output
// queue's page count.
//   pPagedBuf - buffer to release; NULL is rejected.
// Returns false for a NULL buffer, true otherwise.
bool CConnection::FreeOutputNode(_Page * pPagedBuf
                                )
{
    // GetOutputNode() reports an empty queue by handing out NULL, so a NULL
    // buffer can legitimately reach this function; refuse it instead of
    // dereferencing it.
    if( pPagedBuf == NULL )
    {
        return false;
    }
    // The content is no longer needed, so it is cleared first (the original
    // note says this allows surplus pages to be released via FreeMorePages).
    // NOTE(review): the page count is read after ClearData(); if ClearData()
    // can change GetPageCount(), the amount subtracted here would differ from
    // the amount added in _AddOutputPack() - confirm.
    pPagedBuf->ClearData();
    CAutoLock guard(m_lock);
    m_intQueuePagesOutput -= pPagedBuf->GetPageCount();
    delete pPagedBuf;
    return true;
}
// Pushes an already-filled buffer onto the output queue and adds its pages to
// the queue's page count. This function does not take m_lock itself; the
// public AddOutputPack() overloads do so before calling it.
//   pBuf        - [in] buffer to queue
//   intPriority - [in] packet priority
// When m_blnDoStatic is set, a statistics record (timestamp plus the first
// bytes of the packet) is appended under m_lockStaticInfo.
// Always returns true.
bool CConnection::_AddOutputPack(_Page* pBuf
                                ,const int intPriority
                                )
{
    _Node entry;
    entry.m_intPriority = intPriority;
    entry.m_intNo = m_intNextNo++;
    entry.m_pPagedBuf = pBuf;
    m_output.push(entry);
    m_intQueuePagesOutput += pBuf->GetPageCount();

    if( !m_blnDoStatic )
    {
        return true;
    }

    CAutoLock guard(m_lockStaticInfo);
    m_vectStaticInfo.push_back(_StaticInfo());
    _StaticInfo & rec = m_vectStaticInfo.back();

    rec.m_dtmTime = CDateTime::DateTimeCnvt(CComDateTime::GetCurrentTime());
    timeval tvNow;
    gettimeofday(&tvNow,NULL);
    rec.m_intMicroseconds = tvNow.tv_usec;
    rec.m_blnIsRecv = false;

    // Copy at most sizeof(m_pchHead) bytes from the start of the first page.
    const char *const*ppPages = &*pBuf->GetPageBasePtrs();
    const int intCopyLen = min((int)sizeof(rec.m_pchHead),pBuf->GetContentLength());
    memcpy(rec.m_pchHead,ppPages[0],intCopyLen);
    return true;
}
// Queues a caller-supplied paged buffer under m_lock.
//   pPage       - buffer to queue
//   intPriority - [in] packet priority
bool CConnection::AddOutputPack(_Page * pPage
                               ,const int intPriority
                               )
{
    CAutoLock guard(m_lock);
    const bool blnQueued = _AddOutputPack(pPage,intPriority);
    return blnQueued;
}
// ¹¦ ÄÜ£ºÌí¼Óаüµ½Êä³ö¶ÓÁÐÖÐ
bool CConnection::AddOutputPack(const char * pchPacket // [in ]°üÖ¸Õë
,const int intPacketSize // [in ]°ü³¤¶È
,const int intPriority // [in ]°üÓÅÏȼ¶
)
{
CAutoLock guard(m_lock);
_Page* pBuf = m_poolOutput.CreatePagedBuf();
pBuf->ClearData();
pBuf->AppendData(pchPacket,intPacketSize);
//
return _AddOutputPack(pBuf,intPriority);
}
// ¹¦ ÄÜ£ºÌí¼Óаüµ½Êä³ö¶ÓÁÐÖÐ
bool CConnection::AddOutputPack(const char * pchHead // [in ]°üÖ¸Õë
,const int intHeadSize // [in ]°ü³¤¶È
,const char * pchBody // [in ]°üÖ¸Õë
,const int intBodySize // [in ]°ü³¤¶È
,const int intPriority // [in ]°üÓÅÏȼ¶
)
{
CAutoLock guard(m_lock);
_Page* pBuf = m_poolOutput.CreatePagedBuf();
pBuf->ClearData();
pBuf->AppendData(pchHead,intHeadSize);
pBuf->AppendData(pchBody,intBodySize);
//
return _AddOutputPack(pBuf,intPriority);
}
// Renders the recorded statistics as a text table into strResult (any
// previous content is discarded): a header framed by two rule lines, then
// one row per record with the header fields decoded from the saved bytes.
void CConnection::DumpConnInfo(string & strResult
                              )const
{
    static const char * const pchRule = "--------------------------------------------------------\n";
    char pchLine[4096];

    strResult = "";
    strResult.reserve(1024*64);

    // Table header.
    strResult += pchRule;
    sprintf(pchLine, "%4s|%8s|%13s|%4s|%6s|%5s|%5s|%10s|%10s\n", "No", "Date","Time(us)","Dir","PackNo", "Type", "Ver","Size","CompSize");
    strResult += pchLine;
    strResult += pchRule;

    CAutoLock guard(m_lockStaticInfo);
    const size_t intRecords = m_vectStaticInfo.size();
    for( size_t idx = 0; idx < intRecords; ++idx )
    {
        const _StaticInfo & rec = m_vectStaticInfo[idx];
        const int intRow = static_cast<int>(idx);

        int intPacketNo = 0;
        int intPacketSize = 0;
        unsigned short shtType = 0;
        unsigned short shtVersion = 0;
        int intCompressedSize = 0;

        // Peek at the type field; for a compressed/encrypted packet the first
        // int is the compressed size and the header proper starts 12 bytes later.
        const char * pchHead = rec.m_pchHead;
        CPack::Input(pchHead+10,shtType);
        if( shtType == CTransCtrlLayer::Packet_CompressedEncrypted )
        {
            CPack::Input(pchHead+0,intCompressedSize);
            pchHead += 12;
        }
        CPack::Input(pchHead+ 0,intPacketSize);
        CPack::Input(pchHead+ 4,intPacketNo);
        CPack::Input(pchHead+ 8,shtVersion);
        CPack::Input(pchHead+10,shtType);

        const char * const pchDir = rec.m_blnIsRecv ? "Recv" : "Send";
        sprintf(pchLine, "%4d|%8d|%6d.%06d|%4s|%6d|%5x|%5x|%10d|%10d\n",intRow, rec.m_dtmTime.m_intDate,rec.m_dtmTime.m_intTime,rec.m_intMicroseconds,pchDir,intPacketNo, shtType,shtVersion,intPacketSize,intCompressedSize);
        strResult += pchLine;
    }
}
void CConnection::ReleasePage()
{
CAutoLock guard(m_lock);
while( m_output.empty() == false )
{
const _Node & node = m_output.top();
delete node.m_pPagedBuf;
m_output.pop();
}
}
// Description hook; this implementation leaves strDesc untouched and always
// returns true.
bool CConnection::Desc(string & strDesc)const
{
    const bool blnOk = true;
    return blnOk;
}
// Creates a connection with an empty output queue.
//   poolOutput  - page pool from which output buffers are created
//   intSerialNo - [in] serial number of the connection being added
//   strID       - [in] connection identifier; printed in messages to indicate
//                 which connection is involved
// Statistics recording starts disabled and packet numbering starts at 1.
CConnection::CConnection(CPageSet & poolOutput
                        ,const unsigned long long int intSerialNo
                        ,const string & strID
                        )
    : m_intSerialNo(intSerialNo)
    , m_strID(strID)
    , m_blnDoStatic(false)
    , m_poolOutput(poolOutput)
    , m_lock()
    , m_output()
    , m_intQueuePagesOutput(0)
    , m_intNextNo(1)
{
}
// Deletes any buffers still waiting in the output queue.
CConnection::~CConnection()
{
    this->ReleasePage();
}
} //BackCom
#ifndef CPACK_H
#define CPACK_H
#ifdef LD_WINDOWS
#define INT64 _int64
#else
#define INT64 long long int
#define BYTE unsigned char
#endif
#if _MSC_VER > 1000
#pragma warning(disable: 4786)
#pragma warning(disable: 4503)
#include "..\common\staticstring.h"
using namespace BackCom;
#else
#include "staticstring.h"
#endif // _MSC_VER > 1000
#include <string>
#include <vector>
using namespace std;
namespace BackCom
{
// Packet header fields. CPack::Input(const char*, SPackHead&) reads them from
// byte offsets 0, 4, 8 and 10 of the wire data and reports sizeof(SPackHead)
// as the number of bytes consumed, so the member order and sizes matter.
struct SPackHead
{
int m_intPackSize; // packet size field (wire offset 0)
int m_intPackNo; // packet number field (wire offset 4)
unsigned short m_shtVer; // version field (wire offset 8)
unsigned short m_shtType; // type field (wire offset 10)
};
// Static helpers for serialising and deserialising packets: header
// pack/unpack, PackN/UnpackN templates for N payload values, and per-type
// Input()/Output() primitives that each return the number of bytes they
// consumed or produced.
class CPack
{
public:
// Offset at which the PackN/UnpackN templates start the payload.
enum { _const_pack_head_size = 12 };
// Result codes returned by the pack/unpack functions.
enum ErrorCode { Err_NoErr=0
,Err_OutOfMem
,Err_UnsupportVer
,Err_UnknownType
,Err_ContentErr
,Err_Internal
};
//
public:
// Header pack/unpack taking the individual header fields.
static ErrorCode PackHead(char *pchPackHead
,int & nOutPackHeadSize
,const int intPackSize
,const int intPackNo
,const unsigned short shtVersion
,const unsigned short shtType
);
static ErrorCode UnpackHead(int & intOutPackSize
,int & intPackNo
,unsigned short & shtVersion
,unsigned short & shtType
,const char *pchPack
,const int intPackSize
);
//
// Header pack/unpack taking an SPackHead.
static ErrorCode PackHead(char *pchPackHead
,int & nOutPackHeadSize
,const SPackHead *pHead
);
static ErrorCode UnpackHead(SPackHead *pHead
,const char *pchPack
,const int intPackSize
);
//
static int SafeUnpackString(const char * pBuf, const int intLen, string & str);
static int UnpackStringList(const char * pBuf, const int intLen, vector<string> & data);
static bool UnpackHead(const char * pBuf, const int intLen, const unsigned short shtType, const unsigned short shtVer, int & intPackNo);
// PackN: writes N values after the header area of pchBuf, then the header.
// On entry intSize is the buffer capacity; when the capacity is too small
// intSize is set to the required size and Err_OutOfMem is returned.
// NOTE(review): Output() writes into pchBuf BEFORE the capacity check, so
// the check reports an overrun only after it has happened; callers must
// supply a buffer that is already large enough.
template<class _Tp>
static ErrorCode Pack1(char *pchBuf
,int & intSize
,const int intPackNo
,const unsigned short shtVersion
,const unsigned short shtType
,const _Tp & value
)
{
int nTmp = _const_pack_head_size + 0;
nTmp += Output(pchBuf+ nTmp,value);
if( intSize < nTmp )
{
intSize = nTmp;
return Err_OutOfMem;
}
return PackHead(pchBuf,intSize,nTmp,intPackNo,shtVersion,shtType);
}
// UnpackN: reads N values from after the header area of pchPack, then the
// header fields. Returns Err_ContentErr when the values extend past
// intPackSize.
// NOTE(review): Input() reads from pchPack BEFORE the length check, so a
// short packet is read past its end before being rejected.
template<class _Tp>
static ErrorCode Unpack1(int & intPackNo
,unsigned short & shtVersion
,unsigned short & shtType
,_Tp & value
,const char *pchPack
,const int intPackSize
)
{
int nTmp = _const_pack_head_size + 0;
nTmp += Input(pchPack+ nTmp,value);
if( intPackSize < nTmp )
{
return Err_ContentErr;
}
return UnpackHead(nTmp,intPackNo,shtVersion,shtType,pchPack,intPackSize);
}
// Two-value variant of Pack1.
template<class _Tp1,class _Tp2>
static ErrorCode Pack2(char *pchBuf
,int & intSize
,const int intPackNo
,const unsigned short shtVersion
,const unsigned short shtType
,const _Tp1 & value1
,const _Tp2 & value2
)
{
int nTmp = _const_pack_head_size + 0;
nTmp += Output(pchBuf+ nTmp,value1);
nTmp += Output(pchBuf+ nTmp,value2);
if( intSize < nTmp )
{
intSize = nTmp;
return Err_OutOfMem;
}
return PackHead(pchBuf,intSize,nTmp,intPackNo,shtVersion,shtType);
}
// Two-value variant of Unpack1.
template<class _Tp1,class _Tp2>
static ErrorCode Unpack2(int & intPackNo
,unsigned short & shtVersion
,unsigned short & shtType
,_Tp1 & value1
,_Tp2 & value2
,const char *pchPack
,const int intPackSize
)
{
int nTmp = _const_pack_head_size + 0;
nTmp += Input(pchPack+ nTmp,value1);
nTmp += Input(pchPack+ nTmp,value2);
if( intPackSize < nTmp )
{
return Err_ContentErr;
}
return UnpackHead(nTmp,intPackNo,shtVersion,shtType,pchPack,intPackSize);
}
// Three-value variant of Pack1.
template<class _Tp1,class _Tp2,class _Tp3>
static ErrorCode Pack3(char *pchBuf
,int & intSize
,const int intPackNo
,const unsigned short shtVersion
,const unsigned short shtType
,const _Tp1 & value1
,const _Tp2 & value2
,const _Tp3 & value3
)
{
int nTmp = _const_pack_head_size + 0;
nTmp += Output(pchBuf+ nTmp,value1);
nTmp += Output(pchBuf+ nTmp,value2);
nTmp += Output(pchBuf+ nTmp,value3);
if( intSize < nTmp )
{
intSize = nTmp;
return Err_OutOfMem;
}
return PackHead(pchBuf,intSize,nTmp,intPackNo,shtVersion,shtType);
}
// Three-value variant of Unpack1.
template<class _Tp1,class _Tp2,class _Tp3>
static ErrorCode Unpack3(int & intPackNo
,unsigned short & shtVersion
,unsigned short & shtType
,_Tp1 & value1
,_Tp2 & value2
,_Tp3 & value3
,const char *pchPack
,const int intPackSize
)
{
int nTmp = _const_pack_head_size + 0;
nTmp += Input(pchPack+ nTmp,value1);
nTmp += Input(pchPack+ nTmp,value2);
nTmp += Input(pchPack+ nTmp,value3);
if( intPackSize < nTmp )
{
return Err_ContentErr;
}
return UnpackHead(nTmp,intPackNo,shtVersion,shtType,pchPack,intPackSize);
}
// Four-value variant of Pack1.
template<class _Tp1,class _Tp2,class _Tp3,class _Tp4>
static ErrorCode Pack4(char *pchBuf
,int & intSize
,const int intPackNo
,const unsigned short shtVersion
,const unsigned short shtType
,const _Tp1 & value1
,const _Tp2 & value2
,const _Tp3 & value3
,const _Tp4 & value4
)
{
int nTmp = _const_pack_head_size + 0;
nTmp += Output(pchBuf+ nTmp,value1);
nTmp += Output(pchBuf+ nTmp,value2);
nTmp += Output(pchBuf+ nTmp,value3);
nTmp += Output(pchBuf+ nTmp,value4);
if( intSize < nTmp )
{
intSize = nTmp;
return Err_OutOfMem;
}
return PackHead(pchBuf,intSize,nTmp,intPackNo,shtVersion,shtType);
}
// Four-value variant of Unpack1.
template<class _Tp1,class _Tp2,class _Tp3,class _Tp4>
static ErrorCode Unpack4(int & intPackNo
,unsigned short & shtVersion
,unsigned short & shtType
,_Tp1 & value1
,_Tp2 & value2
,_Tp3 & value3
,_Tp4 & value4
,const char *pchPack
,const int intPackSize
)
{
int nTmp = _const_pack_head_size + 0;
nTmp += Input(pchPack+ nTmp,value1);
nTmp += Input(pchPack+ nTmp,value2);
nTmp += Input(pchPack+ nTmp,value3);
nTmp += Input(pchPack+ nTmp,value4);
if( intPackSize < nTmp )
{
return Err_ContentErr;
}
return UnpackHead(nTmp,intPackNo,shtVersion,shtType,pchPack,intPackSize);
}
// Five-value variant of Pack1.
template<class _Tp1,class _Tp2,class _Tp3,class _Tp4,class _Tp5>
static ErrorCode Pack5(char *pchBuf
,int & intSize
,const int intPackNo
,const unsigned short shtVersion
,const unsigned short shtType
,const _Tp1 & value1
,const _Tp2 & value2
,const _Tp3 & value3
,const _Tp4 & value4
,const _Tp5 & value5
)
{
int nTmp = _const_pack_head_size + 0;
nTmp += Output(pchBuf+ nTmp,value1);
nTmp += Output(pchBuf+ nTmp,value2);
nTmp += Output(pchBuf+ nTmp,value3);
nTmp += Output(pchBuf+ nTmp,value4);
nTmp += Output(pchBuf+ nTmp,value5);
if( intSize < nTmp )
{
intSize = nTmp;
return Err_OutOfMem;
}
return PackHead(pchBuf,intSize,nTmp,intPackNo,shtVersion,shtType);
}
// Five-value variant of Unpack1.
template<class _Tp1,class _Tp2,class _Tp3,class _Tp4,class _Tp5>
static ErrorCode Unpack5(int & intPackNo
,unsigned short & shtVersion
,unsigned short & shtType
,_Tp1 & value1
,_Tp2 & value2
,_Tp3 & value3
,_Tp4 & value4
,_Tp5 & value5
,const char *pchPack
,const int intPackSize
)
{
int nTmp = _const_pack_head_size + 0;
nTmp += Input(pchPack+ nTmp,value1);
nTmp += Input(pchPack+ nTmp,value2);
nTmp += Input(pchPack+ nTmp,value3);
nTmp += Input(pchPack+ nTmp,value4);
nTmp += Input(pchPack+ nTmp,value5);
if( intPackSize < nTmp )
{
return Err_ContentErr;
}
return UnpackHead(nTmp,intPackNo,shtVersion,shtType,pchPack,intPackSize);
}
// Six-value variant of Pack1.
template<class _Tp1,class _Tp2,class _Tp3,class _Tp4,class _Tp5,class _Tp6>
static ErrorCode Pack6(char *pchBuf
,int & intSize
,const int intPackNo
,const unsigned short shtVersion
,const unsigned short shtType
,const _Tp1 & value1
,const _Tp2 & value2
,const _Tp3 & value3
,const _Tp4 & value4
,const _Tp5 & value5
,const _Tp6 & value6
)
{
int nTmp = _const_pack_head_size + 0;
nTmp += Output(pchBuf+ nTmp,value1);
nTmp += Output(pchBuf+ nTmp,value2);
nTmp += Output(pchBuf+ nTmp,value3);
nTmp += Output(pchBuf+ nTmp,value4);
nTmp += Output(pchBuf+ nTmp,value5);
nTmp += Output(pchBuf+ nTmp,value6);
if( intSize < nTmp )
{
intSize = nTmp;
return Err_OutOfMem;
}
return PackHead(pchBuf,intSize,nTmp,intPackNo,shtVersion,shtType);
}
// Six-value variant of Unpack1.
template<class _Tp1,class _Tp2,class _Tp3,class _Tp4,class _Tp5,class _Tp6>
static ErrorCode Unpack6(int & intPackNo
,unsigned short & shtVersion
,unsigned short & shtType
,_Tp1 & value1
,_Tp2 & value2
,_Tp3 & value3
,_Tp4 & value4
,_Tp5 & value5
,_Tp6 & value6
,const char *pchPack
,const int intPackSize
)
{
int nTmp = _const_pack_head_size + 0;
nTmp += Input(pchPack+ nTmp,value1);
nTmp += Input(pchPack+ nTmp,value2);
nTmp += Input(pchPack+ nTmp,value3);
nTmp += Input(pchPack+ nTmp,value4);
nTmp += Input(pchPack+ nTmp,value5);
nTmp += Input(pchPack+ nTmp,value6);
if( intPackSize < nTmp )
{
return Err_ContentErr;
}
return UnpackHead(nTmp,intPackNo,shtVersion,shtType,pchPack,intPackSize);
}
public:
// Input: decode one value from pchSrc; returns the number of bytes consumed.
static int Input(const char* pchSrc, char & chrVal) ;
static int Input(const char* pchSrc, BYTE & chrVal) ;
static int Input(const char* pchSrc, short & shtVal) ;
static int Input(const char* pchSrc, unsigned short & shtVal) ;
static int Input(const char* pchSrc, int & intVal) ;
static int Input(const char* pchSrc, unsigned int & intVal) ;
static int Input(const char* pchSrc, float & fltVal) ;
static int Input(const char* pchSrc, double & dblVal) ;
static int Input(const char* pchSrc, INT64 & int64Val) ;
static int Input(const char* pchSrc, string & strVal) ;
static int Input(const char* pchSrc, SPackHead & head) ;
template <int __size>
static int Input(const char* pchSrc, CStaticString<__size> & fixStr) ;
//
// Output: encode one value into pchDest; returns the number of bytes written.
static int Output(char* pchDest, const char chrVal) ;
static int Output(char* pchDest, const BYTE chrVal) ;
static int Output(char* pchDest, const short shtVal) ;
static int Output(char* pchDest, const unsigned short shtVal) ;
static int Output(char* pchDest, const int intVal) ;
static int Output(char* pchDest, const unsigned int intVal) ;
static int Output(char* pchDest, const float fltVal) ;
static int Output(char* pchDest, const double dblVal) ;
static int Output(char* pchDest, const INT64 int64Val) ;
static int Output(char* pchDest, const string & strVal) ;
static int Output(char* pchDest, const SPackHead & head) ;
template <int __size>
static int Output(char* pchDest, const CStaticString<__size> & fixStr);
static void ConvertOrder (short &shtVal);
static void ConvertOrder( int &intVal);
static void ConvertOrder( float &fltVal);
static void ConvertOrder( double &dblVal);
// Encoded size of a string: its bytes plus an unsigned short length prefix.
static int GetStrBufSize(const string & str){return str.size() + sizeof(unsigned short);};
// Encoded size of a "big" string: its bytes plus an int length prefix.
static int GetBigStrBufSize(const string & str){return str.size() + sizeof(int);};
static int PackBigStr(char* pchDest, const string & strVal);
static int PackString(vector<char> & buf, const string & strVal);
static int PackBigStr(vector<char> & buf, const string & strVal);
};
// Reads one char from pchSrc; returns the number of bytes consumed (1).
inline int CPack::Input(const char* pchSrc, char & chrVal)
{
    chrVal = pchSrc[0];
    return sizeof(chrVal);
}
// Reads one byte from pchSrc into a BYTE; returns the number of bytes
// consumed (1).
inline int CPack::Input(const char* pchSrc, BYTE & chrVal)
{
    chrVal = pchSrc[0];
    return sizeof(chrVal);
}
// Reads a 2-byte short from pchSrc. The bytes are copied in wire order when
// _BIG_ENDIAN is defined and in reversed order otherwise.
// Returns the number of bytes consumed.
inline int CPack::Input(const char* pchSrc, short & shtVal)
{
    char * const pchDst = (char*)&shtVal;
    for( int k = 0; k < 2; ++k )
    {
#if defined(_BIG_ENDIAN)
        pchDst[k] = pchSrc[k];
#else
        pchDst[1 - k] = pchSrc[k];
#endif
    }
    return sizeof(shtVal);
}
// Reads a 2-byte unsigned short from pchSrc. The bytes are copied in wire
// order when _BIG_ENDIAN is defined and in reversed order otherwise.
// Returns the number of bytes consumed.
inline int CPack::Input(const char* pchSrc, unsigned short & shtVal)
{
    char * const pchDst = (char*)&shtVal;
    for( int k = 0; k < 2; ++k )
    {
#if defined(_BIG_ENDIAN)
        pchDst[k] = pchSrc[k];
#else
        pchDst[1 - k] = pchSrc[k];
#endif
    }
    return sizeof(shtVal);
}
// Reads a 4-byte int from pchSrc. The bytes are copied in wire order when
// _BIG_ENDIAN is defined and in reversed order otherwise.
// Returns the number of bytes consumed.
inline int CPack::Input(const char* pchSrc, int & intVal)
{
    char * const pchDst = (char*)&intVal;
    for( int k = 0; k < 4; ++k )
    {
#if defined(_BIG_ENDIAN)
        pchDst[k] = pchSrc[k];
#else
        pchDst[3 - k] = pchSrc[k];
#endif
    }
    return sizeof(intVal);
}
// Reads a 4-byte unsigned int from pchSrc. The bytes are copied in wire order
// when _BIG_ENDIAN is defined and in reversed order otherwise.
// Returns the number of bytes consumed.
inline int CPack::Input(const char* pchSrc, unsigned int & intVal)
{
    char * const pchDst = (char*)&intVal;
    for( int k = 0; k < 4; ++k )
    {
#if defined(_BIG_ENDIAN)
        pchDst[k] = pchSrc[k];
#else
        pchDst[3 - k] = pchSrc[k];
#endif
    }
    return sizeof(intVal);
}
// Reads a 4-byte float from pchSrc. The bytes are copied in wire order when
// _BIG_ENDIAN is defined and in reversed order otherwise.
// Returns the number of bytes consumed.
inline int CPack::Input(const char* pchSrc, float & fltVal)
{
    char * const pchDst = (char*)&fltVal;
    for( int k = 0; k < 4; ++k )
    {
#if defined(_BIG_ENDIAN)
        pchDst[k] = pchSrc[k];
#else
        pchDst[3 - k] = pchSrc[k];
#endif
    }
    return sizeof(fltVal);
}
// Reads an 8-byte double from pchSrc. The bytes are copied in wire order when
// _BIG_ENDIAN is defined and in reversed order otherwise.
// Returns the number of bytes consumed.
inline int CPack::Input(const char* pchSrc, double & dblVal)
{
    char * const pchDst = (char*)&dblVal;
    for( int k = 0; k < 8; ++k )
    {
#if defined(_BIG_ENDIAN)
        pchDst[k] = pchSrc[k];
#else
        pchDst[7 - k] = pchSrc[k];
#endif
    }
    return sizeof(dblVal);
}
// Reads an 8-byte INT64 from pchSrc. The bytes are copied in wire order when
// _BIG_ENDIAN is defined and in reversed order otherwise.
// Returns the number of bytes consumed.
inline int CPack::Input(const char* pchSrc, INT64 & int64Val)
{
    char * const pchDst = (char*)&int64Val;
    for( int k = 0; k < 8; ++k )
    {
#if defined(_BIG_ENDIAN)
        pchDst[k] = pchSrc[k];
#else
        pchDst[7 - k] = pchSrc[k];
#endif
    }
    return sizeof(int64Val);
}
inline int CPack::Input(const char* pchSrc, string & strVal)
{
unsigned short size = 0;
int intLen = Input(pchSrc, size);
vector<char> tmp;
tmp.resize(size + 1);
memcpy(&*tmp.begin(), pchSrc + intLen, size);
strVal = &*tmp.begin();
intLen += size;
return intLen;
}
// Reads the four header fields from fixed byte offsets 0, 4, 8 and 10.
// Returns sizeof(SPackHead) as the number of bytes consumed.
inline int CPack::Input(const char* pchSrc, SPackHead & head)
{
    Input(&pchSrc[0],  head.m_intPackSize);
    Input(&pchSrc[4],  head.m_intPackNo);
    Input(&pchSrc[8],  head.m_shtVer);
    Input(&pchSrc[10], head.m_shtType);
    return sizeof(head);
}
template <int __size>
inline int CPack::Input(const char* pchSrc, CStaticString<__size> & fixStr)
{
unsigned short size = 0;
int intLen = Input(pchSrc, size);
vector<char> tmp;
tmp.resize(size + 1);
memcpy(&*tmp.begin(), pchSrc + intLen, size);
fixStr = &*tmp.begin();
intLen += size;
return intLen;
}
// Writes one char to pchDest; returns the number of bytes written (1).
inline int CPack::Output(char* pchDest, const char chrVal)
{
    pchDest[0] = chrVal;
    return sizeof(chrVal);
}
// Writes the first byte of chrVal's object representation to pchDest[0].
// Returns sizeof(chrVal).
inline int CPack::Output(char* pchDest, const BYTE chrVal)
{
    const char* const pRaw = reinterpret_cast<const char*>(&chrVal);
    pchDest[0] = pRaw[0];
    return static_cast<int>(sizeof(chrVal));
}
// Encodes shtVal into the 2 bytes at pchDest.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are written in reverse order. No bounds checking is done on pchDest.
// Returns sizeof(shtVal).
inline int CPack::Output(char* pchDest, const short shtVal)
{
    const char* const pRaw = reinterpret_cast<const char*>(&shtVal);
    const int kWireBytes = 2;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pchDest[i] = pRaw[i];
#else
        pchDest[i] = pRaw[kWireBytes - 1 - i];
#endif
    }
    return static_cast<int>(sizeof(shtVal));
}
// Encodes the unsigned 16-bit shtVal into the 2 bytes at pchDest.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are written in reverse order. No bounds checking is done on pchDest.
// Returns sizeof(shtVal).
inline int CPack::Output(char* pchDest, const unsigned short shtVal)
{
    const char* const pRaw = reinterpret_cast<const char*>(&shtVal);
    const int kWireBytes = 2;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pchDest[i] = pRaw[i];
#else
        pchDest[i] = pRaw[kWireBytes - 1 - i];
#endif
    }
    return static_cast<int>(sizeof(shtVal));
}
// Encodes intVal into the 4 bytes at pchDest.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are written in reverse order. No bounds checking is done on pchDest.
// Returns sizeof(intVal).
inline int CPack::Output(char* pchDest, const int intVal)
{
    const char* const pRaw = reinterpret_cast<const char*>(&intVal);
    const int kWireBytes = 4;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pchDest[i] = pRaw[i];
#else
        pchDest[i] = pRaw[kWireBytes - 1 - i];
#endif
    }
    return static_cast<int>(sizeof(intVal));
}
// Encodes the unsigned intVal into the 4 bytes at pchDest.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are written in reverse order. No bounds checking is done on pchDest.
// Returns sizeof(intVal).
inline int CPack::Output(char* pchDest, const unsigned int intVal)
{
    const char* const pRaw = reinterpret_cast<const char*>(&intVal);
    const int kWireBytes = 4;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pchDest[i] = pRaw[i];
#else
        pchDest[i] = pRaw[kWireBytes - 1 - i];
#endif
    }
    return static_cast<int>(sizeof(intVal));
}
// Encodes the object representation of fltVal into the 4 bytes at pchDest.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are written in reverse order. No bounds checking is done on pchDest.
// Returns sizeof(fltVal).
inline int CPack::Output(char* pchDest, const float fltVal)
{
    const char* const pRaw = reinterpret_cast<const char*>(&fltVal);
    const int kWireBytes = 4;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pchDest[i] = pRaw[i];
#else
        pchDest[i] = pRaw[kWireBytes - 1 - i];
#endif
    }
    return static_cast<int>(sizeof(fltVal));
}
// Encodes the object representation of dblVal into the 8 bytes at pchDest.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are written in reverse order. No bounds checking is done on pchDest.
// Returns sizeof(dblVal).
inline int CPack::Output(char* pchDest, const double dblVal)
{
    const char* const pRaw = reinterpret_cast<const char*>(&dblVal);
    const int kWireBytes = 8;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pchDest[i] = pRaw[i];
#else
        pchDest[i] = pRaw[kWireBytes - 1 - i];
#endif
    }
    return static_cast<int>(sizeof(dblVal));
}
// Encodes int64Val into the 8 bytes at pchDest.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are written in reverse order. No bounds checking is done on pchDest.
// Returns sizeof(int64Val).
inline int CPack::Output(char* pchDest, const INT64 int64Val)
{
    const char* const pRaw = reinterpret_cast<const char*>(&int64Val);
    const int kWireBytes = 8;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pchDest[i] = pRaw[i];
#else
        pchDest[i] = pRaw[kWireBytes - 1 - i];
#endif
    }
    return static_cast<int>(sizeof(int64Val));
}
// Encodes strVal at pchDest as an unsigned short length followed by that
// many bytes of string data.
// NOTE: the length is narrowed to unsigned short, so for strings longer than
// the unsigned short range both the prefix and the number of bytes copied
// are the narrowed value.
// No bounds checking is done on pchDest.
// Returns the number of bytes written: length prefix plus payload.
inline int CPack::Output(char* pchDest, const string & strVal)
{
    const unsigned short cbPayload = static_cast<unsigned short>(strVal.size());
    const int cbPrefix = Output(pchDest, cbPayload);
    memcpy(pchDest + cbPrefix, strVal.c_str(), cbPayload);
    return cbPrefix + cbPayload;
}
// Encodes strVal at pchDest as an int length (written by the int Output
// overload) followed by that many bytes of string data.
// No bounds checking is done on pchDest.
// Returns the number of bytes written: length prefix plus payload.
inline int CPack::PackBigStr(char* pchDest, const string & strVal)
{
    const int cbPayload = static_cast<int>(strVal.size());
    const int cbPrefix = Output(pchDest, cbPayload);
    memcpy(pchDest + cbPrefix, strVal.c_str(), cbPayload);
    return cbPrefix + cbPayload;
}
// Appends the encoding of strVal (as produced by Output for strings) to buf.
// buf is grown by GetStrBufSize(strVal) bytes and the string is written at
// the old end of the buffer.
// Returns the value returned by Output (bytes written).
inline int CPack::PackString(vector<char> & buf, const string & strVal)
{
    const int nAppendAt = static_cast<int>(buf.size());
    buf.resize(nAppendAt + GetStrBufSize(strVal));
    char* const pWrite = &*buf.begin() + nAppendAt;
    return Output(pWrite, strVal);
}
// Appends the encoding of strVal (as produced by the char* PackBigStr
// overload) to buf. buf is grown by GetBigStrBufSize(strVal) bytes and the
// string is written at the old end of the buffer.
// Returns the value returned by the char* PackBigStr overload.
inline int CPack::PackBigStr(vector<char> & buf, const string & strVal)
{
    const int nAppendAt = static_cast<int>(buf.size());
    buf.resize(nAppendAt + GetBigStrBufSize(strVal));
    char* const pWrite = &*buf.begin() + nAppendAt;
    return PackBigStr(pWrite, strVal);
}
// Encodes a packet head at pchDest, field by field:
//   offset 0  <- m_intPackSize
//   offset 4  <- m_intPackNo
//   offset 8  <- m_shtVer
//   offset 10 <- m_shtType
// No bounds checking is done on pchDest.
// Returns sizeof(head).
inline int CPack::Output(char* pchDest, const SPackHead & head)
{
    char* const pSizeField = pchDest;
    char* const pNoField   = pchDest + 4;
    char* const pVerField  = pchDest + 8;
    char* const pTypeField = pchDest + 10;

    Output(pSizeField, head.m_intPackSize);
    Output(pNoField,   head.m_intPackNo);
    Output(pVerField,  head.m_shtVer);
    Output(pTypeField, head.m_shtType);
    return static_cast<int>(sizeof(head));
}
template <int __size>
inline int CPack::Output(char* pchDest, const CStaticString<__size> & fixStr)
{
unsigned short size = fixStr.size();
int intLen = Output(pchDest, size);
memcpy(pchDest + intLen, fixStr.c_str(), size);
intLen += size;
return intLen;
}
//add 2008-03-17 by zhangcx
// Reverses the byte order of shtVal in place by swapping bytes 0 and 1.
inline void CPack::ConvertOrder (short &shtVal)
{
    char* const pBytes = reinterpret_cast<char*>(&shtVal);
    const char chSaved = pBytes[0];
    pBytes[0] = pBytes[1];
    pBytes[1] = chSaved;
}
// Reverses the order of the first 4 bytes of intVal in place
// (swaps bytes 0<->3, then 1<->2).
inline void CPack::ConvertOrder( int &intVal)
{
    char* const pBytes = reinterpret_cast<char*>(&intVal);
    for (int lo = 0, hi = 3; lo < hi; ++lo, --hi)
    {
        const char chSaved = pBytes[lo];
        pBytes[lo] = pBytes[hi];
        pBytes[hi] = chSaved;
    }
}
// Reverses the order of the first 4 bytes of fltVal's object representation
// in place (swaps bytes 0<->3, then 1<->2).
inline void CPack::ConvertOrder( float &fltVal)
{
    char* const pBytes = reinterpret_cast<char*>(&fltVal);
    for (int lo = 0, hi = 3; lo < hi; ++lo, --hi)
    {
        const char chSaved = pBytes[lo];
        pBytes[lo] = pBytes[hi];
        pBytes[hi] = chSaved;
    }
}
// Reverses the order of the first 8 bytes of dblVal's object representation
// in place (swaps bytes 0<->7, 1<->6, 2<->5, 3<->4).
inline void CPack::ConvertOrder( double &dblVal)
{
    char* const pBytes = reinterpret_cast<char*>(&dblVal);
    for (int lo = 0, hi = 7; lo < hi; ++lo, --hi)
    {
        const char chSaved = pBytes[lo];
        pBytes[lo] = pBytes[hi];
        pBytes[hi] = chSaved;
    }
}
//add 2008-03-17 end
}// namespace BackCom
#endif
#include "pack.h"
namespace PACKCOM
{
// Serializes the four packet-head fields into pOutPackHead, back to back:
// pack size, pack number, version, type (each written by the matching
// Output overload; with 4-byte int and 2-byte short these land at offsets
// 0, 4, 8 and 10, as before).
//
// pOutPackHead     destination buffer; no bounds checking is done.
// nOutPackHeadSize receives the number of head bytes actually written.
//                  Previously this was set to intPackSize (the value stored
//                  in the size field), which is not the length of the head.
// Returns Err_NoErr.
CPack::ErrorCode CPack::PackHead( char *pOutPackHead ,int & nOutPackHeadSize ,const int intPackSize ,const int intPackNo ,const unsigned short shtVersion ,const unsigned short shtType )
{
    int nWritten = 0;
    nWritten += Output(pOutPackHead + nWritten, intPackSize);
    nWritten += Output(pOutPackHead + nWritten, intPackNo);
    nWritten += Output(pOutPackHead + nWritten, shtVersion);
    nWritten += Output(pOutPackHead + nWritten, shtType);
    // set output length: bytes occupied by the serialized head
    nOutPackHeadSize = nWritten;
    return Err_NoErr;
}
// Decodes the four packet-head fields from pchPack:
//   offset 0 -> intOutPackSize, 4 -> intPackNo, 8 -> shtVersion, 10 -> shtType.
// intPackSize is the number of bytes available at pchPack.
// Returns Err_ContentErr (outputs untouched) when intPackSize is smaller
// than _const_pack_head_size, otherwise Err_NoErr.
CPack::ErrorCode CPack::UnpackHead( int & intOutPackSize ,int & intPackNo ,unsigned short & shtVersion ,unsigned short & shtType ,const char *pchPack ,const int intPackSize )
{
    if( !(intPackSize < _const_pack_head_size) )
    {
        const char * const pSizeField = pchPack;
        const char * const pNoField   = pchPack + 4;
        const char * const pVerField  = pchPack + 8;
        const char * const pTypeField = pchPack + 10;

        Input(pSizeField, intOutPackSize);
        Input(pNoField,   intPackNo);
        Input(pVerField,  shtVersion);
        Input(pTypeField, shtType);
        return Err_NoErr;
    }
    return Err_ContentErr;
}
// Convenience overload: serializes the fields of *pHead by forwarding them
// to the field-wise PackHead overload. pHead is dereferenced unconditionally.
// Returns whatever the field-wise overload returns.
CPack::ErrorCode CPack::PackHead(char *pOutPackHead ,int & nOutPackHeadSize ,const SPackHead *pHead )
{
    const SPackHead & head = *pHead;
    return PackHead(pOutPackHead,
                    nOutPackHeadSize,
                    head.m_intPackSize,
                    head.m_intPackNo,
                    head.m_shtVer,
                    head.m_shtType);
}
// Convenience overload: decodes a packet head from pInPack directly into the
// fields of *pHead by forwarding to the field-wise UnpackHead overload.
// pHead is dereferenced unconditionally.
// Returns whatever the field-wise overload returns.
CPack::ErrorCode CPack::UnpackHead(SPackHead *pHead ,const char *pInPack ,const int nInPackSize )
{
    SPackHead & head = *pHead;
    return UnpackHead(head.m_intPackSize,
                      head.m_intPackNo,
                      head.m_shtVer,
                      head.m_shtType,
                      pInPack,
                      nInPackSize);
}
bool CPack::UnpackHead( const char * pBuf, const int intLen, const unsigned short shtType, const unsigned short shtVer, int & intPackNo )
{
unsigned short shtVersion;
unsigned short shtPackType;
int intPackSize = 0;
UnpackHead(intPackSize, intPackNo,shtVersion,shtPackType,pBuf,intLen);
if(intPackSize > intLen)
{
return false;
}
if(shtVersion != shtVer)
{
return false;
}
if(shtPackType != shtType)
{
return false;
}
return true;
}
// Reads one byte from pchSrc into chrVal. nDataLen is not consulted.
// Returns sizeof(chrVal).
int CPack::Input( const char* pchSrc, char & chrVal, int nDataLen /*= sizeof(char)*/ )
{
    chrVal = pchSrc[0];
    return static_cast<int>(sizeof(chrVal));
}
// Reads one byte from pchSrc and stores it, converted to unsigned char, in
// chrVal. nDataLen is not consulted.
// Returns sizeof(chrVal).
int CPack::Input( const char* pchSrc, unsigned char & chrVal, int nDataLen /*= sizeof(unsigned char)*/ )
{
    chrVal = static_cast<unsigned char>(pchSrc[0]);
    return static_cast<int>(sizeof(chrVal));
}
// Decodes a short from the 2 bytes at pchSrc into shtVal.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are copied in reverse order. nDataLen is not consulted, so no bounds
// checking happens here.
// Returns sizeof(shtVal).
int CPack::Input( const char* pchSrc, short & shtVal, int nDataLen /*= sizeof(short)*/ )
{
    char* const pDst = reinterpret_cast<char*>(&shtVal);
    const int kWireBytes = 2;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pDst[i] = pchSrc[i];
#else
        pDst[kWireBytes - 1 - i] = pchSrc[i];
#endif
    }
    return static_cast<int>(sizeof(shtVal));
}
// Decodes an unsigned short from the 2 bytes at pchSrc into shtVal.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are copied in reverse order. nDataLen is not consulted, so no bounds
// checking happens here.
// Returns sizeof(shtVal).
int CPack::Input( const char* pchSrc, unsigned short & shtVal, int nDataLen /*= sizeof(unsigned short)*/ )
{
    char* const pDst = reinterpret_cast<char*>(&shtVal);
    const int kWireBytes = 2;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pDst[i] = pchSrc[i];
#else
        pDst[kWireBytes - 1 - i] = pchSrc[i];
#endif
    }
    return static_cast<int>(sizeof(shtVal));
}
// Decodes an int from the 4 bytes at pchSrc into intVal.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are copied in reverse order. nDataLen is not consulted, so no bounds
// checking happens here.
// Returns sizeof(intVal).
int CPack::Input( const char* pchSrc,int& intVal,int nDataLen /*= sizeof(int)*/ )
{
    char* const pDst = reinterpret_cast<char*>(&intVal);
    const int kWireBytes = 4;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pDst[i] = pchSrc[i];
#else
        pDst[kWireBytes - 1 - i] = pchSrc[i];
#endif
    }
    return static_cast<int>(sizeof(intVal));
}
// Decodes an unsigned int from the 4 bytes at pchSrc into intVal.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are copied in reverse order. nDataLen is not consulted, so no bounds
// checking happens here.
// Returns sizeof(intVal).
int CPack::Input( const char* pchSrc, unsigned int & intVal, int nDataLen /*= sizeof(unsigned int)*/ )
{
    char* const pDst = reinterpret_cast<char*>(&intVal);
    const int kWireBytes = 4;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pDst[i] = pchSrc[i];
#else
        pDst[kWireBytes - 1 - i] = pchSrc[i];
#endif
    }
    return static_cast<int>(sizeof(intVal));
}
// Decodes a float from the 4 bytes at pchSrc into fltVal.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are copied in reverse order. nDataLen is not consulted, so no bounds
// checking happens here.
// Returns sizeof(fltVal).
int CPack::Input( const char* pchSrc, float& fltVal, int nDataLen /*= sizeof(float)*/ )
{
    char* const pDst = reinterpret_cast<char*>(&fltVal);
    const int kWireBytes = 4;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pDst[i] = pchSrc[i];
#else
        pDst[kWireBytes - 1 - i] = pchSrc[i];
#endif
    }
    return static_cast<int>(sizeof(fltVal));
}
// Decodes a double from the 8 bytes at pchSrc into dblVal.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are copied in reverse order. nDataLen is not consulted, so no bounds
// checking happens here.
// Returns sizeof(dblVal).
int CPack::Input( const char* pchSrc, double & dblVal, int nDataLen )
{
    char* const pDst = reinterpret_cast<char*>(&dblVal);
    const int kWireBytes = 8;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pDst[i] = pchSrc[i];
#else
        pDst[kWireBytes - 1 - i] = pchSrc[i];
#endif
    }
    return static_cast<int>(sizeof(dblVal));
}
// Decodes a 64-bit integer from the 8 bytes at pchSrc into int64Val.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are copied in reverse order. nDataLen is not consulted, so no bounds
// checking happens here.
// Returns sizeof(int64Val).
int CPack::Input( const char* pchSrc, INT64 & int64Val, int nDataLen )
{
    char* const pDst = reinterpret_cast<char*>(&int64Val);
    const int kWireBytes = 8;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pDst[i] = pchSrc[i];
#else
        pDst[kWireBytes - 1 - i] = pchSrc[i];
#endif
    }
    return static_cast<int>(sizeof(int64Val));
}
// Decodes a packet head from pchSrc into head, field by field:
//   offset 0  -> m_intPackSize
//   offset 4  -> m_intPackNo
//   offset 8  -> m_shtVer
//   offset 10 -> m_shtType
// nDataLen is not consulted, so no bounds checking happens here.
// Returns sizeof(head).
int CPack::Input( const char* pchSrc, SPackHead& head, int nDataLen /*= sizeof(SPackHead)*/ )
{
    const char* const pSizeField = pchSrc;
    const char* const pNoField   = pchSrc + 4;
    const char* const pVerField  = pchSrc + 8;
    const char* const pTypeField = pchSrc + 10;

    Input(pSizeField, head.m_intPackSize);
    Input(pNoField,   head.m_intPackNo);
    Input(pVerField,  head.m_shtVer);
    Input(pTypeField, head.m_shtType);
    return static_cast<int>(sizeof(head));
}
// Decodes a length-prefixed string from pData into val, with bounds checks.
// Wire layout: an unsigned short length followed by that many payload bytes
// (copied verbatim, embedded NUL bytes included).
//
// pData    source buffer.
// val      cleared first; receives the payload on success.
// nDataLen number of readable bytes at pData.
// Returns the number of bytes consumed (prefix + payload), or -1 when the
// buffer is too small for the prefix or for the announced payload.
//
// The prefix-size check is done before the prefix is read; previously the
// two length bytes were read first, touching memory past the buffer when
// nDataLen was smaller than the prefix. Return values are unchanged.
int CPack::Input( const char* pData, string&val, int nDataLen )
{
    val.clear();
    unsigned short s = 0;
    // Compare as signed so a negative nDataLen is rejected as well.
    if( nDataLen < static_cast<int>(sizeof(s)) ) return -1;
    int intLen = Input(pData, s, nDataLen);
    if( intLen == -1 ) return -1;
    if( (s + intLen) > nDataLen ) return -1;
    val.append(pData + sizeof(s), s);
    intLen += s;
    return intLen;
}
// Writes the single byte chrVal to pchDest[0].
// Returns sizeof(chrVal).
int CPack::Output( char* pchDest, const char chrVal )
{
    pchDest[0] = chrVal;
    return static_cast<int>(sizeof(chrVal));
}
// Writes the byte of chrVal's object representation to pchDest[0].
// Returns sizeof(chrVal).
int CPack::Output( char* pchDest, const unsigned char chrVal )
{
    const char* const pRaw = reinterpret_cast<const char*>(&chrVal);
    pchDest[0] = pRaw[0];
    return static_cast<int>(sizeof(chrVal));
}
// Encodes shtVal into the 2 bytes at pchDest.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are written in reverse order. No bounds checking is done on pchDest.
// Returns sizeof(shtVal).
int CPack::Output( char* pchDest, const short shtVal )
{
    const char* const pRaw = reinterpret_cast<const char*>(&shtVal);
    const int kWireBytes = 2;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pchDest[i] = pRaw[i];
#else
        pchDest[i] = pRaw[kWireBytes - 1 - i];
#endif
    }
    return static_cast<int>(sizeof(shtVal));
}
// Encodes the unsigned 16-bit shtVal into the 2 bytes at pchDest.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are written in reverse order. No bounds checking is done on pchDest.
// Returns sizeof(shtVal).
int CPack::Output( char* pchDest, const unsigned short shtVal )
{
    const char* const pRaw = reinterpret_cast<const char*>(&shtVal);
    const int kWireBytes = 2;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pchDest[i] = pRaw[i];
#else
        pchDest[i] = pRaw[kWireBytes - 1 - i];
#endif
    }
    return static_cast<int>(sizeof(shtVal));
}
// Encodes intVal into the 4 bytes at pchDest.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are written in reverse order. No bounds checking is done on pchDest.
// Returns sizeof(intVal).
int CPack::Output( char* pchDest, const int intVal )
{
    const char* const pRaw = reinterpret_cast<const char*>(&intVal);
    const int kWireBytes = 4;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pchDest[i] = pRaw[i];
#else
        pchDest[i] = pRaw[kWireBytes - 1 - i];
#endif
    }
    return static_cast<int>(sizeof(intVal));
}
// Encodes the unsigned intVal into the 4 bytes at pchDest.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are written in reverse order. No bounds checking is done on pchDest.
// Returns sizeof(intVal).
int CPack::Output( char* pchDest, const unsigned int intVal )
{
    const char* const pRaw = reinterpret_cast<const char*>(&intVal);
    const int kWireBytes = 4;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pchDest[i] = pRaw[i];
#else
        pchDest[i] = pRaw[kWireBytes - 1 - i];
#endif
    }
    return static_cast<int>(sizeof(intVal));
}
// Encodes the object representation of fltVal into the 4 bytes at pchDest.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are written in reverse order. No bounds checking is done on pchDest.
// Returns sizeof(fltVal).
int CPack::Output( char* pchDest, const float fltVal )
{
    const char* const pRaw = reinterpret_cast<const char*>(&fltVal);
    const int kWireBytes = 4;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pchDest[i] = pRaw[i];
#else
        pchDest[i] = pRaw[kWireBytes - 1 - i];
#endif
    }
    return static_cast<int>(sizeof(fltVal));
}
// Encodes the object representation of dblVal into the 8 bytes at pchDest.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are written in reverse order. No bounds checking is done on pchDest.
// Returns sizeof(dblVal).
int CPack::Output( char* pchDest, const double dblVal )
{
    const char* const pRaw = reinterpret_cast<const char*>(&dblVal);
    const int kWireBytes = 8;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pchDest[i] = pRaw[i];
#else
        pchDest[i] = pRaw[kWireBytes - 1 - i];
#endif
    }
    return static_cast<int>(sizeof(dblVal));
}
// Encodes int64Val into the 8 bytes at pchDest.
// When _BIG_ENDIAN is defined the bytes are copied in order; otherwise they
// are written in reverse order. No bounds checking is done on pchDest.
// Returns sizeof(int64Val).
int CPack::Output( char* pchDest, const INT64 int64Val )
{
    const char* const pRaw = reinterpret_cast<const char*>(&int64Val);
    const int kWireBytes = 8;
    for (int i = 0; i < kWireBytes; ++i)
    {
#if defined(_BIG_ENDIAN)
        pchDest[i] = pRaw[i];
#else
        pchDest[i] = pRaw[kWireBytes - 1 - i];
#endif
    }
    return static_cast<int>(sizeof(int64Val));
}
// Encodes strVal at pchDest as an unsigned short length followed by that
// many bytes of string data.
// NOTE: the length is narrowed to unsigned short, so for strings longer than
// the unsigned short range both the prefix and the number of bytes copied
// are the narrowed value.
// No bounds checking is done on pchDest.
// Returns the number of bytes written: length prefix plus payload.
int CPack::Output( char* pchDest, const string & strVal )
{
    const unsigned short cbPayload = static_cast<unsigned short>(strVal.size());
    const int cbPrefix = Output(pchDest, cbPayload);
    memcpy(pchDest + cbPrefix, strVal.c_str(), cbPayload);
    return cbPrefix + cbPayload;
}
// Encodes a packet head at pchDest, field by field:
//   offset 0  <- m_intPackSize
//   offset 4  <- m_intPackNo
//   offset 8  <- m_shtVer
//   offset 10 <- m_shtType
// No bounds checking is done on pchDest.
// Returns sizeof(head).
int CPack::Output( char* pchDest, const SPackHead & head )
{
    char* const pSizeField = pchDest;
    char* const pNoField   = pchDest + 4;
    char* const pVerField  = pchDest + 8;
    char* const pTypeField = pchDest + 10;

    Output(pSizeField, head.m_intPackSize);
    Output(pNoField,   head.m_intPackNo);
    Output(pVerField,  head.m_shtVer);
    Output(pTypeField, head.m_shtType);
    return static_cast<int>(sizeof(head));
}
// Reverses the byte order of shtVal in place by swapping bytes 0 and 1.
void CPack::ConvertOrder( short &shtVal )
{
    char* const pBytes = reinterpret_cast<char*>(&shtVal);
    const char chSaved = pBytes[0];
    pBytes[0] = pBytes[1];
    pBytes[1] = chSaved;
}
// Reverses the order of the first 4 bytes of intVal in place
// (swaps bytes 0<->3, then 1<->2).
void CPack::ConvertOrder( int &intVal )
{
    char* const pBytes = reinterpret_cast<char*>(&intVal);
    for (int lo = 0, hi = 3; lo < hi; ++lo, --hi)
    {
        const char chSaved = pBytes[lo];
        pBytes[lo] = pBytes[hi];
        pBytes[hi] = chSaved;
    }
}
// Reverses the order of the first 4 bytes of fltVal's object representation
// in place (swaps bytes 0<->3, then 1<->2).
void CPack::ConvertOrder( float &fltVal )
{
    char* const pBytes = reinterpret_cast<char*>(&fltVal);
    for (int lo = 0, hi = 3; lo < hi; ++lo, --hi)
    {
        const char chSaved = pBytes[lo];
        pBytes[lo] = pBytes[hi];
        pBytes[hi] = chSaved;
    }
}
// Reverses the order of the first 8 bytes of dblVal's object representation
// in place (swaps bytes 0<->7, 1<->6, 2<->5, 3<->4).
void CPack::ConvertOrder( double &dblVal )
{
    char* const pBytes = reinterpret_cast<char*>(&dblVal);
    for (int lo = 0, hi = 7; lo < hi; ++lo, --hi)
    {
        const char chSaved = pBytes[lo];
        pBytes[lo] = pBytes[hi];
        pBytes[hi] = chSaved;
    }
}
// Encodes strVal at pchDest as an int length (written by the int Output
// overload) followed by that many bytes of string data.
// No bounds checking is done on pchDest.
// Returns the number of bytes written: length prefix plus payload.
int CPack::PackBigStr( char* pchDest, const string & strVal )
{
    const int cbPayload = static_cast<int>(strVal.size());
    const int cbPrefix = Output(pchDest, cbPayload);
    memcpy(pchDest + cbPrefix, strVal.c_str(), cbPayload);
    return cbPrefix + cbPayload;
}
// Appends the encoding of strVal (as produced by the char* PackBigStr
// overload) to buf. buf is grown by GetBigStrBufSize(strVal) bytes and the
// string is written at the old end of the buffer.
// Returns the value returned by the char* PackBigStr overload.
int CPack::PackBigStr( vector<char> & buf, const string & strVal )
{
    const int nAppendAt = static_cast<int>(buf.size());
    buf.resize(nAppendAt + GetBigStrBufSize(strVal));
    char* const pWrite = &*buf.begin() + nAppendAt;
    return PackBigStr(pWrite, strVal);
}
// Appends the encoding of strVal (as produced by Output for strings) to buf.
// buf is grown by GetStrBufSize(strVal) bytes and the string is written at
// the old end of the buffer.
// Returns the value returned by Output (bytes written).
int CPack::PackString( vector<char> & buf, const string & strVal )
{
    const int nAppendAt = static_cast<int>(buf.size());
    buf.resize(nAppendAt + GetStrBufSize(strVal));
    char* const pWrite = &*buf.begin() + nAppendAt;
    return Output(pWrite, strVal);
}
int CPack::SafeUnpackString( const char * pBuf, const int intLen, string & str )
{
//¹ýÀ´µÄûÓаüÍ· Ö»ÓаüÌå
unsigned short size = 0;
int intTmp = intLen;
if(intTmp < sizeof(size))
{ //³¤¶ÈСÓÚ2 ¾Í³ö´í´¦Àí Êг¡Í·´óÓÚ2
return -1;
}
//È¡³ö°üÌ峤¶È
intTmp -= Input(pBuf, size);
if(intTmp - size < 0)
{
return -1;
}
vector<char> tmp;
tmp.resize(size + 1);
memcpy(&*tmp.begin(), pBuf + sizeof(size), size);
str = &*tmp.begin();
intTmp -= size;
return intLen - intTmp;
}
// Decodes a list of length-prefixed strings.
// Wire layout: an int element count, followed by that many strings, each
// decoded with SafeUnpackString.
//
// pBuf   source buffer.
// intLen number of readable bytes at pBuf.
// data   decoded strings are appended in order. On failure the strings
//        decoded before the bad element remain appended.
// Returns the number of bytes consumed, or -1 when intLen cannot hold the
// count or any element fails to decode.
int CPack::UnpackStringList( const char * pBuf, const int intLen, vector<string> & data )
{
    int intSize = 0;
    // Compare as signed: comparing an int against sizeof() directly converts
    // a negative intLen to a huge unsigned value and lets it through.
    if(intLen < static_cast<int>(sizeof(intSize)))
    {
        return -1;
    }
    const char * pPos = pBuf;
    pPos += Input(pPos, intSize);
    for(int i = 0; i < intSize; i++)
    {
        string str;
        // Each element may only use the bytes not yet consumed.
        const int intRemain = intLen - static_cast<int>(pPos - pBuf);
        const int intStrLen = SafeUnpackString(pPos, intRemain, str);
        if(intStrLen == -1)
        {
            return -1;
        }
        pPos += intStrLen;
        data.push_back(str);
    }
    return static_cast<int>(pPos - pBuf);
}
}