写在前面
epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,下面说下我们在编程时epoll具体的用法。
epoll 主要采用对已就绪的 fd 进行轮询操作
一、epoll 触发方式
epoll支持 ET 和 LT 两种触发方式
ET(边缘触发):Nginx 就是采用 ET 触发方式,只支持 no-block 方式,当一个 fd 缓冲区就绪的时候,只会发送一次事件触发, 而不会管缓冲区的数据是否已经被读取,都不会再发送第二次
LT(边缘触发):支持no-block 和 block 两种方式,当一个 fd 缓冲区就绪时,只要缓冲区有数据,就会不停的发送就绪通知
二、epoll 相关函数:
2.1、int epoll_create(int size);
用于创建一个 epoll 句柄,创建一个 epoll 句柄之后,会占用一个 fd 描述符,对于一个进程来说,它相关的 fd 描述符可以查看/proc/进程id/fd/, 在使用完epoll 之后,需要对他进行 close ,否则会导致 fd 太多被耗尽
2.2、int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
事件注册函数,将 fd 添加、修改、删除到 epfd 中,通过 op 参数修改
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第3个参数 fd是待监测的连接套接字,第4个参数是在告诉 epoll对什么样的事件感兴趣,它使用了 epoll_event结构体,在上文介绍过的 epoll实现机制中会为每一个事件创建 epitem结构体,而在epitem中有一个 epoll_event类型的 event成员。下面看一下 epoll_event的定义:
struct epoll_event{
__uint32_t events;
epoll_data_t data;
};
events的取值:
注意一下表中的某些叙述:
EPOLLIN:TCP连接的远端主动关闭连接,也是可读事件;
EPOLLOUT:主动建立连接,发起非阻塞的 TCP连接(connect也可以这么判断),连接建立成功的事件也是可写事件。
data成员是一个epoll_data联合,其定义如下:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
可见,这个 data成员还与具体的使用方式相关。例如,ngx_epoll_module模块只使用了联合中的 ptr成员,作为指向 ngx_connection_t连接的指针。我们在项目中一般使用的也是 ptr成员,因为它可以指向任意的结构体地址。
2.3、int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
可以理解为收集epoll 监控的所有事件中,已经发生的那部分 fd 的数量
三、epoll 工作原理
3.1、在调用 epoll_create 之后,内核就已经创建了一个 eventpoll 红黑树结构体,一个 list 双向链表,在内核态准备接受存储需要监控的 fd。
3.2、在调用 epoll_ctr 之后,直接向内核态的 eventpoll 进行 add/mod/del 对应的 fd,对于新添加进来的 fd,重复的 fd 可以通过 eventpoll 红黑树识别出来,而不需要再次从用户态拷贝到内核态这个过程
3.3、同时 epoll 还维护了一个双向的 list 链表, 在epoll_ctr执行的时候,除了会向eventpoll 红黑树添加修改外,还会在内核中断函数处理程序中注册一个回调函数,告诉内核,当这个 fd 就绪之后,将他放到 list 里面去。
3.4、在 epoll_wait 调用的时候,就是观察这个双向 list 是否有数据,有就直接处理即可
代码:
/* 这是一个使用了epoll多路转接技术的tcp服务端
* 使用epoll来打破原先单线程只能处理一个客户端的壁垒
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
void setnonblock(int fd)
{
int flag = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}
int recv_data(int fd, char *buff)
{
int ret;
int alen = 0;
while(1) {
//边缘触发,为了防止缓冲区数据没有读完,因此我们进行了循环
//读取,但是循环读取存在一个问题:recv有可能会阻塞
ret = recv(fd, buff + alen, 2, 0);
if (ret <= 0) {
if (errno == EAGAIN) {
//没数据了,跳出
break;
}else if (errno == EINTR) {
//被信号打断,重新接收数据
continue;
}
return -1;
}else if (ret < 2) {
//如果接收的数据小于想要获取的长度,代表数据读完了
break;
}
alen += ret;
}
}
int main(int argc, char *argv[])
{
//实现tcp的服务端程序
int sockfd = -1, ret, i;
socklen_t len = -1;
char buff[1024] = {0};
struct sockaddr_in srv_addr;
struct sockaddr_in cli_addr;
sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sockfd < 0) {
perror("socket error");
return -1;
}
srv_addr.sin_family = AF_INET;
srv_addr.sin_port = htons(atoi(argv[2]));
srv_addr.sin_addr.s_addr = inet_addr(argv[1]);
len = sizeof(struct sockaddr_in);
ret = bind(sockfd, (struct sockaddr*)&srv_addr, len);
if (ret < 0) {
perror("bind error");
return -1;
}
if (listen(sockfd, 5) < 0) {
perror("listen error");
return -1;
}
//在内核创建eventpoll结构
//int epoll_create(int size);
// size: 现在已经被忽略了,大于0即可
// 返回:epoll句柄(文件描述符)
int epfd = epoll_create(10);
if (epfd < 0) {
perror("epoll_create error");
return -1;
}
//向内核添加事件
//int epoll_ctl(int epfd, int op, int fd,
// struct epoll_event *event);
// epfd: epoll句柄
// op: 当前操作
// EPOLL_CTL_ADD 添加事件
// EPOLL_CTL_MOD 事件修改
// EPOLL_CTL_DEL 删除事件
// fd: 所监控的描述符
// event: 跟描述符所关联的事件
// EPOLLIN 可读事件
// EPOLLOUT 可写事件
// EPOLLET 边缘触发属性
// 每当有新的数据到来的时候,事件会触发一次,假如缓冲
// 区数据没有读完,但是不会再次提醒(就这一条数据来说
// 不会再次触发事件)
// 基于这种特性,就需要我们一次性将缓冲区数据全部读完
// EPOLLLT 水平触发(默认)
// 如何说明一个描述符是读就绪的?
// 可读事件就是缓冲区是否有数据可读
// 只要缓冲区有数据就一直提醒(每次epoll都会触发事件就
// 绪)
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
while(1) {
struct epoll_event evs[10];
//int epoll_wait(int epfd, struct epoll_event *events,
// int maxevents, int timeout);
// epfd: epoll句柄
// events:epoll_event结构体数组,用于获取就绪的描述符事件
// 这个事件就是我们之前添加时候描述符关联的事件
// maxevents: 能够获取的最大就绪事件个数(等同数组大小)
// timeout:epoll最大超时等待时间(毫秒)
// 返回:错误:-1 超时:=0 就绪个数:>0
int nfds = epoll_wait(epfd, evs, 10, 3000);
if (nfds < 0) {
perror("epoll_wait error");
return -1;
}else if (nfds == 0) {
printf("have no data arrived!!\n");
continue;
}
//对就绪的时间进行处理
for (i = 0; i < nfds; i++) {
//如果就绪的时间中对应的描述符是监听描述符
if (evs[i].data.fd == sockfd) {
int new_fd;
new_fd = accept(sockfd,
(struct sockaddr*)&cli_addr, &len);
if (new_fd < 0) {
perror("accept error");
continue;
}
//针对边缘触发模式的描述符需要设置为非阻塞
//setnonblock(new_fd);
//给新的连接,组装一个事件结构,并且假如到epoll的内
//核事件监控集合
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = new_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &ev);
}else {
char buff[1024] = {0};
ret = recv_data(evs[i].data.fd, buff);
if (ret <= 0) {
if (errno == EAGAIN||errno == EINTR) {
continue;
}
epoll_ctl(epfd, EPOLL_CTL_DEL, evs[i].data.fd,
&ev);
close(evs[i].data.fd);
}
printf("client say:%s\n", buff);
}
}
//客户端数据处理
}
close(sockfd);
return 0;
}