muduo 对虚基类接口,类似如下定义
class ICallBcak {
public:
virtual ConnectCb ( );
virtual ReadCb();
}
很是反感,认为虚基类,是上了贼船,如果继承则必须都实现,工作量较大,且不优雅,,很重不够轻量级
抛弃 虚接口 定义,而是使用 boost::bind( &SomeFunClass::Fun1, FunClassPtr, _1, _2, _3 ) 方式实现,,很简洁优雅,没有继承多态的顾虑
muduo 源码中大量使用 boost::bind 替代虚函数 处理回调接口。外部调用者只需要按照回调函数的接口定义规则实现回调接口,传入muduo的导出功能类内部即可。无需考虑继承某些回调接口,导致工作量较大
1. eventloop 启动后 关注两个重要 fd ,wakeupfd_ 和 timerfd_
前者 负责 在eventloop循环中被叫醒,
后者 负责TimerQueue内
这两个fd 都分别被 Channel 封装
故 只要eventloop启动,至少关联两个事件 timerfd 和 wakeupfd
2. Channel 封装 fd_ 句柄 和对应的 event 事件,以及 读、写、关闭等 cb 函数
3. eventloop 内 聚合 epoll IO复用对象
boost::scoped_ptr<Poller> poller_; 这个 poller 在eventloop构造函数中创建,负责对上述 timerfd和wakeupfd进行监视,以及对当前eventloop以后加入的需要关注的事件对象进行监视和IO处理。
5 . 自己打算使用muduo的tcpserver的话,假设我们的服务类叫: CSomeTcpServer
只需要包含 muduo::TcpServer,而不是继承。
CSomeTcpServer 需要传入 主线程 的 EventLoop mainLoop
相当与在主线程中实现一个消息循环,类似windows的GetMessage
5.1 主线程 使用 Acceptor 来实现服务器套接字创建,bind,listen
5.2 Acceptor 的 listen 操作 并没有在TcpServer 的start内直接调用,而是多了一步 runInLoop 不清楚为什么不直接调用。
RunInLoop 内会判断,如果在当前线程则可直接调用,如果是跨线程调用则,QueueInLoop 把 要执行的函数放入 EventLoop 的pendingFunctors内执行(当然了,加入未觉队列,是要加锁),且判断 如果不在当前loop线程 或者 已经正在执行 pending 则wakeupfd通知一下,让eventloop内的 epoll_wait 及时返回,开始执行 pendingFunctor
此时,TcpServer 的 start 接口启动主线程侦听功能,而runInLoop则会判断是否当前线程是否根loop所在线程属于同一个线程,如果是则在当前线程中调用,目前看来一定是,所以在当前线程中执行Acceptor::listen
void TcpServer::start() {
if (started_.getAndSet(1) == 0) {
threadPool_->start(threadInitCallback_);
assert(!acceptor_->listenning());
loop_->runInLoop(boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}
5.3 acceptor 会在TcpServer构造函数内,设置新连接到来的回调函数
acceptor_->setNewConnectionCallback(boost::bind(&TcpServer::newConnection, this, _1, _2));
5.4 TcpServer 在主线程中被 EventLoop 卡住,而 EventLoop 中最初关注的事件至少三个,
wakeupfd,timerfd(TimeQueue),还有侦听listen socket 任何一个有事件出发,eventloop 都会返回.
IO eventloop 工作线程,最少关注事件对象两个: wakeupfd, timerfd
5.5 void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) 在主线程中,被触发调用。
因为主线程序 使用eventloop 关注了listensocket的读事件,有连接发生就会触发
void TcpServer::newConnection
进而 触发 TcpServer的线程池选择工作线程逻辑,Round robin找到一个io工作线程的ioLoop
然后执行 ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
要求在io工作线程内执行 connectEstablished
6. 每接受一个新的tcp连接,实例化一个新的TcpConnection,这个新连接对象通过RoundRobin发送给一个工作线程。
注意: 每个tcpserver内都有一个 EventThreadPool,实现的线程池,每个连接,会被派发到一个eventloop内,也就是一个eventloop会包含多个tcpconnection,而一个tcpconnection 只对应一个 eventloop。
每个 TcpConnection 会设置回调函数 包括(连接回调,接受消息回调,写数据回调,关闭连接回调)。
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
ioLoop 是TcpServer通过round robin选取的一个eventloop线程中的 eventloop 对象指针。
通过ioLoop来让connectEstablished 在属于他的IO线程中执行。
queueInLoop(Functor& cb),将cb放入队列,并在必要时唤醒IO线程。有两种情况需要唤醒IO线程:
1 调用 queueInLoop() 的线程不是IO线程;
2 调用 queueInLoop() 的线程是IO线程,而此时正在调用pengding functor。
当一个fd想要注册可读事件时,首先通过
Channel::enableReading()->
Channel::update(this)->
EventLoop::updateChannel(Channel)->
Poller::updateChannel(Channel*)
调用链向poll系统调用的侦听事件表注册或者修改注册事件。
关键类
EventLoop
Channel
Poller
TcpConnection
TcpClient
TcpServer
Connector
Acceptor
EventLoopThread
EventLoopThreadPool
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
TcpConnection抽象一个TCP连接,无论是客户端还是服务器只要建立了网络连接就会使用TcpConnection;
Connector/Acceptor分别包装TCP客户端和服务器的建立连接/接受连接;
EventLoop是一个主控类,是一个事件发生器,它驱动Poller产生/发现事件,然后将事件派发到Channel处理;
EventLoopThread是一个带有EventLoop的线程;EventLoopThreadPool自然是一个EventLoopThread的资源池,维护一堆EventLoopThread。
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////服务器接收连接
服务器接收连接的实现在一个网络库中比较重要。muduo中通过Acceptor类来接收连接。在TcpClient中,其Connector通过一个关心Channel可写的事件来通过连接已建立;在Acceptor中则是通过一个Channel可读的事件来表示有新的连接到来:Acceptor::Acceptor(....) {
...
acceptChannel_.setReadCallback(
boost::bind(&Acceptor::handleRead, this));
...
}
void Acceptor::handleRead()
{
...
int connfd = acceptSocket_.accept(&peerAddr); // 接收连接获得一个新的socket
if (connfd >= 0)
{
...
newConnectionCallback_(connfd, peerAddr); // 回调到TcpServer::newConnection
TcpServer::newConnection中建立一个TcpConnection,并将其附加到一个EventLoopThread中,简单来说就是给其配置一个线程:
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
...
EventLoop* ioLoop = threadPool_->getNextLoop();
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
connections_[connName] = conn;
...
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
IO的驱动
之前提到,一旦要关心某IO事件了,就调用Channel::enableXXX,这个如何实现的呢?
class Channel {
...
void enableReading() { events_ |= kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void Channel::update()
{
loop_->updateChannel(this);
}
void EventLoop::updateChannel(Channel* channel)
{
...
poller_->updateChannel(channel);
}
最终调用到Poller::upateChannel。muduo中有两个Poller的实现,分别是Poll和EPoll,可以选择简单的Poll来看:
void PollPoller::updateChannel(Channel* channel)
{
...
if (channel->index() < 0)
{
// a new one, add to pollfds_
assert(channels_.find(channel->fd()) == channels_.end());
struct pollfd pfd;
pfd.fd = channel->fd();
pfd.events = static_cast<short>(channel->events()); // 也就是Channel::enableXXX操作的那个events_
pfd.revents = 0;
pollfds_.push_back(pfd); // 加入一个新的pollfd
int idx = static_cast<int>(pollfds_.size())-1;
channel->set_index(idx);
channels_[pfd.fd] = channel;
可见Poller就是把Channel关心的IO事件转换为OS提供的IO模型数据结构上。通过查看关键的pollfds_的使用,可以发现其主要是在Poller::poll接口里。这个接口会在EventLoop的主循环中不断调用:
void EventLoop::loop()
{
...
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
...
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_); // 获得IO事件,通知各注册回调
}
整个流程可总结为:各Channel内部会把自己关心的事件告诉给Poller,Poller由EventLoop驱动检测IO,然后返回哪些Channel发生了事件,EventLoop再驱动这些Channel调用各注册回调。
从这个过程中可以看出,EventLoop就是一个事件产生器。
线程模型
在muduo的服务器中,muduo的线程模型是怎样的呢?它如何通过线程来支撑高并发呢?其实很简单,它为每一个线程配置了一个EventLoop,这个线程同时被附加了若干个网络连接,这个EventLoop服务于这些网络连接,为这些连接收集并派发IO事件。
回到TcpServer::newConnection中:
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
...
EventLoop* ioLoop = threadPool_->getNextLoop();
...
TcpConnectionPtr conn(new TcpConnection(ioLoop, // 使用这个选择到的线程中的EventLoop
connName,
sockfd,
localAddr,
peerAddr));
...
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
注意TcpConnection::connectEstablished是如何通过Channel注册关心的IO事件到ioLoop的。
极端来说,muduo的每一个连接线程可以只为一个网络连接服务,这就有点类似于thread per connection模型了。
具体关键调用逻辑
建立连接
TcpClient::connect-> Connector::start
-> EventLoop::runInLoop(Connector::startInLoop...
-> Connector::connect
EventLoop::runInLoop接口用于在this所在的线程运行某个函数,这个后面看下EventLoop的实现就可以了解。 网络连接的最终建立是在Connector::connect中实现,建立连接之后会创建一个Channel来代表这个socket,并且绑定事件监听接口。最后最重要的是,调用Channel::enableWriting。Channel有一系列的enableXX接口,这些接口用于标识自己关心某IO事件。后面会看到他们的实现。
Connector监听的主要事件无非就是连接已建立,用它监听读数据/写数据事件也不符合设计。TcpConnection才是做这种事的。
客户端收发数据
当Connector发现连接真正建立好后,会回调到TcpClient::newConnection,在TcpClient构造函数中:
connector_->setNewConnectionCallback(
boost::bind(&TcpClient::newConnection, this, _1));
TcpClient::newConnection中创建一个TcpConnection来代表这个连接:
TcpConnectionPtr conn(new TcpConnection(loop_,
connName,
sockfd,
localAddr,
peerAddr));
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
...
conn->connectEstablished();
并同时设置事件回调,以上设置的回调都是应用层(即库的使用者)的接口。每一个TcpConnection都有一个Channel,毕竟每一个网络连接都对应了一个socket fd。在TcpConnection构造函数中创建了一个Channel,并设置事件回调函数。
TcpConnection::connectEstablished函数最主要的是通知Channel自己开始关心IO读取事件:
void TcpConnection::connectEstablished()
{
...
channel_->enableReading();
这是自此我们看到的第二个Channel::enableXXX接口,这些接口是如何实现关心IO事件的呢?这个后面讲到。
muduo的数据发送都是通过TcpConnection::send完成,这个就是一般网络库中在不使用OS的异步IO情况下的实现:缓存应用层传递过来的数据,在IO设备可写的情况下尽量写入数据。这个主要实现在TcpConnection::sendInLoop中。
TcpConnection::sendInLoop(....) {
...
// if no thing in output queue, try writing directly
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) // 设备可写且没有缓存时立即写入
{
nwrote = sockets::write(channel_->fd(), data, len);
}
...
// 否则加入数据到缓存,等待IO可写时再写
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting())
{
// 注册关心IO写事件,Poller就会对写做检测
channel_->enableWriting();
}
...
}
当IO可写时,Channel就会回调TcpConnection::handleWrite(构造函数中注册)
void TcpConnection::handleWrite()
{
...
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
服务器端的数据收发同客户端机制一致,不同的是连接(TcpConnection)的建立方式不同。