6.Linux网络编程-epoll原理

一:对比select发现epoll的有点
要比较epoll相比较select高效在什么地方,就需要比较二者做相同事情的方法。
要完成对I/O流的复用需要完成如下几个事情:
1.用户态怎么将文件句柄传递到内核态?
2.内核态怎么判断I/O流可读可写?
3.内核怎么通知监控者有I/O流可读可写?
4.监控者如何找到可读可写的I/O流并传递给用户态应用程序?
5.继续循环时监控者怎样重复上述步骤?
搞清楚上述的步骤也就能解开epoll高效的原因了。

select的做法:
步骤1的解法:select创建3个文件描述符集,并将这些文件描述符拷贝到内核中,这里限制了文件句柄的最大的数量为1024(注意是全部传入—第一次拷贝);
步骤2的解法:内核针对读缓冲区和写缓冲区来判断是否可读可写,这个动作和select无关;
步骤3的解法:内核在检测到文件句柄可读/可写时就产生中断通知监控者select,select被内核触发之后,就返回可读可写的文件句柄的总数;
步骤4的解法:select会将之前传递给内核的文件句柄再次从内核传到用户态(第2次拷贝),select返回给用户态的只是可读可写的文件句柄总数,但却并不知道是哪些文件句柄需要IO操作,我们只能使用FD_ISSET宏函数来轮询所有文件句柄来找出它们;
步骤5的解法:select对于事件的监控是建立在内核的修改之上的,也就是说经过一次监控之后,内核会修改位,因此再次监控时需要再次从用户态向内核态进行拷贝(第N次拷贝);

epoll的做法:
步骤1的解法:首先执行epoll_create在内核专属于epoll的高速cache区,并在该缓冲区建立红黑树和就绪链表,用户态传入的文件句柄将被放到红黑树中(第一次拷贝)。
步骤2的解法:内核针对读缓冲区和写缓冲区来判断是否可读可写,这个动作与epoll无关;
步骤3的解法:epoll_ctl执行add动作时除了将文件句柄放到红黑树上之外,还向内核注册了该文件句柄的回调函数,内核在检测到某句柄可读可写时则调用该回调函数,回调函数将文件句柄放到就绪链表。
步骤4的解法:epoll_wait只监控就绪链表就可以,如果就绪链表有文件句柄,则表示该文件句柄可读可写,并返回到用户态(少量的拷贝);
步骤5的解法:由于内核不修改文件句柄的位,因此只需要在第一次传入就可以重复监控,直到使用epoll_ctl删除,否则不需要重新传入,因此无多次拷贝。

简单说:epoll是继承了select/poll的I/O复用的思想,并在二者的基础上从监控IO流、查找I/O事件等角度来提高效率,具体地说就是内核句柄列表、红黑树、就绪list链表来实现的。

二:理解水平触发和边缘触发
当一个socket句柄上有事件时,内核会把该句柄插入上面所说的准备就绪list链表,这时我们调用epoll_wait,会把准备就绪的socket拷贝到用户态内存,然后清空准备就绪list链表, 最后,epoll_wait干了件事,就是检查这些socket,如果不是ET模式(就是LT模式的句柄了),并且这些socket上确实有未处理的事件时,又把该句柄放回到刚刚清空的准备就绪链表了,所以,非ET的句柄,只要它上面还有事件,epoll_wait每次都会返回。而ET模式的句柄,除非有新中断到,即使socket上的事件没有处理完,也是不会次次从epoll_wait返回的。

====>区别就在于epoll_wait将socket返回到用户态时是否情况就绪链表。

二:epoll相关函数
头文件#include <sys/epoll.h>

epoll_create

int epoll_create(int size);	//在早期版本中使用链表实现,size标识表的大小,现在用epoll_create1函数代替;
int epoll_create1(int flags);
epoll_create1函数
说明:创建一个epoll的句柄;
@int :调用成功时返回一个epoll句柄描述符,失败时返回-1
@flags:可取0或者EPOLL_CLOEXEC
	0:如果这个参数是0,这个函数等价于poll_create(0);
	EPOLL_CLOEXEC:这是这个参数唯一的有效值,如果这个参数设置为这个。那么当进程替换映像的时候会关闭这个文件描述符,

