LinuxI/O多路转接

基本概念

①阻塞与非阻塞

阻塞与非阻塞讨论的是在等待调用结果时的状态

阻塞调用是指在等待时,当前线程会被挂起. 调用线程只有在得到结果之后才会返回
非阻塞调用指在等待时,该线程可以执行其他任务,不被OS挂起

②同步通信 vs 异步通信

同步与异步讨论的是调用者是否会主动等待调用结果

同步:调用者发出调用时,没有得到结果不会返回,阻塞等待,调用者主动等待该调用结果
异步:与同步相反,发出调用后立即返回,调用内的工作由别人完成,自己并不参与,等待被调用者的通知,直接使用

③非阻塞IO

文件描述符, 默认都是阻塞IO

int fcntl(int fd, int cmd, … /* arg */ )
对于cmd参数:
复制一个现有的描述符(cmd=F_DUPFD).
获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD).
获得/设置文件状态标记(cmd=F_GETFL或F_SETFL).
获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN).
获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW)

通过fcntl,实现一个非阻塞的文件描述符

void SetNoBlock(int fd) {
    
     
	 int fl = fcntl(fd, F_GETFL); //将当前的文件描述符的属性取出来
	 if (fl < 0)
	  {
    
     
 		 perror("fcntl");
		 return; 
	  }
	 fcntl(fd, F_SETFL, fl | O_NONBLOCK);
	 //使用F_SETFL将文件描述符设置回去. 设置回去的同时, 加上一个O_NONBLOCK参数
}

五种IO模型

①阻塞IO:

在内核将数据准备好之前, 系统调用会一直阻塞式等待(被OS挂起)
比如套接字:
在这里插入图片描述

②非阻塞IO

如果内核还未将数据准备好, 系统调用仍然会直接返回
并且返回EWOULDBLOCK错误码,定期检测

非阻塞IO可能会反复检测该文件描述符数据是否就绪, 这个过程称为轮询. 这对CPU来说是较大的浪费, 一般只有特定场景下才使用
在这里插入图片描述
对于阻塞IO,OS需要唤醒阻塞的进程,由OS发起并执行
而非阻塞轮询是由用户自己发起检测,OS执行

③信号驱动IO

内核将数据准备好的时候, 使用SIGIO信号通知应用程序进行IO操作
在这里插入图片描述

④IO多路转接:

虽然看起来和阻塞IO类似. 实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态
在这里插入图片描述
IO分两步,一个是等待,一个是拷贝,高效IO往往指等待时间少

IO多路转接负责同时等待多个文件描述符,当数据就绪时返回,再次recvfrom后就直接拷贝数据,提高了IO效率

⑤异步IO

由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据)
在这里插入图片描述
异步IO只需要发出系统调用请求,然后等待OS递交信号,此时用户缓冲区已经拷贝完成,直接取用就行,该动作由OS完成,而信号驱动由用户完成拷贝数据到用户缓冲区

特点:没有参与等待,没有参与拷贝,不会等待OS的信号再去调用recvfrom

IO多路转接

①select

select的主要工作是负责等待数据就绪,并且通知上层
特点:只要底层缓冲区有数据(有空间),select就认为读事件(写事件)就绪

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
参数:
nfds:最大文件描述符加1(select遍历文件描述符数组)
fd_set:文件描述符的位图,输入输出型参数,输入代表需要OS检测的文件描述符,输出为就绪的文件描述符,只能最多同时监控1024个
readfds:读事件位图
writefds:写事件位图
exceptfds:异常位图
timeout:
如果为NULL,则一直阻塞式等待,
0:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生。
特定的时间值:如果在指定的时间段里没有事件发生,select将超时返回
返回值:
执行成功则返回所有就绪文件描述符的个数
0表示时间超过timeout
-1代表等待出错

作为输入输出型参数,select每次调用后,原来的参数数据就被覆盖了,所以需要通过数组保存原来的数据,每次都需要对传入的参数如readfds进行重新设置

连接事件到来,select也认为是读事件就绪

