整个TCP网络部分代码全被封装在TcpIOThread中。
说明:
1. 通过将NetWork 指针传递进来将此部分与整个业务逻辑关联起来。
2. Acceptor和Read联系在一起,Accept之后添加到Read的EventLoop(EventLoop下一篇详细说明)中进行处理。
3. Write和Accept、Read逻辑不相关联,是独立的模块,启动的每一个Write都有自己的EventLoop,并通过此EventLoop。不会用同一个socket即发消息也收消息(当成平等的对端,发送消息建立连接,接收消息accept).不知道我说明白了么。感觉自己好啰嗦
class TcpIOThread
{
public:
TcpIOThread(NetWork * poNetWork);
~TcpIOThread();
int Init(const std::string & sListenIp, const int iListenPort, const int iIOThreadCount);
void Start();
void Stop();
int AddMessage(const int iGroupIdx, const std::string & sIP, const int iPort, const std::string & sMessage);
private:
NetWork * m_poNetWork; //使TCP模块与整个业务逻辑挂钩,初始化时传递进来
TcpAcceptor m_oTcpAcceptor; //接收,添加事件循环
std::vector<TcpRead *> m_vecTcpRead; //数据接收,初始化时传入NetWork
std::vector<TcpWrite *> m_vecTcpWrite; //数据发送,初始化时传入NetWork
bool m_bIsStarted;
};
TcpIOThread :: TcpIOThread(NetWork * poNetWork)
: m_poNetWork(poNetWork)
{
m_bIsStarted = false;
assert(signal(SIGPIPE, SIG_IGN) != SIG_ERR);
assert(signal(SIGALRM, SIG_IGN) != SIG_ERR);
assert(signal(SIGCHLD, SIG_IGN) != SIG_ERR);
}
TcpIOThread :: ~TcpIOThread()
{
for (auto & poTcpRead : m_vecTcpRead)
{
delete poTcpRead;
}
for (auto & poTcpWrite : m_vecTcpWrite)
{
delete poTcpWrite;
}
}
void TcpIOThread :: Stop()
{
if (m_bIsStarted)
{
m_oTcpAcceptor.Stop();
for (auto & poTcpRead : m_vecTcpRead)
{
poTcpRead->Stop();
}
for (auto & poTcpWrite : m_vecTcpWrite)
{
poTcpWrite->Stop();
}
}
PLHead("TcpIOThread [END]");
}
int TcpIOThread :: Init(const std::string & sListenIp, const int iListenPort, const int iIOThreadCount)
{
//创建TCP read/write,每一个read/write都是单独一个线程。
for (int i = 0; i < iIOThreadCount; i++)
{
TcpRead * poTcpRead = new TcpRead(m_poNetWork);
assert(poTcpRead != nullptr);
m_vecTcpRead.push_back(poTcpRead);
//此处将Read和Acceptor联系起来
m_oTcpAcceptor.AddEventLoop(poTcpRead->GetEventLoop());
TcpWrite * poTcpWrite = new TcpWrite(m_poNetWork);
assert(poTcpWrite != nullptr);
m_vecTcpWrite.push_back(poTcpWrite);
}
//监听
m_oTcpAcceptor.Listen(sListenIp, iListenPort);
int ret = -1;
//初始化read/write
for (auto & poTcpRead : m_vecTcpRead)
{
ret = poTcpRead->Init();
if (ret != 0)
{
return ret;
}
}
for (auto & poTcpWrite: m_vecTcpWrite)
{
ret = poTcpWrite->Init();
if (ret != 0)
{
return ret;
}
}
return 0;
}
void TcpIOThread :: Start()
{
//启动所有网络相关模块
m_oTcpAcceptor.start();
for (auto & poTcpWrite : m_vecTcpWrite)
{
poTcpWrite->start();
}
for (auto & poTcpRead : m_vecTcpRead)
{
poTcpRead->start();
}
m_bIsStarted = true;
}
//发送消息到指定机器,选择一个TCPWrite发送相关数据。
int TcpIOThread :: AddMessage(const int iGroupIdx, const std::string & sIP, const int iPort, const std::string & sMessage)
{
int iIndex = iGroupIdx % (int)m_vecTcpWrite.size();
return m_vecTcpWrite[iIndex]->AddMessage(sIP, iPort, sMessage);
}
}
这个代码就是对其私有成员进行初始化并启动的过程。接收部分在read中因为传递NetWork指针进去了,所以直接在里面就会将信息投递到整个处理逻辑队列中,所以此处看不到相关的接收处理逻辑
.
下面是一个我手画的图,凑活着看吧