epoll_ctl

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll_ctl函数
说明:创建一个epoll的句柄;
@int :调用成功时返回0,失败时返回-1errno被设置
@epfd:epoll_create1的返回的文件句柄
@op:操作,EPOLL_CTL_ADD或者EPOLL_CTL_MOD或者EPOLL_CTL_DEL
	EPOLL_CTL_ADD:向多路复用实例加入一个连接socket的文件描述符
	EPOLL_CTL_MOD:改变多路复用实例中的一个socket的文件描述符的触发事件
	EPOLL_CTL_DEL:移除多路复用实例中的一个socket的文件描述符
@fd:要操作的socket的文件描述符
@event:类型为struct epoll_event,设置对象socket的预期和返回结果事件,
typedef union epoll_data {
               void        *ptr;
               int          fd;
               uint32_t     u32;
               uint64_t     u64;
} epoll_data_t;

struct epoll_event {
               uint32_t     events;      /* Epoll events */
               epoll_data_t data;        /* User data variable */
};   
events可以是下列命令的任意按位与
EPOLLIN: 对应的文件描述有可以读取的内容
EPOLLOUT:对应的文件描述符有可以写入
EPOLLRDHUP:写到一半的时候连接断开
EPOLLPRI:发生异常情况,比如所tcp连接中收到了带外消息
EPOLLET: 设置多路复用实例的文件描述符的事件触发机制为边沿触发,默认为水平触发

epoll_wait

int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
epoll_wait函数
说明:等待一个epoll队列中的文件描述符的I/O事件发生;
@int :调用成功时返回>=0表示准备就绪的文件描述符个数,-1表示出错errno被设置
@epfd:epoll_create1的返回的文件句柄
@events:用于放置epoll队列中准备就绪(被触发)的事件
@maxevents:@events链表的大小
@timeout:指定函数等待的时间,这个函数阻塞这么长一段时间之后接触阻塞。

三:小例子

//利用epoll实现反射服务器
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <vector>
#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <algorithm>

#define ERR_EXIT(m) \
    do { \
        perror(m); \
        exit(EXIT_FAILURE); \
    } while (0)

#define MAXLINE 1024
#define SERV_PORT 50000
#define CLIENT_NUM 1000

int main()
{
	int i, maxi, listenfd, connfd, sockfd;
	int nready;

	ssize_t n;
	char buf[MAXLINE];
	socklen_t clilen;
	struct sockaddr_in cliaddr, servaddr;

	//1.创建套接字
	if ((listenfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
		ERR_EXIT("socket error"); //调用上边的宏

	memset(&servaddr, 0, sizeof(servaddr));
	servaddr.sin_family = AF_INET;
	servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
	servaddr.sin_port = htons(SERV_PORT);

	//2.设置套接字属性
	int on = 1;
	if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
		ERR_EXIT("setsockopt error");

	//3.绑定
	if (bind(listenfd, (struct sockaddr*)&servaddr,sizeof(servaddr)) < 0)
		ERR_EXIT("bind error");

	//4.监听
	if (listen(listenfd, SOMAXCONN) < 0) //listen应在socket和bind之后,而在accept之前
		ERR_EXIT("listen error");

	std::vector<int> clients;
	int epollfd = epoll_create1(EPOLL_CLOEXEC);
	if (epollfd < 0)
		ERR_EXIT("epoll_create1 error");

	struct epoll_event event;
	event.data.fd = listenfd;
	event.events = EPOLLIN | EPOLLET;

	if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event) < 0)
		ERR_EXIT("epoll_ctl error");

	std::vector<struct epoll_event> events(16);
	struct sockaddr_in peeraddr;
	socklen_t peerlen;
	int conn;
	int count = 0;
	char recvbuf[1024] = {0};

	while (1)
	{
		nready = epoll_wait(epollfd, &*events.begin(), static_cast<int>(events.size()), -1);
		if (nready == -1)
		{
			if (errno == EINTR)
				continue;
			ERR_EXIT("epoll_wait error");
		}
		else if (nready == 0)
			continue;
		else if ((size_t)nready == events.size())
		{
			events.resize(events.size()*2);
		}

		for (int i = 0; i<nready; i++)
		{
			if (events[i].data.fd == listenfd)
			{
				peerlen = sizeof(peeraddr);
				conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen);
				if (conn == -1)
					ERR_EXIT("accept error");
				printf("ip=%s, port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
				printf("count=%d\n", ++count);
				clients.push_back(conn);
				int nb;
				if (ioctl(conn, FIONBIO, &nb) == -1)
					ERR_EXIT("ioctl error");
				event.data.fd = conn;
				event.events = EPOLLIN | EPOLLET;
				epoll_ctl(epollfd, EPOLL_CTL_ADD, conn, &event);
			}
			else if (events[i].events & EPOLLIN)
			{
				conn = events[i].data.fd;
				if (conn < 0)
					continue;
				int ret = read(conn, recvbuf, 1024);
				if (ret < 0)
					ERR_EXIT("read error");
				else if (ret == 0)
				{
					printf("client close\n");
					close(conn);
					event = events[i];
					epoll_ctl(epollfd, EPOLL_CTL_DEL, conn, &event);
					clients.erase(std::remove(clients.begin(), clients.end(), conn), clients.end());
				}

				fputs(recvbuf, stdout);
				write(conn, recvbuf, strlen(recvbuf));
				memset(recvbuf, 0, sizeof(recvbuf));
			}
		}
	}

	return 0;
}

