一:对比select发现epoll的有点
要比较epoll相比较select高效在什么地方,就需要比较二者做相同事情的方法。
要完成对I/O流的复用需要完成如下几个事情:
1.用户态怎么将文件句柄传递到内核态?
2.内核态怎么判断I/O流可读可写?
3.内核怎么通知监控者有I/O流可读可写?
4.监控者如何找到可读可写的I/O流并传递给用户态应用程序?
5.继续循环时监控者怎样重复上述步骤?
搞清楚上述的步骤也就能解开epoll高效的原因了。
select的做法:
步骤1的解法:select创建3个文件描述符集,并将这些文件描述符拷贝到内核中,这里限制了文件句柄的最大的数量为1024(注意是全部传入—第一次拷贝);
步骤2的解法:内核针对读缓冲区和写缓冲区来判断是否可读可写,这个动作和select无关;
步骤3的解法:内核在检测到文件句柄可读/可写时就产生中断通知监控者select,select被内核触发之后,就返回可读可写的文件句柄的总数;
步骤4的解法:select会将之前传递给内核的文件句柄再次从内核传到用户态(第2次拷贝),select返回给用户态的只是可读可写的文件句柄总数,但却并不知道是哪些文件句柄需要IO操作,我们只能使用FD_ISSET宏函数来轮询所有文件句柄来找出它们;
步骤5的解法:select对于事件的监控是建立在内核的修改之上的,也就是说经过一次监控之后,内核会修改位,因此再次监控时需要再次从用户态向内核态进行拷贝(第N次拷贝);
epoll的做法:
步骤1的解法:首先执行epoll_create在内核专属于epoll的高速cache区,并在该缓冲区建立红黑树和就绪链表,用户态传入的文件句柄将被放到红黑树中(第一次拷贝)。
步骤2的解法:内核针对读缓冲区和写缓冲区来判断是否可读可写,这个动作与epoll无关;
步骤3的解法:epoll_ctl执行add动作时除了将文件句柄放到红黑树上之外,还向内核注册了该文件句柄的回调函数,内核在检测到某句柄可读可写时则调用该回调函数,回调函数将文件句柄放到就绪链表。
步骤4的解法:epoll_wait只监控就绪链表就可以,如果就绪链表有文件句柄,则表示该文件句柄可读可写,并返回到用户态(少量的拷贝);
步骤5的解法:由于内核不修改文件句柄的位,因此只需要在第一次传入就可以重复监控,直到使用epoll_ctl删除,否则不需要重新传入,因此无多次拷贝。
简单说:epoll是继承了select/poll的I/O复用的思想,并在二者的基础上从监控IO流、查找I/O事件等角度来提高效率,具体地说就是内核句柄列表、红黑树、就绪list链表来实现的。
二:理解水平触发和边缘触发
当一个socket句柄上有事件时,内核会把该句柄插入上面所说的准备就绪list链表,这时我们调用epoll_wait,会把准备就绪的socket拷贝到用户态内存,然后清空准备就绪list链表, 最后,epoll_wait干了件事,就是检查这些socket,如果不是ET模式(就是LT模式的句柄了),并且这些socket上确实有未处理的事件时,又把该句柄放回到刚刚清空的准备就绪链表了,所以,非ET的句柄,只要它上面还有事件,epoll_wait每次都会返回。而ET模式的句柄,除非有新中断到,即使socket上的事件没有处理完,也是不会次次从epoll_wait返回的。
====>区别就在于epoll_wait将socket返回到用户态时是否情况就绪链表。
二:epoll相关函数
头文件#include <sys/epoll.h>
epoll_create
int epoll_create(int size); //在早期版本中使用链表实现,size标识表的大小,现在用epoll_create1函数代替;
int epoll_create1(int flags);
epoll_create1函数
说明:创建一个epoll的句柄;
@int :调用成功时返回一个epoll句柄描述符,失败时返回-1
@flags:可取0或者EPOLL_CLOEXEC
0:如果这个参数是0,这个函数等价于poll_create(0);
EPOLL_CLOEXEC:这是这个参数唯一的有效值,如果这个参数设置为这个。那么当进程替换映像的时候会关闭这个文件描述符,
epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll_ctl函数
说明:创建一个epoll的句柄;
@int :调用成功时返回0,失败时返回-1errno被设置
@epfd:epoll_create1的返回的文件句柄
@op:操作,EPOLL_CTL_ADD或者EPOLL_CTL_MOD或者EPOLL_CTL_DEL
EPOLL_CTL_ADD:向多路复用实例加入一个连接socket的文件描述符
EPOLL_CTL_MOD:改变多路复用实例中的一个socket的文件描述符的触发事件
EPOLL_CTL_DEL:移除多路复用实例中的一个socket的文件描述符
@fd:要操作的socket的文件描述符
@event:类型为struct epoll_event,设置对象socket的预期和返回结果事件,
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events可以是下列命令的任意按位与
EPOLLIN: 对应的文件描述有可以读取的内容
EPOLLOUT:对应的文件描述符有可以写入
EPOLLRDHUP:写到一半的时候连接断开
EPOLLPRI:发生异常情况,比如所tcp连接中收到了带外消息
EPOLLET: 设置多路复用实例的文件描述符的事件触发机制为边沿触发,默认为水平触发
epoll_wait
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
epoll_wait函数
说明:等待一个epoll队列中的文件描述符的I/O事件发生;
@int :调用成功时返回>=0表示准备就绪的文件描述符个数,-1表示出错errno被设置
@epfd:epoll_create1的返回的文件句柄
@events:用于放置epoll队列中准备就绪(被触发)的事件
@maxevents:@events链表的大小
@timeout:指定函数等待的时间,这个函数阻塞这么长一段时间之后接触阻塞。
三:小例子
//利用epoll实现反射服务器
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <vector>
#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <algorithm>
#define ERR_EXIT(m) \
do { \
perror(m); \
exit(EXIT_FAILURE); \
} while (0)
#define MAXLINE 1024
#define SERV_PORT 50000
#define CLIENT_NUM 1000
int main()
{
int i, maxi, listenfd, connfd, sockfd;
int nready;
ssize_t n;
char buf[MAXLINE];
socklen_t clilen;
struct sockaddr_in cliaddr, servaddr;
//1.创建套接字
if ((listenfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
ERR_EXIT("socket error"); //调用上边的宏
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);
//2.设置套接字属性
int on = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
ERR_EXIT("setsockopt error");
//3.绑定
if (bind(listenfd, (struct sockaddr*)&servaddr,sizeof(servaddr)) < 0)
ERR_EXIT("bind error");
//4.监听
if (listen(listenfd, SOMAXCONN) < 0) //listen应在socket和bind之后,而在accept之前
ERR_EXIT("listen error");
std::vector<int> clients;
int epollfd = epoll_create1(EPOLL_CLOEXEC);
if (epollfd < 0)
ERR_EXIT("epoll_create1 error");
struct epoll_event event;
event.data.fd = listenfd;
event.events = EPOLLIN | EPOLLET;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event) < 0)
ERR_EXIT("epoll_ctl error");
std::vector<struct epoll_event> events(16);
struct sockaddr_in peeraddr;
socklen_t peerlen;
int conn;
int count = 0;
char recvbuf[1024] = {0};
while (1)
{
nready = epoll_wait(epollfd, &*events.begin(), static_cast<int>(events.size()), -1);
if (nready == -1)
{
if (errno == EINTR)
continue;
ERR_EXIT("epoll_wait error");
}
else if (nready == 0)
continue;
else if ((size_t)nready == events.size())
{
events.resize(events.size()*2);
}
for (int i = 0; i<nready; i++)
{
if (events[i].data.fd == listenfd)
{
peerlen = sizeof(peeraddr);
conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen);
if (conn == -1)
ERR_EXIT("accept error");
printf("ip=%s, port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
printf("count=%d\n", ++count);
clients.push_back(conn);
int nb;
if (ioctl(conn, FIONBIO, &nb) == -1)
ERR_EXIT("ioctl error");
event.data.fd = conn;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, conn, &event);
}
else if (events[i].events & EPOLLIN)
{
conn = events[i].data.fd;
if (conn < 0)
continue;
int ret = read(conn, recvbuf, 1024);
if (ret < 0)
ERR_EXIT("read error");
else if (ret == 0)
{
printf("client close\n");
close(conn);
event = events[i];
epoll_ctl(epollfd, EPOLL_CTL_DEL, conn, &event);
clients.erase(std::remove(clients.begin(), clients.end(), conn), clients.end());
}
fputs(recvbuf, stdout);
write(conn, recvbuf, strlen(recvbuf));
memset(recvbuf, 0, sizeof(recvbuf));
}
}
}
return 0;
}
//客户端程序
//用于测试服务端支持的最大数量:1019;
#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<unistd.h>
#include<stdlib.h>
#include<errno.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<string.h>
#include<poll.h>
#define ERR_EXIT(m) \
do { \
perror(m); \
exit(EXIT_FAILURE); \
} while (0)
#define MAXLINE 1024
#define SERV_PORT 50000
#define CLIENT_NUM 1000
int main(void)
{
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
int clients[CLIENT_NUM];
struct pollfd p_fds[CLIENT_NUM+1];
for (int i=0; i<CLIENT_NUM ;i++)
{
int ret = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
connect(ret, (struct sockaddr *)&servaddr, sizeof(servaddr));
clients[i] = ret;
p_fds[i+1].fd = ret;
p_fds[i+1].events = POLLIN;
p_fds[i+1].revents = 0;
}
int fd_stdin = fileno(stdin);
p_fds[0].fd = STDIN_FILENO;
p_fds[0].events = POLLIN;
p_fds[0].revents = 0;
int nready;
char sendbuf[1024] = {0};
char recvbuf[1024] = {0};
while (true)
{
nready = poll(p_fds, CLIENT_NUM, -1);
if(nready < 0)
ERR_EXIT("poll");
else if(nready == 0)
{
printf("timeout\n");
continue;
}
else
{
if (p_fds[0].revents&POLLIN)
{
if (fgets(sendbuf, sizeof(sendbuf), stdin) == NULL)
break;
for (int i=0; i<CLIENT_NUM ;i++)
{
write(clients[i], sendbuf, strlen(sendbuf));
}
memset(sendbuf, 0, sizeof(sendbuf));
}
for (int i=1; i<=CLIENT_NUM ;i++)
{
if (p_fds[i].revents&POLLIN)
{
int ret = read(p_fds[i].fd, recvbuf, sizeof(recvbuf));
if (ret == -1)
ERR_EXIT("read error");
else if (ret == 0 || errno == ECONNRESET) //服务器关闭
{
p_fds[i].fd = -1;
break;
}
fputs(recvbuf, stdout);
memset(recvbuf, 0, sizeof(recvbuf));
}
}
}
}
for (int i=0; i<CLIENT_NUM ;i++)
{
close(clients[i]);
}
return 0;
}