muduo采用的是one Loop per Thread 的模式,即每个线程中有一个主循环,再配合着IO多路复用,即可实现在一个线程中同时监听多个fd。主要利用EventLoop{.h/.cc} Channel{.h/.cc} Epoll{.h/.cc}来实现Reactor模式。
回顾Reactor事件处理模式
Reactor模式是了解高性能IO的最基础模式,其主要的思想是,将需要处理的IO事件注册到一个IO多路复用的fd上,让单线程/单进程来监听这个fd,如果fd上监听的事件被触发则立刻调用不同事件的处理器(回调函数)进行处理。
在c++11中,可以简单使用function/bind来实现回调关系,使得编码更加简洁。
EventLoop类具体分析
class EventLoop{
public:
EventLoop();
~EventLoop();
void loop();
void assertInLoopThread(){
if(!isInLoopThread()){
abortNotInLoopThread();
}
}
EventLoop* getEventLoopOfCurrentThread();
bool isInLoopThread() const { return threadId_ ==static_cast<pid_t>(pthread_self());}
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
void quit();
private:
typedef std::vector<Channel*> ChannelList;
void abortNotInLoopThread();
bool loop_;
const pid_t threadId_;
std::unique_ptr<Epoll> poller_;
bool quit_;
ChannelList activeChannels_;
};
EventLoop中最重要的loop函数,里面有一个循环调用poller->poll(),每次都将activesChannels_传入poll函数来获得在本次循环中触发事件的channel,然后更具事件的不同种类(新连接,可读,可写)来分别调用不同的触发器。
void EventLoop::loop(){
...
while (!quit_)//one loop per thread指的就是这个loop
{
activeChannels_.clear();
//pollreturntime = poller_->poll(kPollTimes,&activeChannels_);//这里调用poll看有无发生事件,如果有则放到了activechannels_
poller_->poll(kPollTimes,&activeChannels_);//先不用Timestamp
for(ChannelList::iterator it=activeChannels_.begin();it!=activeChannels_.end();++it){
(*it)->handleEvent();//进行处理事件
}
}
...
}
Channel类具体分析
Channel类其实是fd的一个抽象,即每个Channel对应一个fd及对其操作。
class Channel : noncopyable{
public:
typedef function<void()> EventCallback;
Channel(EventLoop* loop,int fd);
void handleEvent(); //根据revents的值调用不同的用户调用
void setReadCallback(const EventCallback& cb){
readcallback_=cb;
}
void setWriteCallback(const EventCallback& cb){
writecallback_=cb;
}
void setErrorCallback(const EventCallback& cb){
errorcallback_=cb;
}
int fd() const{ return fd_; }
int events() const { return events_; }
void set_revents(int revt) { revents_=revt; }
bool isNoneEvent() const { return events_ == kNoneEvent; }
void enableReading() {events_ |= kReadEvent; update(); }
void enableWriting() {events_ |= kWriteEvent; update(); }
void disableWriting() {events_ &= ~kWriteEvent; update(); }
void diableAll() {events_ = kNoneEvent; update(); }
int index() const { return index_; }
void set_index(int index) { index_ = index; }
EventLoop* ownerLoop() const { return loop_; }
private:
void update();//用于更改完events后的更新操作
static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;
EventLoop* loop_;//每个channel对应一个EventLoop
const int fd_;//每个channel对应一个fd
int events_; //fd关心的io事件
int revents_;//目前活动的事件
int index_; //kAdded(已经监听),kNew(新建),kDeleted(已经被删除),用于体现当前fd的状态
EventCallback readcallback_;
EventCallback writecallback_;
EventCallback errorcallback_;
};
当事件分发器分发好不同的触发事件种类后,就要调用Channel::handleEvent()来处理。readcallback_,writecallback_,errorcallback_都是之前通过setReadCallback(),setWriteCallback(),setErrorCallback()注册过的,这里即可体现出Reactor事件处理模式的核心思想。
void Channel::handleEvent(){
if(revents_ & EPOLLERR) {
if(errorcallback_) errorcallback_();
}
if(revents_ & (EPOLLIN | EPOLLPRI | EPOLLRDHUP)){
if(readcallback_) readcallback_();
}
if(revents_ & EPOLLOUT){
if(writecallback_) writecallback_();
}
}
Epoll类具体分析
class Epoll : noncopyable{
public:
typedef std::vector<Channel*> ChannelList;
Epoll(EventLoop* loop);
~Epoll();
void poll(int timeoutMs,ChannelList* activeChannel);//先不返回Timestamp
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
void assertInLoopThread() { ownerLoop_->assertInLoopThread(); }
private:
static const int kInitEventsize = 20;
void fillActiveChannels(int numEvents,ChannelList* activeChannels) const;
void update(int operation, Channel* channel);
typedef std::vector<struct epoll_event> EventList;
typedef std::map<int,Channel*> ChannelMap;
EventLoop* ownerLoop_;
int epollfd_;
EventList events_;//vector<epoll_event>监听的事件
ChannelMap channels_;//map<fd,fd所对应的channel*>
};
Epoll类是对epoll处理的抽象类,正确的epoll使用方法是epoll_create -> epoll_ctl -> epoll_wait,分别在Epoll::Epoll(),Epoll::update(),Epoll::poll()中进行处理,这里对于epoll的原理不再概述。
参考:muduo源码,Linux多线程服务器编程