高性能服务器编程之I/O复用---epoll


之前我们已经分析过select和poll这两种I/O系统调用了,现在我们开始最后一种:epoll。

内核事件表

epoll是Linux特有的I/O复用函数。 它在实现和使用上与select、 poll有很大差异。首先,epoll使用一组函数来完成任务,而不是单个函数。其次,epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集 (这是重大区别)。但 epoll 需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表。

epoll_create 函数

这个额外的文件描述符使用如下 epoll_create 函数来创建:

#include <sys/epoll.h>
int epoll_create(int size);

上面函数的返回值 就是这个文件描述符,它指向了这个内核中 内核创建的这个相对应的 事件表。

注:size参数现在并不起作用,只是给内核一个提示,告诉它事件表需要多大。该函数返回的文件描述符将用作其他所有epoll系统调用的第一个参数,以指定要访问的内核事件表。

epoll_ctl函数

而具体操作epoll的这个内核事件表是由下面的这个epoll_ctl函数具体实现:(在内核中操作)

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
//其第一个参数就是 上面的那个返回值
  • op参数 指定操作类型,操作类型有如下3种:
  • EPOLL_CTL_ADD,往事件表中注册fd上的事件
  • EPOLL_CTL_MOD,修改fd上的注册事件
  • EPOLL_CTL_DEL,删除fd上的注册事件
  • fd参数 是要操作的文件描述符
  • event参数 指定事件,它是 epoll_event 结构指针类型。epoll_event的定义如下:
struct epoll_event 
{
	__uint32_t   events;      // Epoll事件 
	epoll_data_t data;        // 用户数据 
};

如上定义:

  1. 其中events成员描述事件类型。epoll支持的事件类型和poll基本相同。表示epoll事件类型的宏是在poll对应的宏前加上“E”,比如epoll的数据可读事件是EPOLLIN。但epoll有两个额外的事件类型——EPOLLET (epoll高效的处理模式)和 EPOLLONESHOT(防止事件被多次触发)。它们对于epoll的高效运作非常关键。 (这两个非常重要,下面会进行介绍)
  2. data成员用于存储用户数据,其类型 epoll_data_t 的定义如下:
typedef union epoll_data 
{
	void  *ptr;
	int  fd;
	__uint32_t  u32;
	__uint64_t  u64;
} epoll_data_t;

对于上面的这个联合体类型:epoll_data_t是一个联合体,其4个成员中使用最多的是fd,它指定事件所从属的目标文件描述符 (这个非常关键)。 ptr成员可用来指定与fd相关的用户数据。但由于 epoll_data_t 是一个联合体,我们不能同时使用其 ptr 成员和 fd 成员。因此,如果要将文件描述符和用户数据关联起来,以实现快速的数据访问,只能使用其他手段,比如放弃使用epoll_data_t 的 fd成员,而在ptr指向的用户数据中包含fd。

  • 返回值:epoll_ctl 函数成功时返回0,失败则返回 -1 并设置 errno。

epoll_wait函数

epoll系列系统调用的主要接口是epoll_wait函数。它在一段超时时间等待一组文件描述符上的事件,其原型如下:

#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events,
			   int maxevents, int timeout);
//其第一个参数就是 最上面的那个返回值
  • events参数:epoll_wait 函数如果检测到事件,就将所有就绪的事件从内核事件表(由epfd参数指定)中复制到它的参数events指向的数组中 (这个发生在内核态到用户态的拷贝:只返回所有就绪的文描),这个数组只用于输出epoll_wait检测到的就绪事件。而不像select和poll的数组参数那样既用于传入用户注册的事件,又用于输出内核检测到的就绪事件,这就极大地提高了应用程序索引就绪文件描述符的效率(O(1))。在这里插入图片描述
  • maxevents参数指定最多监听多少个事件,它必须大于0。
  • timeout参数的含义与poll接口的timeout参数相同。
  • 返回值:该函数成功时返回就绪的文件描述符的个数,失败时返回 -1 并设置ermo.

基于epoll实现的服务器 连接多客户端

源代码如下:

扫描二维码关注公众号,回复: 11256852 查看本文章
#define _GNU_SOURCE

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>

#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/epoll.h>

#define MAX 100