简易的select服务器:

  1 #pragma once                                                                                                               
  2 #include"sock.hpp"
  3 #define DFL_FD -1
  4 #define BACK_LOG 5
  5 #define NUM 1024
  6 namespace ns_select
  7 {
    
    
  8   class SelectServer
  9   {
    
    
 10     private:
 11       int listen_sock;
 12       unsigned short port;
 13     public:
 14     SelectServer(unsigned short _port):port(_port)
 15     {
    
    
 16         
 17     }
 18     void InitSelectServer(){
    
    
 19       listen_sock=tzc::Sock::Socket();
 20       tzc::Sock::Bind(listen_sock,port);
 21       tzc::Sock::Listen(listen_sock,BACK_LOG);
 22     }
 23     void Run()
 24     {
    
    
 25       fd_set rfds;
 26       int fd_array[NUM]={
    
    0};
 27       ClearArray(fd_array,NUM,DFL_FD);//将数组初始化为-1
 28       fd_array[0]=listen_sock;//放入监听套接字
 29       for(;;)
 30       {
    
    
 31         //重新设置时间与max_fd
 32         struct timeval timeout={
    
    5,0};
 33         int max_fd=DFL_FD;
 34         FD_ZERO(&rfds);
 35         //添加进rfds
 36         for(auto i=0;i<NUM;i++)
 37         {
    
    
 38           if(fd_array[i]==DFL_FD)
 39           {
    
    
 40             continue;
 41           }
 42           FD_SET(fd_array[i],&rfds);
 43           if(max_fd<fd_array[i])                                                                                           
 44           {
    
    
 45             max_fd=fd_array[i];
 46           }
 47         }
 48         switch(select(max_fd+1,&rfds,nullptr,nullptr,&timeout))
 49         {
    
    
 50           case 0:
 51             std::cout<<"timeout"<<std::endl;
 52             break;
 53           case -1:
 54             std::cerr<<"select error"<<std::endl;
 55             break;
 56           default:
 57             //正常事件处理
 58         HandlerEvent(rfds,fd_array,NUM);
 59             break;
 60 
 61         }
 62       }   
 63     }
 64     void HandlerEvent(const fd_set &rfds,int fd_array[],int num)
 65     {
    
    
 66       for(auto i=0;i<num;i++)
 67       {
    
                                                                                                                        
 68         if(fd_array[i]==DFL_FD)
 69         {
    
    
 70           continue;
 71         }
 72         //有效fd
 73         if(fd_array[i]==listen_sock&&FD_ISSET(fd_array[i],&rfds))
 74         {
    
    
 75           //连接事件到来
 76           struct sockaddr_in peer;
 77           socklen_t len=sizeof(peer);
 78           int sock=accept(fd_array[i],(struct sockaddr*)&peer,&len);
 79           if(sock<0)
 80           {
    
    
 81             std::cerr<<"accept error"<<std::endl;
 82             continue;
 83           }
 84           uint16_t peer_port=htons(peer.sin_port);
 85           std::string peer_ip=inet_ntoa(peer.sin_addr);
 86           std::cout<<peer_ip<<": "<<peer_port<<std::endl;
 87           //将文件描述符添加到fd_array数组中
 88           if(!AddFdToArray(fd_array,num,sock))
 89           {
    
    
 90             close(sock);
 91             std::cout<<"select server full,close fd "<<sock<<std::endl;                                                    
 92           }
 93         }
 94         else 
 95         {
    
    
 96           if(FD_ISSET(fd_array[i],&rfds))
 97           {
    
    
 98             //读事件就绪
 99             char buffer[1024];
100             //粘包等问题
101             //定制协议
102             //对每个文件描述符定义缓冲区
103             ssize_t s=recv(fd_array[i],buffer,sizeof(buffer)-1,0);
104             if(s>0)
105             {
    
    
106               buffer[s]=0;
107               std::cout<<"echo# "<<buffer<<std::endl;
108             }
109             else if(s==0)
110             {
    
    
111               std::cout<<"client quit"<<std::endl;
112               close(fd_array[i]);
113               fd_array[i]=DFL_FD;
114             }
115             else                                                                                                           
116             {
    
    
117               std::cerr<<"recv error"<<std::endl;
118               close(fd_array[i]);
119               fd_array[i]=DFL_FD;
120             }
121           }
122           else 
123           {
    
    
124             
125           }
126         }
127       }
128     }
129 private:
130     void ClearArray(int fd_array[],int num,int default_fd)
131     {
    
    
132       for(auto i=0;i<num;i++)
133       {
    
    
134         fd_array[i]=default_fd;
135       }
136     }
137     bool AddFdToArray(int fd_array[],int num,int sock)
138     {
    
    
139       for(int i=0;i<num;i++)                                                                                               
140       {
    
    
141         if(fd_array[i]==DFL_FD)
142         {
    
    
143           fd_array[i]=sock;
144           return true;
145         }
146       }
147       //使用完空间了
148       return false;
149     }
150   };
151 
152 };         

