epoll
之前我们已经分析过select和poll这两种I/O系统调用了,现在我们开始最后一种:epoll。
内核事件表
epoll是Linux特有的I/O复用函数。 它在实现和使用上与select、 poll有很大差异。首先,epoll使用一组函数来完成任务,而不是单个函数。其次,epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集 (这是重大区别)。但 epoll 需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表。
epoll_create 函数
这个额外的文件描述符使用如下 epoll_create 函数来创建:
#include <sys/epoll.h>
int epoll_create(int size);
上面函数的返回值 就是这个文件描述符,它指向了这个内核中 内核创建的这个相对应的 事件表。
注:size参数现在并不起作用,只是给内核一个提示,告诉它事件表需要多大。该函数返回的文件描述符将用作其他所有epoll系统调用的第一个参数,以指定要访问的内核事件表。
epoll_ctl函数
而具体操作epoll的这个内核事件表是由下面的这个epoll_ctl函数具体实现:(在内核中操作)
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
//其第一个参数就是 上面的那个返回值
- op参数 指定操作类型,操作类型有如下3种:
- EPOLL_CTL_ADD,往事件表中注册fd上的事件
- EPOLL_CTL_MOD,修改fd上的注册事件
- EPOLL_CTL_DEL,删除fd上的注册事件
- fd参数 是要操作的文件描述符
- event参数 指定事件,它是 epoll_event 结构指针类型。epoll_event的定义如下:
struct epoll_event
{
__uint32_t events; // Epoll事件
epoll_data_t data; // 用户数据
};
如上定义:
- 其中events成员描述事件类型。epoll支持的事件类型和poll基本相同。表示epoll事件类型的宏是在poll对应的宏前加上“E”,比如epoll的数据可读事件是EPOLLIN。但epoll有两个额外的事件类型——EPOLLET (epoll高效的处理模式)和 EPOLLONESHOT(防止事件被多次触发)。它们对于epoll的高效运作非常关键。 (这两个非常重要,下面会进行介绍)
- data成员用于存储用户数据,其类型 epoll_data_t 的定义如下:
typedef union epoll_data
{
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
对于上面的这个联合体类型:epoll_data_t是一个联合体,其4个成员中使用最多的是fd,它指定事件所从属的目标文件描述符 (这个非常关键)。 ptr成员可用来指定与fd相关的用户数据。但由于 epoll_data_t 是一个联合体,我们不能同时使用其 ptr 成员和 fd 成员。因此,如果要将文件描述符和用户数据关联起来,以实现快速的数据访问,只能使用其他手段,比如放弃使用epoll_data_t 的 fd成员,而在ptr指向的用户数据中包含fd。
- 返回值:epoll_ctl 函数成功时返回0,失败则返回 -1 并设置 errno。
epoll_wait函数
epoll系列系统调用的主要接口是epoll_wait函数。它在一段超时时间内等待一组文件描述符上的事件,其原型如下:
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
//其第一个参数就是 最上面的那个返回值
- events参数:epoll_wait 函数如果检测到事件,就将所有就绪的事件从内核事件表(由epfd参数指定)中复制到它的参数events指向的数组中 (这个发生在内核态到用户态的拷贝:只返回所有就绪的文描),这个数组只用于输出epoll_wait检测到的就绪事件。而不像select和poll的数组参数那样既用于传入用户注册的事件,又用于输出内核检测到的就绪事件,这就极大地提高了应用程序索引就绪文件描述符的效率(O(1))。
- maxevents参数指定最多监听多少个事件,它必须大于0。
- timeout参数的含义与poll接口的timeout参数相同。
- 返回值:该函数成功时返回就绪的文件描述符的个数,失败时返回 -1 并设置ermo.
基于epoll实现的服务器 连接多客户端
源代码如下:
![](/qrcode.jpg)
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#define MAX 100
int create_socket()
{
//创建监听套接字(socket描述符),指定协议族ipv4,字节流服务传输
int sockfd=socket(AF_INET,SOCK_STREAM,0);
if(sockfd == -1)
{
return -1;
}
//socket专用地址信息设置
struct sockaddr_in saddr;
memset(&saddr,0,sizeof(saddr));
saddr.sin_family=AF_INET;
saddr.sin_port=htons(6000);
saddr.sin_addr.s_addr=inet_addr("127.0.0.1");
//命名套接字,将socket专用地址绑定到socket描述符上
int res=bind(sockfd,(struct sockaddr*)&saddr,sizeof(saddr));
if(res==-1)
{
return -1;
}
if(listen(sockfd,5)==-1)//创建监听队列
{
return -1;
}
return sockfd;
}
//向内核事件表epfd 中添加 新事件的文件描述符fd
void epoll_add(int epfd,int fd)
{
struct epoll_event ev;
//设置epoll_event的结构成员
ev.data.fd=fd;
ev.events=EPOLLIN;
//EPOLL_CTL_ADD添加新事件及描述符到内核事件表
if(epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev)== -1)
{
perror("epoll ctl add error");
}
}
//从内核事件表中移除fd
void epoll_del(int epfd,int fd)
{
if(epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL)== -1)
{
perror("epoll ctl del error");
}
}
int main()
{
int sockfd=create_socket();
assert(sockfd != -1);
int epfd=epoll_create(MAX);//创建内核事件表
assert(epfd != -1);
epoll_add(epfd,sockfd);//把sockfd添加到内核事件表
struct epoll_event evs[10];//接收内核事件表返回的文描
while(1)
{
//epoll_wait返回的是前n个已经全就绪,那么我们不用全部遍历,只遍历前n个就可以
//返回的时候,就是有数据的文描 超时时间为5秒
int n=epoll_wait(epfd,evs,10,5000);
if(n== -1)
{
perror("epoll wait error");
continue;
}
else if(n==0)
{
printf("time out\n");
continue;
}
else//只遍历前n个,因为内核已告诉我们前n个有就绪事件
{
int i=0;
for(;i<n;++i)//evs 为内核为我们返回的就绪事件
{
if(evs[i].events & EPOLLIN)
{
if(evs[i].data.fd == sockfd)
{
struct sockaddr_in caddr;
int len=sizeof(caddr);
//接收一个套接字已建立的连接,得到连接套接字c
int c=accept(sockfd,
(struct sockaddr*)&caddr,&len);
if(c<0)
continue;
printf("accept c=%d\n",c);
epoll_add(epfd,c);//将c添加到内核事件表
}
else
{
char buff[128]={0};
//recv用来接收客户端数据
int num=recv(evs[i].data.fd,buff,5,0);
if(num<=0)
{
printf("one client close\n");
/*
注意:这里不能先close,应该先调用epoll_del,
因为先调用close关闭了文件描述符后,再调用epoll_del,
内核将不能找到所要从内核事件表中移除的文件描述符
*/
epoll_del(epfd,evs[i].data.fd);
close(evs[i].data.fd);
continue;
}
printf("read(%d)=%s\n",evs[i].data.fd,buff);
send(evs[i].data.fd,"ok",2,0);
}
}
//if(evs[i].events & EPOLLOUT)
}
}
}
}
效果展示如下:
注:client的代码,见昨日的博客。
LT 和 ET模式
两种模式简介
epoll 对文件描述符的操作有两种模式:LT (Level Trigger,电平触发)模式 和 ET (Edge Trigger,边沿触发)模式。
LT模式是默认的工作模式,这种模式下epoll 相当于一个效率较高的poll。
当往epoll内核事件表中注册一个文件描述符上的EPOLLET事件时(这里上面也说过了:epoll高效的处理模式),epoll将以ET模式来操作该文件描述符,ET模式是epoll的高效工作模式。
对于采用LT工作模式的文件描述符,当 epoll_wait 检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件,这样,当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理。 (这是两者的重要差别)
而对于采用ET工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait用将不再向应用程序通知这一事件。可见,ET模式在很大程度上降低了同一个epoll事件被重复触发的次数,因此效率要比LT模式高(同样这也就是Linux认为后者比前者的高效所在)。
启用EPOLLET,实现ET模式
LT模式是默认的工作模式,对上述的服务器程序 稍作修改即可:
且每次只读一个字符:
首先看一下,运行结果:
我们先分析一下结果:
我在client上共进行了4次的输入(事件触发),而服务器这边 每次有事件来到,仅仅是从缓冲区里面拿出来一个字符。(也就是说 一次事件触发,对应这里的一个字符输出)。也就是说:数据每次只能从缓冲区中拿到一个字符(一个字节),即我不发送一串数据,服务端只能拿到一个字符,剩下的都在其缓冲区中,且只打印了一个字符后,epoll_wait便阻塞了,这就是ET工作模式的特点,即数据就绪只提醒一次,以后不再提醒。
原因很简单:这就是普通的阻塞IO的缺点,没有事件发生epoll_wait阻塞了,这就是ET工作模式的特点,即数据就绪只提醒一次,以后不再提醒。
所以在这种情况下,就不可以使用这种普通的阻塞IO,而是使用非阻塞IO:只要当epoll_wait反馈给我们文件就绪(事件发生)后,由于我们并不知道缓冲区有多少数据,所以我这里进行循环读取,直到缓冲区没有数据为止。
使用非阻塞IO的高效ET模式 实现的服务器 连接多客户端
这里对上面阻塞I/O的程序,修改就两处:
- 将文件描述符设置成非阻塞的
- 拿到一个就绪的文件描述符后,循环读取
代码如下:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <fcntl.h>
#define MAX 100
int create_socket();
void setnonblock(int fd)//将文件描述符设置成非阻塞的
{
int old_fcntl = fcntl(fd,F_GETFL);
int new_fcntl = old_fcntl | O_NONBLOCK;
if ( fcntl(fd,F_SETFL,new_fcntl) == -1 )
{
perror("fcntl error");
}
}
void epoll_add(int epfd,int fd)
{
struct epoll_event ev;
ev.data.fd = fd;
ev.events = EPOLLIN | EPOLLET;
if ( epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev) == -1 )
{
perror("epoll_ctl add error");
}
setnonblock(fd);
}
void epoll_del(int epfd, int fd)
{
if ( epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL) == -1 )
{
perror("epoll_ctll del error");
}
}
int main()
{
int sockfd = create_socket();
assert( sockfd != -1 );
//创建内核事件表
int epfd = epoll_create(MAX);
assert( epfd != -1 );
epoll_add(epfd,sockfd);
struct epoll_event evs[10];
while( 1 )
{
int n = epoll_wait(epfd,evs,10,5000);
if ( n == -1 )
{
perror("epoll wait error\n");
continue;
}
else if ( n == 0 )
{
printf("time out\n");
continue;
}
else
{
int i = 0;
for( ;i < n; i++)
{
if ( evs[i].events & EPOLLIN )
{
if ( evs[i].data.fd == sockfd )
{
struct sockaddr_in caddr;
int len = sizeof(caddr);
int c = accept(sockfd,
(struct sockaddr*)&caddr,&len);
if ( c < 0 )
{
continue;
}
printf("accept c =%d\n",c);
epoll_add(epfd,c);//向内核事件表中添加连接描述副
}
else//拿到一个就绪的文件描述符后,循环读取
{
while( 1 )
{
char buff[128] = {0};
int num = recv(evs[i].data.fd,buff,1,0);
if ( num == -1 )
{
break;
}
else if ( num == 0 )
{
epoll_del(epfd,evs[i].data.fd);
close(evs[i].data.fd);
printf("one client over\n");
break;
}
else
{
printf("buff=%s\n",buff);
}
}
send(evs[i].data.fd,"ok",2,0);
}
}
//if ( evs[i].events & EPOLLOUT)
}
}
}
}
int create_socket()
{
int sockfd = socket(AF_INET,SOCK_STREAM,0);
if ( sockfd == -1 )
{
return -1;
}
struct sockaddr_in saddr;
memset(&saddr,0,sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons(6000);
saddr.sin_addr.s_addr = inet_addr("127.0.0.1");
int res = bind(sockfd,(struct sockaddr*)&saddr,sizeof(saddr));
if ( res == -1 )
{
return -1;
}
if( listen(sockfd,5) == -1 )
{
return -1;
}
return sockfd;
}
展示如下:
由上图所示:在非阻塞I/O下,epoll_wait虽然只通知一次,但数据全部被一次性读取到了。