int create_socket()
{
	//创建监听套接字(socket描述符),指定协议族ipv4,字节流服务传输
    int sockfd=socket(AF_INET,SOCK_STREAM,0);
    if(sockfd == -1)
    {
        return -1;
    }
    
	//socket专用地址信息设置
    struct sockaddr_in saddr;
    
    memset(&saddr,0,sizeof(saddr));

    saddr.sin_family=AF_INET;
    saddr.sin_port=htons(6000);
    saddr.sin_addr.s_addr=inet_addr("127.0.0.1");
	
	//命名套接字,将socket专用地址绑定到socket描述符上
    int res=bind(sockfd,(struct sockaddr*)&saddr,sizeof(saddr));
    if(res==-1)
    {
        return -1;
    }
    if(listen(sockfd,5)==-1)//创建监听队列
    {
        return -1;
    }
    return sockfd;
}
//向内核事件表epfd 中添加 新事件的文件描述符fd
void epoll_add(int epfd,int fd)
{
    struct epoll_event ev;
	//设置epoll_event的结构成员
    ev.data.fd=fd;
    ev.events=EPOLLIN;
	//EPOLL_CTL_ADD添加新事件及描述符到内核事件表
    if(epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev)== -1)
    {
        perror("epoll ctl add error");
    }
}
//从内核事件表中移除fd
void epoll_del(int epfd,int fd)
{
    if(epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL)== -1)
    {
        perror("epoll ctl del error");
    }
}
int main()
{
    int sockfd=create_socket();
    assert(sockfd != -1);
    
    int epfd=epoll_create(MAX);//创建内核事件表
    assert(epfd != -1);
    
    epoll_add(epfd,sockfd);//把sockfd添加到内核事件表

    struct epoll_event evs[10];//接收内核事件表返回的文描
    while(1)
    {
		//epoll_wait返回的是前n个已经全就绪,那么我们不用全部遍历,只遍历前n个就可以
		//返回的时候,就是有数据的文描 超时时间为5秒
        int n=epoll_wait(epfd,evs,10,5000);
        if(n== -1)
        {
            perror("epoll wait error");
            continue;
        }
        else if(n==0)
        {
            printf("time out\n");
            continue;
        }
        else//只遍历前n个,因为内核已告诉我们前n个有就绪事件
        {
            int i=0;
            for(;i<n;++i)//evs 为内核为我们返回的就绪事件
            {
                if(evs[i].events & EPOLLIN)
                {
                    if(evs[i].data.fd == sockfd)
                    {
                        struct sockaddr_in caddr;
                        int len=sizeof(caddr);
						
						//接收一个套接字已建立的连接,得到连接套接字c
                        int c=accept(sockfd,
                        			(struct sockaddr*)&caddr,&len);
                        if(c<0)
                            continue;
                        printf("accept c=%d\n",c);
                        epoll_add(epfd,c);//将c添加到内核事件表
                    }
                    else 
                    {
                        char buff[128]={0};
                        //recv用来接收客户端数据
                        int num=recv(evs[i].data.fd,buff,5,0);
                        
                        if(num<=0)
                        {
                            printf("one client close\n");
                            /* 
							注意:这里不能先close,应该先调用epoll_del,
							因为先调用close关闭了文件描述符后,再调用epoll_del,
							内核将不能找到所要从内核事件表中移除的文件描述符
							*/
                            epoll_del(epfd,evs[i].data.fd);
                            close(evs[i].data.fd);
                            continue;
                        }
                        printf("read(%d)=%s\n",evs[i].data.fd,buff);
                        send(evs[i].data.fd,"ok",2,0);
                    }
                }
                //if(evs[i].events & EPOLLOUT)
            }
        }
    }

}

效果展示如下:
在这里插入图片描述
注:client的代码,见昨日的博客。

LT 和 ET模式

两种模式简介

epoll 对文件描述符的操作有两种模式:LT (Level Trigger,电平触发)模式 和 ET (Edge Trigger,边沿触发)模式。

LT模式是默认的工作模式,这种模式下epoll 相当于一个效率较高的poll。
当往epoll内核事件表中注册一个文件描述符上的EPOLLET事件时(这里上面也说过了:epoll高效的处理模式),epoll将以ET模式来操作该文件描述符,ET模式是epoll的高效工作模式。

对于采用LT工作模式的文件描述符,当 epoll_wait 检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件,这样,当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理。 (这是两者的重要差别)

而对于采用ET工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait用将不再向应用程序通知这一事件。可见,ET模式在很大程度上降低了同一个epoll事件被重复触发的次数,因此效率要比LT模式高(同样这也就是Linux认为后者比前者的高效所在)。

启用EPOLLET,实现ET模式