sock.h

  1 #include<iostream>                                                                                                         
  2 #include<unistd.h>
  3 #include<cstring>
  4 #include<sys/socket.h>
  5 #include<sys/types.h>
  6 #include<arpa/inet.h>
  7 #include<netinet/in.h>
  8 namespace tzc
  9 {
    
    
 10   class Sock
 11   {
    
    
 12     public:
 13       static int Socket()
 14       {
    
    
 15         int sock=socket(AF_INET,SOCK_STREAM,0);
 16         if(sock<0)
 17         {
    
    
 18           std::cerr<<"socket error"<<std::endl;
 19           exit(1);
 20         }
 21         int opt=1;
 22         setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
 23 
 24           return sock;
 25       }
 26       static bool Bind(int sock,unsigned short port)
 27       {
    
    
 28         struct sockaddr_in local;
 29         memset(&local,0,sizeof(local));
 30         local.sin_family=AF_INET;
 31         local.sin_port=htons(port);
 32         local.sin_addr.s_addr=INADDR_ANY;
 33         if(bind(sock,(struct sockaddr*)& local,sizeof(local))<0)
 34         {
    
    
 35           std::cerr<<"bind error"<<std::endl;
 36           exit(2);
 37         }
 38         return true;
 39       }
 40       static bool Listen(int sock,int backlog)
 41       {
    
    
 42         if(listen(sock,backlog)<0)
 43         {
    
                                                                                                                      
 44           std::cout<<"listen error"<<std::endl;
 45           exit(3);
 46         }
 47         return true;
 48       }
 49   };
 50 
 51 };   

select缺点:
1.select能够同时等待的文件描述符是有上限的(1024)
2.select需要和OS交互数据,涉及较多数据拷贝,当select面临的链接很多时,会因为拷贝数据而降低效率
3.select每次调用,都必须从第三方数组重新添加fd,影响程序运行效率,比较麻烦,
4.select的nfds参数,决定了操作系统检测遍历的范围,当大量连接到来时,OS需要遍历的数据会越来越多
5.select成功返回后,每次都需要遍历第三方数组判断哪些文件描述符事件就绪
select优点:
1.select可以同时等待多个fd,而且只负责等待,不会拷贝数据到用户缓冲区

多路转接适用场景:大量的连接到来,但是只有少量是活跃的
而一般场景:如果大量连接到来都很活跃,直接阻塞式recvfrom读取就足够了

②poll

poll解决了select的两个问题
1.解决了select检测文件上限的问题
2.将用户传给OS的需要检测的文件描述符与OS传给用户的就绪文件描述符的两个行为进行分离
poll的缺点:
1.和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符.
2.每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中.
3.同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降

poll函数接口

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
参数:
fds:需要检测的文件描述符
nfds:结构体数组长度
timeout:轮询方式,与select一致
返回值
小于0, 表示出错;
等于0, 表示poll函数等待超时;
大于0, 表示poll由于监听的文件描述符就绪而返回.
struct pollfd {
int fd; //file descriptor
short events; // 用户 requested events
short revents; // OS returned events
};
events和revents的取值:
POLLIN:数据可读
POLLOUT:数据可写

