muduo库中的A线程通知B线程

线程间的等到通知

pipe
socketpair
eventfd
条件变量

前三个都有文件描述符,都可以方便的利用I/O复用来管理,而条件变量没有

mduo库的线程唤醒利用的eventfd,eventfd 是一个比 pipe 更高效的线程间事件通知机制,一方面它比 pipe 少用一个 file descripor,节省了资源;另一方面,eventfd 的缓冲区管理也简单得多,全部“buffer” 只有定长8 bytes,不像 pipe 那样可能有不定长的真正 buffer。

  //用于eventfd,用于保存eventfd创建的文件描述符
  int wakeupFd_;
  // unlike in TimerQueue, which is an internal class,
  // we don't expose Channel to client.
  //wakeupFd_所对应的通道,该通道将会纳入poller_来管理
  boost::scoped_ptr<Channel> wakeupChannel_;//EventLoop只负责wakeupChannel_的生存期,还有其它的channel
  //<Functor是回调的任务
  std::vector<Functor> pendingFunctors_; 

相关的一些重要的函数

//一个线程唤醒另一个线程
void EventLoop::wakeup()
{
  uint64_t one = 1;
  //往wakeupFd_中写入8个字节就可以唤醒一个等待的线程
  ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
  }
}
//在某个线程中执行某个回调函数,该函数可以跨线程使用
void EventLoop::runInLoop(Functor&& cb)
{
  // 如果是当前IO线程调用runInLoop,则同步调用cb
  if (isInLoopThread())
  {
    cb();
  }
  // 如果是其它线程调用runInLoop,则异步地将cb添加到队列,以便eventLoop所对应的I/O线程来执行回调函数
  else
  {
    queueInLoop(std::move(cb));
  }
}
//将任务添加到队列中
void EventLoop::queueInLoop(Functor&& cb)
{
  {
  MutexLockGuard lock(mutex_);
  //添加到一个任务队列中
  pendingFunctors_.push_back(std::move(cb));  // emplace_back
  }
    // 调用queueInLoop的线程不是当前IO线程需要唤醒
	// 或者调用queueInLoop的线程是当前IO线程,并且此时正在调用pending functor,需要唤醒
	// 只有当前IO线程的事件回调中调用queueInLoop才不需要唤醒
  if (!isInLoopThread() || callingPendingFunctors_)
  {
    wakeup();
  }
}
void EventLoop::loop()
{
	//其它线程或者当前IO线程添加的一些回调任务
	//因为IO线程的设计比较灵活,IO线程也能执行一些IO任务
	//I/O不是很繁忙的时候,IO线程就一直处于阻塞的状态,也就是不工作的状态
	//我们就可以添加一些计算任务,让它来处理
    doPendingFunctors();
}
void EventLoop::doPendingFunctors()
{
  std::vector<Functor> functors;
  callingPendingFunctors_ = true;

  {
  MutexLockGuard lock(mutex_);
  //pendingFunctors_中的回调任务都放到了functors中
  functors.swap(pendingFunctors_);
  }
  //免得另有其它线程往pendingFunctors_中添加任务,所以处理一个确定的functors任务
  for (size_t i = 0; i < functors.size(); ++i)
  {
    functors[i]();
  }
  callingPendingFunctors_ = false;
}

这样在不用锁的情况下,也能够保证线程安全,实现线程安全的异步调用了。

下面就是结合了定时器的使用

//增加一个定时器,参数是定时器的回调函数、超时时间、间隔时间
//返回一个TimerId对象
//线程安全的,其它线程也可以调用addTimer,此时就会把&TimerQueue::addTimerInLoop这个任务
//交给loop_所对应的IO线程来处理
TimerId TimerQueue::addTimer(const TimerCallback& cb,
                             Timestamp when,
                             double interval)
{
  //构造一个定时器对象,返回一个地址
  Timer* timer = new Timer(cb, when, interval);
  loop_->runInLoop(
      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
  return TimerId(timer, timer->sequence());
}


调用queueInLoop的线程不是当前IO线程(这里的当前线程是指EventLoop所指的线程)需要唤醒,比如A线程要往B线程添加一个任务,添加到线程B的任务队列中,要让B能够执行任务,需要唤醒B,以便让B执行,B是IO线程,它处于loop中(pool),唤醒它以便能够执行到doPendingFunctors

或者调用queueInLoop的线程是当前IO线程,并且此时正在调用pendingfunctor,需要唤醒。这种情况就是在doPendingFunctors中又调用了queueInLoop,否则的话pendingfunctor都执行完成之后,,回来的时候又调用了一个queueInLoop,这是就没有办法及时处理了。

只有IO线程的事件回调中调用queueInLoop才不需要唤醒,如果是在handleEvent中调用了queueInLoop,此时不需要唤醒,因为handleEvent完了之后就会doPendingFunctors。

有了runInLoop这个函数,我们就可以添加一些任务到IO线程,让IO线程来执行,如果调用runInLoop的线程不是当前线程,就把这个任务添加到队列中,如果是当前线程,直接调用cb。

EventLoopThread

任何一个线程,只要创建并运行了EventLoop,都称之为IO线程

IO线程不一定是主线程

muduo并发模型one loop per thread + threadpool,也就是说一个IO线程有且只有一个EventLoop,而一个程序可以有多个IO线程,这些线程可以用IO线程池来管理,而threadpool是计算线程池,主要用于执行计算任务。IO线程主要处理IO事件,也可以执行一些计算任务,调用EventLoop中的runInLoop来执行计算任务。

为了方便今后使用,定义了EventLoopThread类,该类封装了IO线程
    EventLoopThread创建了一个线程
    在线程函数中创建了一个EvenLoop对象并调用EventLoop::loop

举例

#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

void runInThread()
{
  printf("runInThread(): pid = %d, tid = %d\n",
         getpid(), CurrentThread::tid());
}

int main()
{
  //主线程
  printf("main(): pid = %d, tid = %d\n",
         getpid(), CurrentThread::tid());

  //构造一个IO线程对象,此时还未启动
  EventLoopThread loopThread;
  //此时才是创建一个IO线程
  EventLoop* loop = loopThread.startLoop();
  // 异步调用runInThread,即将runInThread添加到loop对象所在IO线程,让该IO线程执行
  //loop对象所在的线程与主线程是不一样的
  //不在loop所在的线程中调用runInLoop,而是在主线程中调用
  //异步调用会把回调函数放到队列当中,然后唤醒IO线程的poll,接下来就去执行这个函数
  loop->runInLoop(runInThread);
  sleep(1);
  // runAfter内部也调用了runInLoop,所以这里也是异步调用
  //loop对象所在的IO线程调用runInThread 
  loop->runAfter(2, runInThread);
  sleep(3);
  loop->quit();

  printf("exit main().\n");
}

猜你喜欢

转载自blog.csdn.net/wk_bjut_edu_cn/article/details/80884993