Reactor反应堆设计模式是高效的I/O设计中常用的模式之一,它是以同步I/O方式来处理I/O事件的。目前常见的开源网络事件库libevent、libev、libuv、muduo都用到了Reactor反应堆设计模式。在服务器开发中,也有不少人基于Reactor反应堆设计模式来封装自己的网络库。下面是用C++实现Reator模式实现的回射服务器
EventHandler.h
#ifndef EVENT_HANDLER_H_
#define EVENT_HANDLER_H_
typedef int Handler;
class EventHandler
{
public:
EventHandler() {}
virtual ~EventHandler() {}
virtual Handler getHandler() = 0;
virtual void handleRead() = 0;
virtual void handleWirte() = 0;
virtual void handleError() = 0;
};
#endif // !EVENT_HANDLER_H_
Poller.h
#ifndef POLLER_H_H_
#define POLLER_H_H_
#include <map>
#include "EventHandler.h"
class Poller
{
public:
Poller() {}
virtual ~Poller() {}
virtual int waitEvent(std::map<Handler, EventHandler*>& handlers, int timeout = 0) = 0;
virtual int regist(Handler handler) = 0;
virtual int remove(Handler handler) = 0;
private:
};
#endif // !POLLER_H_H_
Dispatcher.h
#ifndef DISPATHER_H_H_
#define DISPATHER_H_H_
#include "EventHandler.h"
#include "Poller.h"
#include <map>
class Dispatcher
{
public:
static Dispatcher* getInstance() { return instance; }
int registHandler(EventHandler* handler);
void removeHander(EventHandler* handler);
void dispatchEvent(int timeout = 0);
private:
//单例模式
Dispatcher();
~Dispatcher();
//只声明不实现,用于禁止拷贝构造和赋值构造
Dispatcher(const Dispatcher&);
Dispatcher& operator=(const Dispatcher&);
private:
Poller* _poller;
std::map<Handler, EventHandler*> _handlers;
private:
static Dispatcher* instance;
};
#endif // !DISPATHER_H_H_
Dispatcher.cpp
#include "Dispatcher.h"
#include "EpollPoller.h"
#include <assert.h>
Dispatcher* Dispatcher::instance = new Dispatcher();
Dispatcher::Dispatcher()
{
_poller = new (std::nothrow)epollPoller();
assert(NULL != _poller);
}
Dispatcher::~Dispatcher()
{
std::map<Handler, EventHandler*>::iterator iter = _handlers.begin();
for (; iter != _handlers.end(); ++iter) {
delete iter->second;
}
if (_poller)
delete _poller;
}
int Dispatcher::registHandler(EventHandler* handler)
{
Handler h = handler->getHandler();
if (_handlers.end() == _handlers.find(h)){
_handlers.insert(std::make_pair(h, handler));
}
return _poller->regist(h);
}
void Dispatcher::removeHander(EventHandler* handler)
{
Handler h = handler->getHandler();
std::map<Handler, EventHandler*>::iterator iter = _handlers.find(h);
if (iter != _handlers.end())
{
delete iter->second;
_handlers.erase(iter);
}
_poller->remove(h);
}
void Dispatcher::dispatchEvent(int timeout)
{
_poller->waitEvent(_handlers, timeout);
}
ListenHandler.h
#ifndef LISTEN_HANDLER_H_
#define LISTEN_HANDLER_H_
#include "EventHandler.h"
class ListenHandler : public EventHandler
{
public:
ListenHandler(Handler fd);
~ListenHandler();
Handler getHandler() { return listenFd; }
void handleRead();
void handleWirte();
void handleError();
private:
Handler listenFd;
};
#endif // !LISTEN_HANDLER_H_
ListenHandler.cpp
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <assert.h>
#include <iostream>
#include "Dispatcher.h"
#include "ClientHandler.h"
#include "ListenHandler.h"
ListenHandler::ListenHandler(Handler fd) : listenFd(fd)
{
}
ListenHandler::~ListenHandler()
{
close(listenFd);
}
void ListenHandler::handleRead()
{
struct sockaddr_in peeraddr;
socklen_t peerlen = sizeof(sockaddr_in);
int connFd = ::accept(listenFd, (sockaddr*)&peeraddr, &peerlen);
if (-1 == connFd)
{
return;
}
std::cout << "ip=" << inet_ntoa(peeraddr.sin_addr) <<
" port=" << ntohs(peeraddr.sin_port) << " fd:" << connFd << std::endl;
EventHandler* h = new(std::nothrow) ClientHandler(connFd);
assert(NULL != h);
Dispatcher::getInstance()->registHandler(h);
}
void ListenHandler::handleWirte()
{
//nothing todo
}
void ListenHandler::handleError()
{
//nothing todo
}
ClientHandler.h
#ifndef CLIENT_HANDLER_H_
#define CLIENT_HANDLER_H_
#include "EventHandler.h"
class ClientHandler : public EventHandler
{
public:
ClientHandler(Handler fd);
~ClientHandler();
Handler getHandler() { return clientFd; }
virtual void handleRead();
virtual void handleWirte();
virtual void handleError();
private:
Handler clientFd;
char revBuff[1024];
};
#endif // !CLIENT_HANDLER_H_
ClientHandler.cpp
#include <unistd.h>
#include <sys/socket.h>
#include <stdio.h>
#include <assert.h>
#include <iostream>
#include<string.h>
#include "ClientHandler.h"
ClientHandler::ClientHandler(Handler fd) : clientFd(fd)
{
memset(revBuff, 0, sizeof(revBuff));
}
ClientHandler::~ClientHandler()
{
close(clientFd);
}
void ClientHandler::handleRead()
{
if (read(clientFd, revBuff, sizeof(revBuff)))
{
std::cout <<"recv client:"<<clientFd<<":"<< revBuff << std::endl;
write(clientFd, revBuff, strlen(revBuff));
memset(revBuff, 0, sizeof(revBuff));
}
else
{
std::cout << "client close fd:" << clientFd << std::endl;
close(clientFd);
}
}
void ClientHandler::handleWirte()
{
//nothing todo
}
void ClientHandler::handleError()
{
//nothing todo
std::cout << "client close:" << clientFd << std::endl;
}
EpollPoller.h
#ifndef EPOLL_POLLER_H_
#define EPOLL_POLLER_H_
#include <unistd.h>
#include <sys/epoll.h>
#include <vector>
#include "Poller.h"
class epollPoller : public Poller
{
public:
epollPoller();
~epollPoller();
int waitEvent(std::map<Handler, EventHandler*>& handlers, int timeout = 0);
int regist(Handler handler);
int remove(Handler handler);
private:
typedef std::vector<struct epoll_event> EventList;
EventList _events;
int epollFd;
};
#endif // !EPOLL_POLLER_H_
EpollPoller.cpp
#include <iostream>
#include <errno.h>
#include <assert.h>
#include "EpollPoller.h"
typedef std::vector<struct epoll_event> EventList;
epollPoller::epollPoller() :
_events(16),
epollFd(::epoll_create(100000))
{
}
epollPoller::~epollPoller()
{
close(epollFd);
}
int epollPoller::waitEvent(std::map<Handler, EventHandler*>& handlers, int timeout)
{
std::cout << "start wait"<< std::endl;
int nready = epoll_wait(epollFd, &_events[0], static_cast<int>(_events.size()), -1);
if (-1 == nready)
{
std::cout << "WARNING: epoll_wait error " << errno << std::endl;
return -1;
}
for (int i = 0; i < nready; i++)
{
Handler handle = _events[i].data.fd;
if ((EPOLLHUP | EPOLLERR) & _events[i].events) {
assert(handlers[handle] != NULL);
(handlers[handle])->handleError();
}
else
{
if ((EPOLLIN)& _events[i].events) {
assert(handlers[handle] != NULL);
(handlers[handle])->handleRead();
}
if (EPOLLOUT & _events[i].events) {
(handlers[handle])->handleWirte();
}
}
}
if (static_cast<size_t>(nready) == _events.size())
{
_events.resize(_events.size() * 2);
}
return nready;
}
int epollPoller::regist(Handler handler)
{
struct epoll_event ev;
ev.data.fd = handler;
ev.events |= EPOLLIN;
ev.events |= EPOLLET;
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, handler, &ev) != 0) {
if (errno == ENOENT) {
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, handler, &ev) != 0) {
std::cout << "epoll_ctl add error " << errno << std::endl;
return -1;
}
}
}
return 0;
}
int epollPoller::remove(Handler handler)
{
struct epoll_event ev;
if (epoll_ctl(epollFd, EPOLL_CTL_DEL, handler, &ev) != 0) {
std::cout << "epoll_ctl del error" << errno << std::endl;
return -1;
}
return 0;
}
main.cpp
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <iostream>
#include <errno.h>
#include "Dispatcher.h"
#include "ListenHandler.h"
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
int main() {
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd < 0)
ERR_EXIT("socket error");
int fd_flags = ::fcntl(listenfd, F_GETFL, 0);
fd_flags |= FD_CLOEXEC;
fd_flags |= O_NONBLOCK;
fcntl(listenfd, F_SETFD, fd_flags);
struct sockaddr_in seraddr;
seraddr.sin_family = AF_INET;
seraddr.sin_port = htons(5188);
seraddr.sin_addr.s_addr = htonl(INADDR_ANY);
int on = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
ERR_EXIT("setsockopt");
if (bind(listenfd, (struct sockaddr*)&seraddr, sizeof(seraddr)) < 0)
ERR_EXIT("bind");
if (listen(listenfd, 5) < 0)
ERR_EXIT("listen");
EventHandler* handler = new ListenHandler(listenfd);
Dispatcher::getInstance()->registHandler(handler);
while (true) {
Dispatcher::getInstance()->dispatchEvent(1000);
}
return 0;
}
这个程序只是一个简单例子,同步处理I/O,效率不是很高。如果实际开发中,还有很多可以完善和优化的部分,比如自己实现I/O的接受和发送缓存区,接收到的数据首先放到接收缓冲区进行解析来处理粘包等问题,对I/O事件发送丢到线程池处理读和写的异步处理方式。比如文件描述符是一个递增的整数,可以用数组存储,下标为文件描述符,这样能提高检索效率。一般服务器会有一个事件线程,从I/O中接收的数据,封装成事件投递到事件线程去处理。为了充分利用多核引入多线程或者多进程,比如Nginx就是多线程方式实现的I/O模式,memcached是多线程的I/O模式。