简易poll

  1 #include"sock.hpp"                                                                                                         
  2 #include<poll.h>
  3 class PollServer
  4 {
    
    
  5   private:
  6     int listen_sock;
  7     int port;
  8   public:
  9     PollServer(int _port):port(_port)
 10     {
    
    }
 11     void InitServer()
 12     {
    
    
 13       listen_sock=tzc::Sock::Socket();
 14       tzc::Sock::Bind(listen_sock,port);
 15       tzc::Sock::Listen(listen_sock,5);
 16     }
 17     void Run()
 18     {
    
    
 19       struct pollfd rfds[64];
 20       for(int i=0;i<64;i++)
 21       {
    
    
 22         rfds[i].fd=-1;
 23         rfds[i].events=0;
 24         rfds[i].revents=0;
 25       }
 26       rfds[0].fd=listen_sock;
 27       rfds[0].events|=POLLIN;
 28       rfds[0].revents=0;
 29       for(;;)
 30       {
    
    
 31         switch(poll(rfds,64,-1))
 32         {
    
    
 33           case 0:
 34             std::cout<<"timeout"<<std::endl;
 35             break;
 36           case -1:
 37             std::cerr<<"poll error"<<std::endl;
 38           default:
 39             for(int i=0;i<64;i++)
 40             {
    
    
 41               if(rfds[i].fd==-1)
 42               {
    
    
 43                 continue;                                                                                                  
 44               }
 45               if(rfds[i].revents&POLLIN)
 46               {
    
    
 47                 if(rfds[i].fd==listen_sock)
 48                 {
    
    
 49                   //连接到来
 50                   std::cout<<"get a new link"<<std::endl;
 51                 }
 52                 else 
 53                 {
    
    
 54                   //读事件就绪
 55                   
 56                 }
 57               }
 58             }
 59         }
 60       }
 61     };
 62 };                           

③epoll

1.epoll模型

在这里插入图片描述
epoll的使用过程就是三部曲:
1.调用epoll_create创建一个epoll句柄
2.调用epoll_ctl, 将要监控的文件描述符进行注册
3.调用epoll_wait, 等待文件描述符就绪

2.epoll的相关系统调用

int epoll_create(int size)
size:128or256,该参数已被废弃
返回值:
返回一个epoll的文件描述符

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
第一个参数是epoll_create()的返回值(epoll的句柄)
第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD :注册新的fd到epfd中
EPOLL_CTL_MOD :修改已经注册的fd的监听事件
EPOLL_CTL_DEL :从epfd中删除一个fd
第三个参数是需要监听的fd
第四个参数是告诉内核需要监听什么事

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
epoll将会把发生的事件拷贝到events数组中
maxevents告知内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size

