Muduo网络库简介
muduo 是一个基于 Reactor 模式的现代 C++ 网络库,作者陈硕。它采用非阻塞 IO 模型,基于事件驱动和回调,原生支持多核多线程,适合编写 Linux 服务端多线程网络应用程序。
muduo网络库的核心代码只有数千行,在网络编程技术学习的进阶阶段,muduo是一个非常值得学习的开源库。目前我也是刚刚开始学习这个网络库的源码,希望将这个学习过程记录下来。这个网络库的源码已经发布在GitHub上,可以点击这里阅读。目前Github上这份源码已经被作者用c++11重写,我学习的版本是没有使用c++11版本的。不过二者大同小异,核心思想是没有变化的。点这里可以看我的源代码。从笔记十七开始记录muduo的net库的实现过程。如果你需要看一下基础库(base)的复现过程,可以点击这里:muduo的base库实现过程。而网络库的笔记在这里:
muduo网络库源码复现笔记(十七):什么都不做的EventLoop
muduo网络库源码复现笔记(十八):Reactor的关键结构
muduo网络库源码复现笔记(十九):TimeQueue定时器
muduo网络库源码复现笔记(二十):EventLoop::runInloop()函数和EventLoopThread类
muduo网络库源码复现笔记(二十一):Acceptor类、InetAddress类、Sockets类、SocketsOps.cc
muduo网络库源码复现笔记(二十二):TcpServer类与TcpConnection初步
muduo网络库源码复现笔记(二十三):TcpConnection断开连接
1 实现多线程服务器
正如前面博客所说的,muduo库中多线程服务器采用的是one loop per thread的方式。如下图所示,我们将拥有Acceptor的I/O线程称作mainReator,当客户端发起连接,由acceptor转发交给subReator与客户端进行连接。subReactor在单独线程中,这样就实现了多线程的服务器。前面我们也讲了使用EventLoopThead类在线程中创建一个EventLoop,结合前面知识,这一节讨论如何使用线程池实现多线程服务器。
2 EventLoopThreadPool类
EventLoopThreadPool顾名思义就是一个EventLoop的线程池,注意它的两个成员变量threads_和loops_,分别是EventLoopThread的指针数组和EventLopp指针数组。baseLoop_就是拥有acceptor的EventLoop,numThreads_是线程池的线程数目。next的含义后面说。
class EventLoopThreadPool : boost::noncopyable
{
public:
typedef boost::function<void(EventLoop*)> ThreadInitCallback;
EventLoopThreadPool(EventLoop* baseLoop);
~EventLoopThreadPool();
void setThreadNum(int numThreads)
{
numThreads_ = numThreads; }
void start(const ThreadInitCallback& cb = ThreadInitCallback());
EventLoop* getNextLoop();
private:
EventLoop* baseLoop_; //same EventLoop as acceptor
bool started_;
int numThreads_;
int next_; //index of the chosen EventLoop when new connection arrives
boost::ptr_vector<EventLoopThread> threads_;
std::vector<EventLoop*> loops_;
};
start函数也好理解,它根据numThreads_的数目一次开启线程创建EventLoop,等待任务执行。而getNextLoop()作用是这样:当客户端发起连接时,mainReactor会使用getNextLoop函数返回一个subReator来与客户端建立连接。getNextLoop的使用机制是轮询(robin-round),依次从线程池拿出一个线程的EventLoop来建立连接。所以前面说的next_就是线程池待拿出线程的下标。当next_达到loops_的size时置零。
EventLoop* EventLoopThreadPool::getNextLoop()
{
baseLoop_ -> assertInLoopThread();
EventLoop* loop = baseLoop_; //avoid if loops_ is empty
//round robin
if(!loops_.empty())
{
loop = loops_[next_];
++next_;
if(implicit_cast<size_t>(next_) >= loops_.size())
{
next_ = 0;
}
}
return loop;
}
3 TcpServer修改
3.1建立连接
newConnection函数要做下面3点修改,首先把原来拥有acceptor的IO线程换成线程池里的线程ioLoop,建立连接时绑定ioLoop,把connectEstablished加入queueInLoop中。
void TcpServer::newConnection(int sockfd,const InetAddress& peerAddr)
{
loop_ -> assertInLoopThread();
EventLoop* ioLoop = threadPool_ -> getNextLoop();
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
ioLoop -> runInLoop(boost::bind(&TcpConnection::connectEstablished,conn));
}
3.2断开连接
断开连接时要把removeConnection函数,分为两个。因为需要在TcpConnection自己的IO线程中调用removeConnection。
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
loop_ -> runInLoop(boost::bind(&TcpServer::removeConnectionInLoop,this,conn));
}
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
loop_ -> assertInLoopThread();
LOG_INFO << "TcpServer::removeConnctionInLoop [" << name_
<< "] - connection " << conn -> name();
LOG_TRACE << " [8] usecount = " << conn.use_count();
size_t n = connections_.erase(conn->name());
LOG_TRACE << " [9] usecount = " << conn.use_count();
(void)n;
assert(n == 1);
EventLoop* ioLoop = conn->getLoop();
ioLoop -> queueInLoop(boost::bind(&TcpConnection::connectDestroyed,conn));
//loop_ -> queueInLoop(
//boost::bind(&TcpConnection::connectDestroyed,conn));
LOG_TRACE << " [10] usecount = " << conn.use_count();
}