LT模式是默认的工作模式,对上述的服务器程序 稍作修改即可:在这里插入图片描述
且每次只读一个字符:在这里插入图片描述
首先看一下,运行结果:
在这里插入图片描述
我们先分析一下结果:
我在client上共进行了4次的输入(事件触发),而服务器这边 每次有事件来到,仅仅是从缓冲区里面拿出来一个字符。(也就是说 一次事件触发,对应这里的一个字符输出)。也就是说:数据每次只能从缓冲区中拿到一个字符(一个字节),即我不发送一串数据,服务端只能拿到一个字符,剩下的都在其缓冲区中,且只打印了一个字符后,epoll_wait便阻塞了,这就是ET工作模式的特点,即数据就绪只提醒一次,以后不再提醒。

原因很简单:这就是普通的阻塞IO的缺点,没有事件发生epoll_wait阻塞了,这就是ET工作模式的特点,即数据就绪只提醒一次,以后不再提醒。
在这里插入图片描述
所以在这种情况下,就不可以使用这种普通的阻塞IO,而是使用非阻塞IO:只要当epoll_wait反馈给我们文件就绪(事件发生)后,由于我们并不知道缓冲区有多少数据,所以我这里进行循环读取,直到缓冲区没有数据为止。

使用非阻塞IO的高效ET模式 实现的服务器 连接多客户端

这里对上面阻塞I/O的程序,修改就两处:

  1. 将文件描述符设置成非阻塞的
  2. 拿到一个就绪的文件描述符后,循环读取

代码如下:

#define	_GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <fcntl.h>

#define		MAX  	100
int create_socket();

void setnonblock(int fd)//将文件描述符设置成非阻塞的
{
	int old_fcntl = fcntl(fd,F_GETFL);
	int new_fcntl = old_fcntl | O_NONBLOCK;

	if ( fcntl(fd,F_SETFL,new_fcntl) == -1 )
	{
		perror("fcntl error");
	}
}

void epoll_add(int epfd,int fd)
{
	struct epoll_event ev;
	ev.data.fd = fd;
	ev.events = EPOLLIN | EPOLLET;
	if ( epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev)  == -1 )
	{
		perror("epoll_ctl add error");
	}

	setnonblock(fd);
}

void epoll_del(int epfd, int fd)
{
	if ( epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL) == -1 )
	{
		perror("epoll_ctll del error");
	}
}

int main()
{
	int sockfd = create_socket();
	assert( sockfd != -1 );

	//创建内核事件表
	int epfd = epoll_create(MAX);
	assert( epfd != -1 );

	epoll_add(epfd,sockfd);

	struct epoll_event evs[10];

	while( 1 )
	{
		int n = epoll_wait(epfd,evs,10,5000);
		if ( n == -1 )
		{
			perror("epoll wait error\n");
			continue;
		}
		else if ( n == 0 )
		{
			printf("time out\n");
			continue;
		}
		else
		{
			int i = 0;
			for( ;i < n; i++)
			{
				if ( evs[i].events & EPOLLIN )
				{
					if ( evs[i].data.fd == sockfd )
					{
						struct sockaddr_in caddr;
						int len = sizeof(caddr);

						int c = accept(sockfd,
									  (struct sockaddr*)&caddr,&len);
						if ( c < 0 )
						{
							continue;
						}

						printf("accept c =%d\n",c);

						epoll_add(epfd,c);//向内核事件表中添加连接描述副
					}
					else//拿到一个就绪的文件描述符后,循环读取
					{
						while( 1 )
						{
							char buff[128] = {0};
							int num = recv(evs[i].data.fd,buff,1,0);
							if ( num == -1 )
							{
								break;
							}
							else if ( num == 0 )
							{
								epoll_del(epfd,evs[i].data.fd);
								close(evs[i].data.fd);
								printf("one client over\n");
							    break;
							}
							else
							{
								printf("buff=%s\n",buff);
							}

						}
						send(evs[i].data.fd,"ok",2,0);

					}
				}

				//if ( evs[i].events & EPOLLOUT)
			}
		}
	}

}

int create_socket()
{
	int sockfd = socket(AF_INET,SOCK_STREAM,0);
	if ( sockfd == -1 )
	{
		return -1;
	}

	struct sockaddr_in saddr;
	memset(&saddr,0,sizeof(saddr));
	saddr.sin_family = AF_INET;
	saddr.sin_port = htons(6000);
	saddr.sin_addr.s_addr = inet_addr("127.0.0.1");

	int res = bind(sockfd,(struct sockaddr*)&saddr,sizeof(saddr));
	if ( res == -1 )
	{
		return -1;
	}

	if( listen(sockfd,5) == -1 )
	{
		return -1;
	}

	return sockfd;
}

展示如下:
在这里插入图片描述由上图所示:在非阻塞I/O下,epoll_wait虽然只通知一次,但数据全部被一次性读取到了。

猜你喜欢

转载自blog.csdn.net/weixin_43949535/article/details/102773245