struct epoll_event结构如下:
在这里插入图片描述

  1 #pragma once
  2 #include"sock.hpp"
  3 #include<sys/epoll.h>                                                                                                      
  4 #define back_log 5   
  5 #define MAX_NUM 64
  6 namespace  ns_epoll
  7 {
    
                      
  8   const int size=256;
  9   class EpollServer  
 10   {
    
                    
 11     private:
 12       int listen_sock;
 13       int epfd;       
 14       uint16_t port;
 15     public:         
 16       EpollServer(int _port):port(_port)
 17       {
    
    }                                
 18       void InitEpollServer()
 19       {
    
                         
 20         listen_sock=tzc::Sock::Socket();
 21         tzc::Sock::Bind(listen_sock,port);
 22         tzc::Sock::Listen(listen_sock,back_log);
 23         if((epfd=epoll_create(size))<0)         
 24         {
    
          
  25           std::cerr<<"epoll_create error\n"<<std::endl;
 26           exit(4);
 27         }
 28       }
 29       void AddEvent(int sock,uint32_t event)
 30       {
    
    
 31         struct epoll_event ev;
 32         ev.events=0;
 33         ev.events|=event;
 34         ev.data.fd=sock;
 35         if(epoll_ctl(epfd,EPOLL_CTL_ADD,sock,&ev)<0)
 36         {
    
    
 37           std::cerr<<"epoll_ctl error,fd:"<<sock<<std::endl;
 38         }
 39       }
 40       void DelEvent(int sock)
 41       {
    
    
 42         if(epoll_ctl(epfd,EPOLL_CTL_DEL,sock,nullptr)<0)
 43         {
    
                                                                                                                      
 44           std::cerr<<"epoll_ctl error"<<std::endl;
 45         }
 46       }
 47       void Run()
 48       {
    
    
  49         AddEvent(listen_sock,EPOLLIN);
 50         int timeout=1000;
 51         struct epoll_event revs[MAX_NUM];
 52         for(;;)
 53         {
    
    
 54           //返回值num表明就绪事件个数,OS会依次放入revs中
 55           int num=epoll_wait(epfd,revs,MAX_NUM,timeout);
 56           if(num>0)
 57           {
    
    
 58             for(int i=0;i<num;i++)
 59             {
    
    
 60               int sock=revs[i].data.fd;
 61               if(revs[i].events&EPOLLIN)
 62               {
    
    
 63                 if(sock==listen_sock)
 64                 {
    
    
 65                   struct sockaddr_in peer;
 66                   socklen_t len=sizeof(peer);
 67                   int sk=accept(listen_sock,(struct sockaddr*)&peer,&len);                                                 
 68                   if(sk<0)
 69                   {
    
    
 70                     std::cout<<"accept error"<<std::endl;
 71                     continue;
 72                   }
 73                   std::cout<<"get a new link: "<<inet_ntoa(peer.sin_addr)<<": "<<ntohs(peer.sin_port)<<std::endl;
 74                   AddEvent(sk,EPOLLIN);
 75                 }
 76                 else 
 77                 {
    
    
 78                   //读事件就绪
 79                   char buffer[1024];
 80                   ssize_t s=recv(sock,buffer,sizeof(buffer)-1,0);
 81                   if(s>0)
 82                   {
    
    
 83                     buffer[s]=0;
 84                     std::cout<<buffer<<std::endl;
 85                   }
 86                   else 
 87                   {
    
    
 88                     std::cout<<"client close"<<std::endl;
 89                     close(sock);
 90                     DelEvent(sock);
 91                   }                                                                                                        
 92                 }
 93               }
 94               else if(revs[i].events&EPOLLOUT)
 95               {
    
    
 96                   //写事件
 97               }
 98             }
 99           }
100           else if(num==0)
101           {
    
    
102             std::cout<<"time out"<<std::endl;
103           }
104           else 
105           {
    
    
106             std::cout<<"epoll error"<<std::endl;
107           }
108         }
109       }
110       ~EpollServer()
111       {
    
    
112         if(listen_sock>=0)
113         {
    
    
114           close(listen_sock);
115         }                                                                                                                  
116         if(epfd>=0)
117         {
    
    
118           close(epfd);
119         }
120       }
121     
122   };
123 };    
                   

epoll优点:
1.接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
2.数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝 内核中,该红黑树结点仍在内存,下次操作不需要重新添加(而select/poll都是每次循环都要进行拷贝)
3.事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中,epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响.
4.没有数量限制: 文件描述符数目无上限

3.epoll的工作方式

epoll有2种工作方式-水平触发(LT)和边缘触发(ET),默认是LT的

LT VS ET

LT与ET的差别在于就绪事件通知机制:
LT:只要底层有数据就会一直通知上层读取数据
ET:当底层的数据从无到有,从有到多变化时才会通知上层一次

对于ET:
只有在底层数据变化时才会通知,但是如果此次并没有读完缓冲区数据,如果此后该缓冲区也再也无数据变化,将会导致该剩余数据一直没能被应用层读取,所以ET应该保证一次就把缓冲区的数据全部读取完

通过不断循环调用recv,判断recv的返回值来判定是否读取完毕,读取时会出现如下两种情况:
情况1:如果返回的小于期望读取的字节数,那么说明已经读取完毕
情况2:如果最后一次读取的刚刚将缓冲区读完,返回值刚好等于缓存区大小,但此时读取将被判定为还未读取完毕,会再次读取,进程将被阻塞,服务器将被挂起

所以对于情况2,就要求ET模式必须为非阻塞轮询模式的读取,当缓冲区无数据时返回值小于0,退出循环

可见ET模式下recv,write都必须是非阻塞的,而LT可以不需要,因为即使此次没读完还会通知,下次还能读取
select和poll其实也是工作在LT模式下. epoll既可以支持LT, 也可以支持ET

4.epoll工作方式的对比

1.LT是 epoll 的默认行为. 使用 ET 能够减少 epoll 触发的次数. 但是代价就是一次处理完数据
2.相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些. 但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的
3.同时ET 的代码复杂程度也更高

猜你喜欢

转载自blog.csdn.net/hbbfvv1h/article/details/124066295