线程间的等到通知
pipesocketpair
eventfd
条件变量
前三个都有文件描述符,都可以方便的利用I/O复用来管理,而条件变量没有
mduo库的线程唤醒利用的eventfd,eventfd 是一个比 pipe 更高效的线程间事件通知机制,一方面它比 pipe 少用一个 file descripor,节省了资源;另一方面,eventfd 的缓冲区管理也简单得多,全部“buffer” 只有定长8 bytes,不像 pipe 那样可能有不定长的真正 buffer。
//用于eventfd,用于保存eventfd创建的文件描述符
int wakeupFd_;
// unlike in TimerQueue, which is an internal class,
// we don't expose Channel to client.
//wakeupFd_所对应的通道,该通道将会纳入poller_来管理
boost::scoped_ptr<Channel> wakeupChannel_;//EventLoop只负责wakeupChannel_的生存期,还有其它的channel
//<Functor是回调的任务
std::vector<Functor> pendingFunctors_;
相关的一些重要的函数
//一个线程唤醒另一个线程
void EventLoop::wakeup()
{
uint64_t one = 1;
//往wakeupFd_中写入8个字节就可以唤醒一个等待的线程
ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}
//在某个线程中执行某个回调函数,该函数可以跨线程使用
void EventLoop::runInLoop(Functor&& cb)
{
// 如果是当前IO线程调用runInLoop,则同步调用cb
if (isInLoopThread())
{
cb();
}
// 如果是其它线程调用runInLoop,则异步地将cb添加到队列,以便eventLoop所对应的I/O线程来执行回调函数
else
{
queueInLoop(std::move(cb));
}
}
//将任务添加到队列中
void EventLoop::queueInLoop(Functor&& cb)
{
{
MutexLockGuard lock(mutex_);
//添加到一个任务队列中
pendingFunctors_.push_back(std::move(cb)); // emplace_back
}
// 调用queueInLoop的线程不是当前IO线程需要唤醒
// 或者调用queueInLoop的线程是当前IO线程,并且此时正在调用pending functor,需要唤醒
// 只有当前IO线程的事件回调中调用queueInLoop才不需要唤醒
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
void EventLoop::loop()
{
//其它线程或者当前IO线程添加的一些回调任务
//因为IO线程的设计比较灵活,IO线程也能执行一些IO任务
//I/O不是很繁忙的时候,IO线程就一直处于阻塞的状态,也就是不工作的状态
//我们就可以添加一些计算任务,让它来处理
doPendingFunctors();
}
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
//pendingFunctors_中的回调任务都放到了functors中
functors.swap(pendingFunctors_);
}
//免得另有其它线程往pendingFunctors_中添加任务,所以处理一个确定的functors任务
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
callingPendingFunctors_ = false;
}
这样在不用锁的情况下,也能够保证线程安全,实现线程安全的异步调用了。
下面就是结合了定时器的使用
//增加一个定时器,参数是定时器的回调函数、超时时间、间隔时间
//返回一个TimerId对象
//线程安全的,其它线程也可以调用addTimer,此时就会把&TimerQueue::addTimerInLoop这个任务
//交给loop_所对应的IO线程来处理
TimerId TimerQueue::addTimer(const TimerCallback& cb,
Timestamp when,
double interval)
{
//构造一个定时器对象,返回一个地址
Timer* timer = new Timer(cb, when, interval);
loop_->runInLoop(
boost::bind(&TimerQueue::addTimerInLoop, this, timer));
return TimerId(timer, timer->sequence());
}
调用queueInLoop的线程不是当前IO线程(这里的当前线程是指EventLoop所指的线程)需要唤醒,比如A线程要往B线程添加一个任务,添加到线程B的任务队列中,要让B能够执行任务,需要唤醒B,以便让B执行,B是IO线程,它处于loop中(pool),唤醒它以便能够执行到doPendingFunctors
或者调用queueInLoop的线程是当前IO线程,并且此时正在调用pendingfunctor,需要唤醒。这种情况就是在doPendingFunctors中又调用了queueInLoop,否则的话pendingfunctor都执行完成之后,,回来的时候又调用了一个queueInLoop,这是就没有办法及时处理了。
只有IO线程的事件回调中调用queueInLoop才不需要唤醒,如果是在handleEvent中调用了queueInLoop,此时不需要唤醒,因为handleEvent完了之后就会doPendingFunctors。有了runInLoop这个函数,我们就可以添加一些任务到IO线程,让IO线程来执行,如果调用runInLoop的线程不是当前线程,就把这个任务添加到队列中,如果是当前线程,直接调用cb。
EventLoopThread
任何一个线程,只要创建并运行了EventLoop,都称之为IO线程
IO线程不一定是主线程
muduo并发模型one loop per thread + threadpool,也就是说一个IO线程有且只有一个EventLoop,而一个程序可以有多个IO线程,这些线程可以用IO线程池来管理,而threadpool是计算线程池,主要用于执行计算任务。IO线程主要处理IO事件,也可以执行一些计算任务,调用EventLoop中的runInLoop来执行计算任务。
为了方便今后使用,定义了EventLoopThread类,该类封装了IO线程
EventLoopThread创建了一个线程
在线程函数中创建了一个EvenLoop对象并调用EventLoop::loop
举例
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
void runInThread()
{
printf("runInThread(): pid = %d, tid = %d\n",
getpid(), CurrentThread::tid());
}
int main()
{
//主线程
printf("main(): pid = %d, tid = %d\n",
getpid(), CurrentThread::tid());
//构造一个IO线程对象,此时还未启动
EventLoopThread loopThread;
//此时才是创建一个IO线程
EventLoop* loop = loopThread.startLoop();
// 异步调用runInThread,即将runInThread添加到loop对象所在IO线程,让该IO线程执行
//loop对象所在的线程与主线程是不一样的
//不在loop所在的线程中调用runInLoop,而是在主线程中调用
//异步调用会把回调函数放到队列当中,然后唤醒IO线程的poll,接下来就去执行这个函数
loop->runInLoop(runInThread);
sleep(1);
// runAfter内部也调用了runInLoop,所以这里也是异步调用
//loop对象所在的IO线程调用runInThread
loop->runAfter(2, runInThread);
sleep(3);
loop->quit();
printf("exit main().\n");
}