//客户端程序
//用于测试服务端支持的最大数量:1019;
#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<unistd.h>
#include<stdlib.h>
#include<errno.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<string.h>
#include<poll.h>

#define ERR_EXIT(m) \
    do { \
        perror(m); \
        exit(EXIT_FAILURE); \
    } while (0)

#define MAXLINE 1024
#define SERV_PORT 50000
#define CLIENT_NUM 1000

int main(void)
{
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(SERV_PORT);
    servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

    int clients[CLIENT_NUM];
    struct pollfd p_fds[CLIENT_NUM+1];
    for (int i=0; i<CLIENT_NUM ;i++)
    {
    	int ret = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    	connect(ret, (struct sockaddr *)&servaddr, sizeof(servaddr));
    	clients[i] = ret;
    	p_fds[i+1].fd = ret;
    	p_fds[i+1].events = POLLIN;
    	p_fds[i+1].revents = 0;
    }

    int fd_stdin = fileno(stdin);
	p_fds[0].fd = STDIN_FILENO;
	p_fds[0].events = POLLIN;
	p_fds[0].revents = 0;

    int nready;
    char sendbuf[1024] = {0};
    char recvbuf[1024] = {0};

    while (true)
    {
		nready = poll(p_fds, CLIENT_NUM, -1);
        if(nready < 0)
            ERR_EXIT("poll");
        else if(nready == 0)
        {
            printf("timeout\n");
            continue;
        }
        else
        {
			if (p_fds[0].revents&POLLIN)
			{
				if (fgets(sendbuf, sizeof(sendbuf), stdin) == NULL)
					break;
	        	for (int i=0; i<CLIENT_NUM ;i++)
	        	{
					write(clients[i], sendbuf, strlen(sendbuf));
	        	}
	        	memset(sendbuf, 0, sizeof(sendbuf));
			}

        	for (int i=1; i<=CLIENT_NUM ;i++)
        	{
            	if (p_fds[i].revents&POLLIN)
    			{
    				int ret = read(p_fds[i].fd, recvbuf, sizeof(recvbuf));
    				if (ret == -1)
    					ERR_EXIT("read error");
    				else if (ret  == 0 || errno == ECONNRESET)   //服务器关闭
    				{
    					p_fds[i].fd = -1;
    					break;
    				}

    				fputs(recvbuf, stdout);
    				memset(recvbuf, 0, sizeof(recvbuf));
    			}
        	}
        }
    }

    for (int i=0; i<CLIENT_NUM ;i++)
    {
    	close(clients[i]);
    }
    return 0;
}
发布了32 篇原创文章 · 获赞 3 · 访问量 1404

猜你喜欢

转载自blog.csdn.net/m0_37582216/article/details/104525182