Socket学习四

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/buhuiguowang/article/details/81590624

Socket 的五种I/O模型

  • 阻塞I/O、非阻塞I/O、I/O复用、信号驱动I/O、异步I/O
  • 现在用的最多的是I/O复用和异步I/O
  • 阻塞I/O
  • 在这之前我们所用的套接口I/O模型都是阻塞I/O的方式来进行通信的。一旦套接口接收完成后,我们就可以接收数据。此时像系统提交一个recv请求接收数据,即阻塞在这里,直到对等方发送数据填充了recv的接收缓冲区,阻塞解除。然后这些数据就会从缓冲区被复制到用户空间中即 buf中,此时recv返回,这是侯我们就可以对返回得到的数据进行处理了。
  • 非阻塞I/O
  • 将套接口设置为非阻塞可以用 fcntl(fd, F_SETFD, flag|O_NONBLOCK),此时即使recv没有收到数据也不会阻塞,它会返回一个错误,返回值为 -1,但是需要不断地判断是否有数据到来,这种等待我们称之为忙等待。后面的处理和阻塞的处理是一样的。这种最不推荐使用,极大的浪费了CPU资源
  • 这两种模式走了两种极端,前者死等,后者直接不等,但是在不断地轮询等待着数据。
  • 那么有没有这么一种机制,集中管理文件描述符。一旦文件描述符的状态发生变化,就是某一个socket的数据到来的时候,这个机制会告诉我们它的状态发生变化了,然后才取读数据。这样就不用轮询或者阻塞了。
  • 答案是必定的。I/O复用就是这样的机制。
  • 这种模型主要是通过 select 函数来实现的。
  • 思想:用 select 函数管理多个描述符。每当其中的一个文件描述符的状态发生变化时,即数据到来时,select就返回,这时候再调用rev函数时就不会阻塞了,就可以把数据从内核空间复制到用户空间。其实时将阻塞提前到了select函数这里。
  • 信号驱动I/O
  • 异步I/O
  • 异步 I/O, 调用 aio_read 函数 ,并提交一个缓冲区buf,即使内核中没有数据到来,这个函数也会立刻返回,一旦返回,应用进程就可以处理其他的事情。当有数据到来时,内核会自动地将数据复制到用户空间,复制完成后,会通过信号来通知应用进程的程序来处理数据。但是这也需要一定的机制来来通知上层应用,比如 aio_read 中指定的信号 SIGIO 也可能是其他的机制,这取决与 aio_read 的内部实现。因为不同的 aio_read 实现的方式可能不太一样,而且异步 I/O在大部分系统中或多或少都有一定的问题,所以异步 I/O也没有得到很好的推广。
  • 与信号驱动的区别时:前者时拉信号,后者是内核主动将信号复制给用户
  • select模型:
  • 函数原型:int select(int n, fd_set readfds, fd_set writefds, fd_set exceptfds, struct timeval timeout);
  • 返回值:失败返回-1,成功返回文件描述符的个数,超时没检测到文件描述符返回0
  • 参数解析:
  • fd:读、写、异常集合描述符的最大值+1

    • readfds 读集合:监视readfds来查看是否read的时候会被堵塞,注意,即便到了end-of-file,fd也是可读的。
    • writefds 写集合:监视writefds看写的时候会不会被堵塞。
    • 监视exceptfd是否出现了异常。主要用来读取OOB数据,异常并不是指出错。
    • 注意当一个套接口出错时,它会变得既可读又可写。
      如果有了状态改变,会将其他fd清零,只有那些发生改变了的fd保持置位,以用来指示set中的哪一个改变了状态。参数n是所有set里所有fd里,具有最大值的那个fd的值加1
    • fd_set 异常集合
      四个宏用来对fd_set进行操作:
      FD_CLR(int fd, fd_set set); //将文件描述从集合中移除
      FD_ISSET(int fd, fd_set 
      set); //判断文件描述符是否在集合中
      FD_SET(int fd, fd_set set); //将文件描述符添加到集合中
      FD_ZERO(fd_set 
      set); //清空集合
    • time_out 超时结构体

      • timeout是从调用开始到select返回前,会经历的最大等待时间。
      • 两种特殊情况:如果为值为0,会立刻返回。
      • 如果timeout是NULL,会阻塞式等待。
      • struct timeval {
        long tv_sec; / seconds /
        long tv_usec; / microseconds /
        };

      • 一些调用使用3个空的set, n为zero, 一个非空的timeout来达到较为精确的sleep.

      • Linux中, select函数改变了timeout值,用来指示还剩下的时间,但很多实现并不改timeout。
      • 为了较好的可移植性,timeout在循环中需要被重新赋初值。

      • timeout== NULL

      • 无限等待
      • 被信号打断时返回1, errno 设置成 EINTR
      • timeout->tv_sec == 0 && tvptr->tv_usec == 0
      • 不等待立即返回
      • timeout->tv_sec != 0 || tvptr->tv_usec != 0
      • 等待特定时间长度, 超时返回0
    • 通过前面的代码我们可以知道,客户端断开会阻塞在TIME_WAIT_2,无法继续向下推进到 TIME_WAIT 状态???在下面我们用select模型来解决这个问题。
    • 思想:
      • select 作为管理者
      • 用 select 管理多个I/O;一旦其中的一个或者多个I/O检测到我们所感兴趣的事件,select 函数返回,返回值为检测到的事件个数。并且返回了那些I/O。
      • 遍历这些事件,进而处理这些事件
客户端
服务器代码见前面socket学习三

#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<signal.h>
#include <sys/select.h>
#include <sys/time.h>

#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>


ssize_t readn(int fd,void *buf,size_t count)
{
//ssize_t=int,size_t=unsigned int
//接收count个字节数
    size_t nleft=count;//剩余字节数
    ssize_t nread;//已经接收字节数
    char *bufp=(char *)buf;
    while(nleft>0)
    {
        if((nread=read(fd,bufp,nleft))<0)
        {
            if(errno==EINTR)//被中断
                continue;
            return -1;
        }
        else if(nread==0)//对等方关闭
            return count-nleft;//读到EOF,对方关闭
        bufp+=nread;
        nleft-=nread;
    }
    return count;
}

ssize_t writen(int fd,void *buf,size_t count)
{
    size_t nleft=count;//剩余发送字节数
    ssize_t nwritten;//已经发送字节数
    char *bufp=(char *)buf;
    while(nleft>0)//一般而言,write缓冲区大于发送数据缓冲区,不阻塞
    {
        if((nwritten=write(fd,bufp,nleft))<0)
        {
            if(errno==EINTR)//被中断
                continue;
            return -1;
        }
        else if(nwritten==0)//对等方关闭
            continue;//读到EOF,对方关闭
        bufp+=nwritten;
        nleft-=nwritten;
    }
    return count;
}
ssize_t recv_peek(int sockfd,void *buf,size_t len)
{
    while(1)
    {
        int ret=recv(sockfd,buf,len,MSG_PEEK);
        if(ret==-1&&errno==EINTR)//操作被信号中断,recv认为链接正常,继续偷窥
            continue;
        return ret;//返回读取的字节数
    }

}
ssize_t readline(int sockfd,void *buf,size_t maxline)
{
    int ret;
    int nread;
    char *bufp=buf;
    int nleft=maxline;//读取遇到\n返回,不会超过maxline
    while(1)
    {
        ret=recv_peek(sockfd,bufp,nleft);
        if(ret<=0)
            return ret;
        nread=ret;
        //接下来判断接收的缓冲区是否有\n
        int i;
        for(i=0;i<nread;i++)
        {
            if(bufp[i]=='\n')
            {
                ret=readn(sockfd,bufp,i+1);
                if(ret!=i+1)
                    exit(EXIT_FAILURE);//偷窥方法
                return ret;
            }
        }

        if(nread>nleft)   //偷窥到的数据不能大于maxline
        {
            exit(EXIT_FAILURE);
        }
        nleft-=nread;//剩余字节数
        ret=readn(sockfd,bufp,nread);//将nread数据从缓冲区移除
        if(ret!=nread)
            exit(EXIT_FAILURE);
        bufp+=nread;//继续偷窥
    }
    return 1;                       
}

void echo_client(int sock)
{
/*  char        sendbuf[1024] = {0};
    char        recvbuf[1024] = {0};
    while(fgets(sendbuf, sizeof(sendbuf), stdin) != NULL)
    {
        //writen(sock,&sendbuf,strlen(sendbuf));//将接收到的数据发送出去
        //为检验 SIGPIPE 信号
        writen(sock, sendbuf, 1);
        writen(sock,sendbuf+1,strlen(sendbuf)-1);

        int ret=readline(sock,recvbuf,sizeof(recvbuf));//函数从打开的文件,设备中读取数据
        if(ret==-1)
        {
            printf("readline err");
        }
        else if(ret==0)
        { 
            printf("client_close\n");
            break;
        }

        fputs(recvbuf,stdout);//发送数据到文件
        memset(sendbuf,0,sizeof(sendbuf));
        memset(recvbuf,0,sizeof(recvbuf));
    }
    close(sock);
*/
    fd_set rset;  //建立一个集合
    FD_ZERO(&rset);   //初始化集合

    int nready;   //表示见到的文件描述符个数
    int maxfd;
    int fd_stdin = fileno(stdin);//为什么这里不用STD_FILENO,因为这个宏的值为0;但是我们不能保证这个标准输入是否被重定向,导致这个文件描述符可能部位0
    if(fd_stdin > sock)
        maxfd = fd_stdin;
    else
        maxfd = sock;

    char    sendbuf[1024] = {0};
    char    recvbuf[1024] = {0};
    while(1)
    {
        FD_SET(fd_stdin, &rset);
        FD_SET(sock, &rset);
        nready = select(maxfd+1, &rset, NULL, NULL, NULL);
        if(nready == -1)
            printf("select err\n");

        if(nready == 0)
            continue;
        if(FD_ISSET(sock, &rset)) //判断检测到sock(可读)事件是否在集合中
        {   
            int ret=readline(sock,recvbuf,sizeof(recvbuf));//函数从打开的文件,设备中读取数据
            if(ret==-1)
            {
                printf("readline err");
            }
            else if(ret==0)
            { 
                printf("server is closed\n");
                break;
            }

            fputs(recvbuf,stdout);//发送数据到文件
            memset(recvbuf,0,sizeof(recvbuf));
        }
        if(FD_ISSET(fd_stdin, &rset))  //判断检测到fd_ISSET(写入)事件是否在集合中
        {
           if(fgets(sendbuf, sizeof(sendbuf), stdin) != NULL)
           {
               writen(sock,&sendbuf,strlen(sendbuf));
               memset(sendbuf, 0, sizeof(sendbuf));
           }
           else
               break;

        }
    }
    close(sock);
}

void handle_sigpipe(int sig)
{
    printf("recv a sig=%d\n", sig);
}

int main()
{
    //signal(SIGPIPE, handle_sigpipe);
    signal(SIGPIPE, SIG_IGN);
    int sock;
    /*if((listenfd=socket(AF_INET,SOCK_STEAM,IPPOTO_TCP))<0) */
    if((sock=socket(AF_INET,SOCK_STREAM,0))<0)
        printf("socket err\n");

    //IPV4地址结构
    struct sockaddr_in servaddr;
    memset(&servaddr,0,sizeof(servaddr));
    servaddr.sin_family=AF_INET;//地址家族
    servaddr.sin_port=htons(8001);//端口,主机转网络
    servaddr.sin_addr.s_addr=inet_addr("127.0.0.1");
    /*inet_aton("127.0.0.1",&servaddr.sin_addr);*/


    if(connect(sock,(struct sockaddr*)&servaddr,sizeof(servaddr))<0)
        printf("connect err\n");
    //获取本地端口号、IP
    struct sockaddr_in localaddr;
    socklen_t addrlen = sizeof(localaddr);
    if(getsockname(sock, (struct sockaddr*)&localaddr, &addrlen) < 0)
        printf("getsockname err\n");
    printf("ip=%s port=%d\n", inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port));

    echo_client(sock);

    return 0;

}
  • 运行服务器与客户端,然后将断掉服务器与客户端的连接,结果如下:
  • 可以看到,客户端直接进入到了TIME_WAIT 状态,并没有阻塞在TIME_WAIT_2状态。
  • 读、写、异常事件发生的条件

    • 可读:
      • 套接口缓冲区有数据可读(对等方发送数据过来,填充了本地套接口缓冲区,导致了套接口缓冲区有数据可读)
      • 连接的读一半关闭,即接收到FIN段,读操作将返回0 (这个时候也能通知select表示某个套接口产生了可读事件,这时候运行读操作,返回值为 0,表示对等方关闭)
      • 如果是监听套接口,已完成连接队列不为空时。(就是说对等方在connect连接完成时,则已完成连接对列就不为空了,监听套接口就会产生可读事件通知select检测到)
      • 套接口上发生了一个错误待处理(同样会产生可读事件,通知select),错误可以通过getsockopt 指定SO_ERROR选项来获取。
  • 可写:

    • 套接口发送缓冲区有空间容纳数据(在缓冲区写入产生可写事件,在缓冲区没有满的时候,会频繁的产生可写事件)
    • 连接的写一半关闭。及收到的RST段之后,再次调用write操作。
    • 套接口上发生了一个错误待处理,错误可以通过getsockopt 指定SO_ERROR选项来获取。
  • 异常

    • 套接口存在带外数据。
  • 用select改进服务器

/*********************************************************************************
 *      Copyright:  (C) 2018 anzhihong<[email protected]>
 *                  All rights reserved.
 *
 *       Filename:  socket_server.c
 *    Description:  This file 
 *                 
 *        Version:  1.0.0(2018年08月05日)
 *         Author:  anzhihong <[email protected]>
 *      ChangeLog:  1, Release initial version on "2018年08月05日 08时23分10秒"
 *                 
 ********************************************************************************/

#include <unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include <signal.h>
#include <sys/select.h>

#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>

#define ERR_EXIT(m) \
    do \
    { \
        perror(m); \
        exit(EXIT_FAILURE); \
    } while(0)


ssize_t readn(int fd,void *buf,size_t count)
{
    size_t nleft=count;//还需要读的字节数
    ssize_t nread;//已经接收字节数
    char *bufp=(char *)buf;

     while(nleft>0)
     {
        if((nread=read(fd,bufp,nleft))<0)
        {
            if(errno==EINTR)//read是可中断的,所以当被外来信号中断打断时,不算错误,任然继续执行
                continue;
            return -1;
        }
        else if(nread==0)//对等方关闭
            return count-nleft;//读到EOF,对方关闭
        bufp  += nread;
        nleft -= nread;
     }
    return count;
}

ssize_t writen(int fd,void *buf,size_t count)
{
    size_t nleft=count;//剩余发送字节数
    ssize_t nwritten;//已经发送字节数
    char *bufp=(char *)buf;

     while(nleft>0)//一般而言,write缓冲区大于发送数据缓冲区,不阻塞
     {
        if((nwritten=write(fd,bufp,nleft))<0)
        {
            if(errno==EINTR)//被中断
                continue;
            return -1;
        }
        else if(nwritten==0)//对等方关闭
            continue;//读到EOF,对方关闭
        bufp+=nwritten;
        nleft-=nwritten;
     }
     return count;
}

ssize_t recv_peek(int sockfd,void *buf,size_t len)
{
    while(1)
    {
        int ret=recv(sockfd,buf,len,MSG_PEEK); //提前偷窥缓冲区的数据,并不读取。
        if(ret==-1&&errno==EINTR)//操作被信号中断,recv认为链接正常,继续执行。
            continue;
        return ret;
    }
}

ssize_t readline(int sockfd,void *buf,size_t maxline)
{
    int ret = 0;
    int nread;
    char *bufp = buf;
    int nleft = maxline; //读取遇到\n返回,不会超过maxline

    while(1)
    {
        ret = recv_peek(sockfd, bufp, nleft);
        if(ret <= 0)
            return ret;
        nread = ret;

        int i;
        for(i=0; i<nread; i++)
        {
            if(bufp[i] == '\n')
            {
                ret = readn(sockfd, bufp, i+1);
                if(ret != i+1)
                    exit(EXIT_FAILURE); //偷窥失败
                return ret;
            }
        }
        if(nread > nleft)    //偷窥到的数据不能大于maxline
            exit(EXIT_FAILURE);
        nleft -= nread; //剩余字节数
        ret = readn(sockfd, bufp, nread); //将已经读到数据nread从缓冲区移除
        if(ret != nread)
            exit(EXIT_FAILURE);
        bufp += nread;   //继续偷窥
    }
    return 1;
}

void echo_service(int conn)
{
    char recvbuf[1024];

    while(1)
    {
        memset(recvbuf, 0, sizeof(recvbuf)); //初始化recvbuf
        int ret=readline(conn,recvbuf,1024);//一行一行接收数据。
        if(ret == -1)
            ERR_EXIT("readline");
        if(ret == 0)
        {
            printf("client close");
            break;
        }

        fputs(recvbuf, stdout);
        writen(conn, recvbuf, strlen(recvbuf));
    }
}

void handle_sigchld(int sig)
{
     while(waitpid(-1, NULL, WNOHANG) > 0)
         ;
}

int main(void)
{
    //处理僵尸进程
    signal(SIGCHLD, handle_sigchld);
    int listenfd;
    /* if((listenfd=socket(AF_INET,SOCK_STEAM,IPPOTO_TCP))<0) */
    if((listenfd=socket(AF_INET,SOCK_STREAM,0))<0)   //第一次打开 listenfd
        ERR_EXIT("socket err");

     //IPV4地址结构
     struct sockaddr_in servaddr;
     memset(&servaddr,0,sizeof(servaddr));
     servaddr.sin_family=AF_INET;//地址家族
     servaddr.sin_port=htons(8001);//端口,主机转网络
     /*servaddr.sin_addr.s_addr=inet_addr("127.0.0.1");
     inet_aton("127.0.0.1",&servaddr.sin_addr);*/
     servaddr.sin_addr.s_addr=htonl(INADDR_ANY);

    //地址复用
    int on=1;
    if(setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on))<0)
        ERR_EXIT("setsockopt err");

    //绑定端口号,ip地址
    if(bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr))<0) //第二次打开 listenfd
        ERR_EXIT("bind err");

    //监听
    if(listen(listenfd,SOMAXCONN)<0)   //第三次打开 listenfd
        ERR_EXIT("bind err");

    struct sockaddr_in peeraddr;
   // socklen_t peerlen =sizeof(peeraddr);//typedef int socklen_t
    socklen_t peerlen;
    int conn;//已连接套接字
/*     pid_t pid;
     while(1)
     {
        if((conn=accept(listenfd,(struct sockaddr*)&peeraddr,&peerlen))<0)
            ERR_EXIT("accept err");

        printf("ip=%s port=%d\n",inet_ntoa(peeraddr.sin_addr),ntohs(peeraddr.sin_port));

        pid = fork();
        if(pid==-1)
            ERR_EXIT("fork err");
        if(pid==0)
        {
            close(listenfd);
            do_service(conn);
            exit(EXIT_SUCCESS);//将子进程退出,要不它还会fork()
        }

        else
            close(conn);
     }
*/
    int nready; //select返回的监测个数
    int maxfd = listenfd;   //所以这里的 listenfd 的值为3
    fd_set allset; //建立集合
    fd_set rset;
    FD_ZERO(&allset);  //清除集合
    FD_ZERO(&rset);
    FD_SET(listenfd, &allset); //将接听套接口放入 allset 中
    /*  当有新的客户端连接过来时,conn 会被新的客户端覆盖,所以定义一个数组来存放客户端的信息,为什么使用 fork()时 conn 没有被覆盖呢?这是因为每个 fork() 中的 conn 时独立的,而我们现在采用的是单进程来实现,一个单进程只有一个 conn,所以需要一个套接口数组集合来存储客户端 conn 的连接信息 */
    int client[FD_SETSIZE]; //select最多可以处理这么多的描述符
    int i;
    for(i=0;i<FD_SETSIZE;i++)
    {
        client[i] = -1;   //表示空闲
    }


    while(1)
    {
        /*在程序运行以后,allset 当中会有两个套接口,一个是 listenfd, 一个是 conn。此时就会有多种事件发生。一种时listenfd套接口产生的事件,一种是conn套接口产生的事件,另一种是两种套接口产生的事件。这里需要allset的原因是这里rset的套接口数组集的的内容会变化,它只会保存套接口数族当前的内容,在下一次select时,只会监听当前的套接口,而不会监听所有的套接口,所以需要allset来临时保存rset以前的内容*/
        rset = allset;
        nready = select(maxfd+1, &rset, NULL, NULL, NULL);
        if(nready == -1)  //如果 nready 等于-1,监听失败
        {
            if(errno == EINTR)  //还有一种情况就是被中断信号打断,可能导致 select不能重启
            continue;  //继续监听
            ERR_EXIT("select");
        }
        if(nready == 0)    //如果超时,继续执行
            continue;
        /* 监听套接口发生可读事件意味着对方连接三次握手已经成功,我们这边已完成连接队列中的条目不为空了,那么此时再调用 accept 函数将不会再阻塞*/
        if(FD_ISSET(listenfd, &rset)) //判断是否是监听套接口产生了可读事件
        {
            peerlen = sizeof(peeraddr);   
            conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen);

            if(conn == -1)
                ERR_EXIT("accept");
            for(i=0; i<FD_SETSIZE;i++) //将客户端的连接信息保存在一个空闲的位置
            {
                if(client[i] < 0) //说明找到了空闲位置,如果client小于0
                {
                    client[i] = conn;
                    break;
                }
        }
            if(i == FD_SETSIZE)   //如果i等于select设定的长度,表明client的连接数量超出了我们的连接范围
             {
                  fprintf(stderr, "too many clients\n");
                  exit(EXIT_FAILURE);
             }
            printf("ip=%s port=%d\n",inet_ntoa(peeraddr.sin_addr),ntohs(peeraddr.sin_port));
            /* 现在我们已经得到了一个已经连接的套接口conn,下一次我们仍然需要关心 conn 的可读事件的发生,也就是说下一次调用select时我们也要把它放入 select 中处理 */
            FD_SET(conn, &allset);
            /* 因为不断地往数组集合中添加套接口,所以maxfd可能不是恒大于以前的maxfd,所以要更新maxfd的值 */
            if(conn > maxfd)
                maxfd = conn;
            /* 因为程序的不断运行,导致添加到数组中的套接口越来越多,从而导致select的返回值可能大于1,所以我们需要一个一个的处理 */
            //首先时conn产生的可读事件
            if(--nready <= 0)    //证明这里检测到的事件已经处理完毕,然后继续监听
                continue;
        }
        //处理已连接套接口conn的事件
        for(i=0; i<FD_SETSIZE; i++)
        {
            conn = client[i];
            if(conn == -1)  //表示空闲的位置,继续执行
                continue;
            if(FD_ISSET(conn, &rset))  //检测读集合中是否有conn
            { 
                char recvbuf[1024] = {0};
                memset(recvbuf, 0, sizeof(recvbuf)); //初始化recvbuf
                int ret=readline(conn,recvbuf,1024);//一行一行接收数据。
                if(ret == -1)
                ERR_EXIT("readline");
                if(ret == 0)
                    {
                        printf("client close\n");
                        FD_CLR(conn, &allset); //当客户端关闭,那么就不需要再关心套接口conn产生的事件了
                        printf("conn already delete\n");
                    }

                fputs(recvbuf, stdout);
                writen(conn, recvbuf, strlen(recvbuf));

                if(--nready <= 0) //这里意味着所有的事件都已经处理完了,就跳出循环。
                    break;
            }
        }

    }
     return 0;

}

  • 如图所示集合总的容量是 FD_SETSIZE ,下标为 0 的表示没有将其放入集合当中,也就说文件描述符0,1,2都没有放入。因为我们的监听套接口的文件描述符为3,所以将其放入集合中。接下来如果套接口产生了可读事件,accept返回一个新的套接口,此时套接口的文字描述符为4,然后将新的套接口加入空闲的数组中。如第一个,client[0],文字描述符为5的套接口放入client[1]中,以此类推。当套接口为4的关闭client[0]的值又要重新置为1,并且将其从集合中的值置为0,即去除套接口为4的那个。但是这个时候不会更新maxfd的值,意味着select函数要从0开始遍历所有的套接口(这里是检测到所有的可读事件才返回,并不是一检测到可读事件就返回),然后返回发生事件的个数。一旦产生了可读事件,要遍历所有已连接的套接口,其范围是0~FD_SETSIZE,那么我们能不能缩小一下范围呢?我们可以记录一个最大的不空闲的位置 maxi。
  • 代码
/*********************************************************************************
 *      Copyright:  (C) 2018 anzhihong<[email protected]>
 *                  All rights reserved.
 *
 *       Filename:  socket_server.c
 *    Description:  This file 
 *                 
 *        Version:  1.0.0(2018年08月05日)
 *         Author:  anzhihong <[email protected]>
 *      ChangeLog:  1, Release initial version on "2018年08月05日 08时23分10秒"
 *                 
 ********************************************************************************/

#include <unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include <signal.h>
#include <sys/select.h>

#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>

#define ERR_EXIT(m) \
    do \
    { \
        perror(m); \
        exit(EXIT_FAILURE); \
    } while(0)


ssize_t readn(int fd,void *buf,size_t count)
{
    size_t nleft=count;//还需要读的字节数
    ssize_t nread;//已经接收字节数
    char *bufp=(char *)buf;

     while(nleft>0)
     {
        if((nread=read(fd,bufp,nleft))<0)
        {
            if(errno==EINTR)//read是可中断的,所以当被外来信号中断打断时,不算错误,任然继续执行
                continue;
            return -1;
        }
        else if(nread==0)//对等方关闭
            return count-nleft;//读到EOF,对方关闭
        bufp  += nread;
        nleft -= nread;
     }
    return count;
}

ssize_t writen(int fd,void *buf,size_t count)
{
    size_t nleft=count;//剩余发送字节数
    ssize_t nwritten;//已经发送字节数
    char *bufp=(char *)buf;

     while(nleft>0)//一般而言,write缓冲区大于发送数据缓冲区,不阻塞
     {
        if((nwritten=write(fd,bufp,nleft))<0)
        {
            if(errno==EINTR)//被中断
                continue;
            return -1;
        }
        else if(nwritten==0)//对等方关闭
            continue;//读到EOF,对方关闭
        bufp+=nwritten;
        nleft-=nwritten;
     }
     return count;
}

ssize_t recv_peek(int sockfd,void *buf,size_t len)
{
    while(1)
    {
        int ret=recv(sockfd,buf,len,MSG_PEEK); //提前偷窥缓冲区的数据,并不读取。
        if(ret==-1&&errno==EINTR)//操作被信号中断,recv认为链接正常,继续执行。
            continue;
        return ret;
    }
}

ssize_t readline(int sockfd,void *buf,size_t maxline)
{
    int ret = 0;
    int nread;
    char *bufp = buf;
    int nleft = maxline; //读取遇到\n返回,不会超过maxline

    while(1)
    {
        ret = recv_peek(sockfd, bufp, nleft);
        if(ret <= 0)
            return ret;
        nread = ret;

        int i;
        for(i=0; i<nread; i++)
        {
            if(bufp[i] == '\n')
            {
                ret = readn(sockfd, bufp, i+1);
                if(ret != i+1)
                    exit(EXIT_FAILURE); //偷窥失败
                return ret;
            }
        }
        if(nread > nleft)    //偷窥到的数据不能大于maxline
            exit(EXIT_FAILURE);
        nleft -= nread; //剩余字节数
        ret = readn(sockfd, bufp, nread); //将已经读到数据nread从缓冲区移除
        if(ret != nread)
            exit(EXIT_FAILURE);
        bufp += nread;   //继续偷窥
    }
    return 1;
}

void echo_service(int conn)
{
    char recvbuf[1024];

    while(1)
    {
        memset(recvbuf, 0, sizeof(recvbuf)); //初始化recvbuf
        int ret=readline(conn,recvbuf,1024);//一行一行接收数据。
        if(ret == -1)
            ERR_EXIT("readline");
        if(ret == 0)
        {
            printf("client close");
            break;
        }

        fputs(recvbuf, stdout);
        writen(conn, recvbuf, strlen(recvbuf));
    }
}

void handle_sigchld(int sig)
{
     while(waitpid(-1, NULL, WNOHANG) > 0)
         ;
}

int main(void)
{
    //处理僵尸进程
    signal(SIGCHLD, handle_sigchld);
    int listenfd;
    /* if((listenfd=socket(AF_INET,SOCK_STEAM,IPPOTO_TCP))<0) */
    if((listenfd=socket(AF_INET,SOCK_STREAM,0))<0)   //第一次打开 listenfd
        ERR_EXIT("socket err");

     //IPV4地址结构
     struct sockaddr_in servaddr;
     memset(&servaddr,0,sizeof(servaddr));
     servaddr.sin_family=AF_INET;//地址家族
     servaddr.sin_port=htons(8001);//端口,主机转网络
     /*servaddr.sin_addr.s_addr=inet_addr("127.0.0.1");
     inet_aton("127.0.0.1",&servaddr.sin_addr);*/
     servaddr.sin_addr.s_addr=htonl(INADDR_ANY);

    //地址复用
    int on=1;
    if(setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on))<0)
        ERR_EXIT("setsockopt err");

    //绑定端口号,ip地址
    if(bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr))<0) //第二次打开 listenfd
        ERR_EXIT("bind err");

    //监听
    if(listen(listenfd,SOMAXCONN)<0)   //第三次打开 listenfd
        ERR_EXIT("bind err");

    struct sockaddr_in peeraddr;
   // socklen_t peerlen =sizeof(peeraddr);//typedef int socklen_t
    socklen_t peerlen;
    int conn;//已连接套接字
/*     pid_t pid;
     while(1)
     {
        if((conn=accept(listenfd,(struct sockaddr*)&peeraddr,&peerlen))<0)
            ERR_EXIT("accept err");

        printf("ip=%s port=%d\n",inet_ntoa(peeraddr.sin_addr),ntohs(peeraddr.sin_port));

        pid = fork();
        if(pid==-1)
            ERR_EXIT("fork err");
        if(pid==0)
        {
            close(listenfd);
            do_service(conn);
            exit(EXIT_SUCCESS);//将子进程退出,要不它还会fork()
        }

        else
            close(conn);
     }
*/
    int nready; //select返回的监测个数
    int maxfd = listenfd;   //所以这里的 listenfd 的值为3
    fd_set allset; //建立集合
    fd_set rset;
    FD_ZERO(&allset);  //清除集合
    FD_ZERO(&rset);
    FD_SET(listenfd, &allset); //将接听套接口放入 allset 中
    /*  当有新的客户端连接过来时,conn 会被新的客户端覆盖,所以定义一个数组来存放客户端的信息,为什么使用 fork()时 conn 没有被覆盖呢?这是因为每个 fork() 中的 conn 时独立的,而我们现在采用的是单进程来实现,一个单进程只有一个 conn,所以需要一个套接口数组集合来存储客户端 conn 的连接信息 */
    int client[FD_SETSIZE]; //select最多可以处理这么多的描述符
    int maxi = 0;   //添加一个遍历范围,初始值为0
    int i;
    for(i=0;i<FD_SETSIZE;i++)
    {
        client[i] = -1;   //表示空闲
    }

   // int count = 0;  //测试服务器接受客户端连接数量
    while(1)
    {
        /*在程序运行以后,allset 当中会有两个套接口,一个是 listenfd, 一个是 conn。此时就会有多种事件发生。一种时listenfd套接口产生的事件,一种是conn套接口产生的事件,另一种是两种套接口产生的事件。这里需要allset的原因是这里rset的套接口数组集的的内容会变化,它只会保存套接口数族当前的内容,在下一次select时,只会监听当前的套接口,而不会监听所有的套接口,所以需要allset来临时保存rset以前的内容*/
        rset = allset;
        nready = select(maxfd+1, &rset, NULL, NULL, NULL);
        if(nready == -1)  //如果 nready 等于-1,监听失败
        {
            if(errno == EINTR)  //还有一种情况就是被中断信号打断,可能导致 select不能重启
            continue;  //继续监听
            ERR_EXIT("select");
        }
        if(nready == 0)    //如果超时,继续执行
            continue;
        /* 监听套接口发生可读事件意味着对方连接三次握手已经成功,我们这边已完成连接队列中的条目不为空了,那么此时再调用 accept 函数将不会再阻塞*/
        if(FD_ISSET(listenfd, &rset)) //判断是否是监听套接口产生了可读事件
        {
            peerlen = sizeof(peeraddr);   
            conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen);

            if(conn == -1)
                ERR_EXIT("accept");
            for(i=0; i<FD_SETSIZE;i++) //将客户端的连接信息保存在一个空闲的位置
            {
                if(client[i] < 0) //说明找到了空闲位置,如果client小于0
                {
                    client[i] = conn;
                    if(i > maxi)
                    {
                        maxi = i; //最大不空闲的位置发生了改变
                        printf("maxi is:%d\n", maxi);
                    }
                    break;
                }
        }

            if(i == FD_SETSIZE)   //如果i等于select设定的长度,表明client的连接数量超出了我们的连接范围
            {
                 fprintf(stderr, "too many clients\n");
                 exit(EXIT_FAILURE);
            }
            printf("ip=%s port=%d\n",inet_ntoa(peeraddr.sin_addr),ntohs(peeraddr.sin_port));
            /* 现在我们已经得到了一个已经连接的套接口conn,下一次我们仍然需要关心 conn 的可读事件的发生,也就是说下一次调用select时我们也要把它放入 select 中处理 */
            FD_SET(conn, &allset);
            /* 因为不断地往数组集合中添加套接口,所以maxfd可能不是恒大于以前的maxfd,所以要更新maxfd的值 */
            if(conn > maxfd)
                maxfd = conn;
            /* 因为程序的不断运行,导致添加到数组中的套接口越来越多,从而导致select的返回值可能大于1,所以我们需要一个一个的处理 */
            //首先时conn产生的可读事件
            if(--nready <= 0)    //证明这里检测到的事件已经处理完毕,然后继续监听
                continue;
        }
        //处理已连接套接口conn的事件
        //for(i=0; i<FD_SETSIZE; i++) 
        for(i=0; i<=maxi; i++)   //遍历的范围从FD_SETSIZE缩小到maxi
        {
            conn = client[i];
            if(conn == -1)  //表示空闲的位置,继续执行
                continue;
            if(FD_ISSET(conn, &rset))  //检测读集合中是否有conn
            { 
                char recvbuf[1024] = {0};
                memset(recvbuf, 0, sizeof(recvbuf)); //初始化recvbuf
                int ret=readline(conn,recvbuf,1024);//一行一行接收数据。
                if(ret == -1)
                ERR_EXIT("readline\n");
                if(ret == 0)
                    {
                        printf("client close\n");
                        FD_CLR(conn, &allset); //当客户端关闭,那么就不需要再关心套接口conn产生的事件了
                        printf("conn already delete\n");
                        client[i] = -1;   //重新将client[i]置为空闲。
            close(conn);
                    }
            //printf("count is %d\n", ++count);
                fputs(recvbuf, stdout);
                writen(conn, recvbuf, strlen(recvbuf));

                if(--nready <= 0) //这里意味着所有的事件都已经处理完了,就跳出循环。
                    break;
            }
        }

    }
     return 0;

}
  • 注意这里的套接口不只受FD_SETSIZE的限制,还受到一个进程中能够打开的I/O的数目的限制。I/O的数目是可以更改的。通过命令ulimit -n来更改进程数(临时的)。FD_SETSIZE需要更改内核才行。
代码更改
/*********************************************************************************
 *      Copyright:  (C) 2018 anzhihong<[email protected]>
 *                  All rights reserved.
 *
 *       Filename:  socket_test1024.c
 *    Description:  This file 
 *                 
 *        Version:  1.0.0(2018年08月05日)
 *         Author:  anzhihong <[email protected]>
 *      ChangeLog:  1, Release initial version on "2018年08月05日 08时23分10秒"
 *                 
 ********************************************************************************/

#include <unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include <signal.h>
#include <sys/select.h>
#include <sys/resource.h>


#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>


#define ERR_EXIT(m) \
    do \
    { \
        perror(m); \
        exit(EXIT_FAILURE); \
    } while(0)

int main(void)
{
    struct rlimit rl;
    if(getrlimit(RLIMIT_NOFILE, &rl) < 0)
        ERR_EXIT("getrlimit");

    printf("%d\n",(int)rl.rlim_max);

    rl.rlim_cur = 2048;
    rl.rlim_max = 2048;
    if(setrlimit(RLIMIT_NOFILE, &rl) < 0)
        ERR_EXIT("setrlimit");
    if(getrlimit(RLIMIT_NOFILE, &rl) < 0)
        ERR_EXIT("getrlimit");
     printf("%d\n",(int)rl.rlim_max); 
    return 0;

}
  • 但是这个只能更改当前进程的最大限制,并不能更改父进程的。
  • select主要限制是其套接口数量的限制。
  • 测试代码

#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<signal.h>
#include <sys/select.h>
#include <sys/time.h>

#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>

#define ERR_EXIT(m) \
    do \
    { \
        perror(m); \
        exit(EXIT_FAILURE); \
    } while(0)


int main()
{
    int count = 0;
    while(1)
    {
        int sock;
        if((sock=socket(AF_INET,SOCK_STREAM,0))<0)
    {
        sleep(4);
            ERR_EXIT("socket");
    }

        //IPV4地址结构
        struct sockaddr_in servaddr;
        memset(&servaddr,0,sizeof(servaddr));
        servaddr.sin_family=AF_INET;//地址家族
        servaddr.sin_port=htons(8001);//端口,主机转网络
        servaddr.sin_addr.s_addr=inet_addr("127.0.0.1");
        /*inet_aton("127.0.0.1",&servaddr.sin_addr);*/


        if(connect(sock,(struct sockaddr*)&servaddr,sizeof(servaddr))<0)
            ERR_EXIT("connect");
        //获取本地端口号、IP
        struct sockaddr_in localaddr;
        socklen_t addrlen = sizeof(localaddr);
        if(getsockname(sock, (struct sockaddr*)&localaddr, &addrlen) < 0)
            printf("getsockname err\n");
        printf("ip=%s port=%d\n", inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port));
        printf("count = %d\n", ++count);
    }

        return 0;

}
  • 测试服务器的代码见上面服务代码的注释。最后服务器的结果是1020。也许有人会注意到上面有一行 sleep(4); 当客户端调用socket准备创建第1022个套接字时,如上所示也会提示错误,此时socket函数返回-1出错,如果没有睡眠4s后再退出进程会有什么问题呢?如果直接退出进程,会将客户端所打开的所有套接字关闭掉,即向服务器端发送了很多FIN段,而此时也许服务器端还一直在accept ,即还在从已连接队列中返回已连接套接字,此时服务器端除了关心监听套接字的可读事件,也开始关心前面已建立连接的套接字的可读事件,read 返回0,所以会有很多 client close 字段 参杂在条目的输出中,还有个问题就是,因为read 返回0,服务器端会将自身的已连接套接字关闭掉,那么也许刚才说的客户端某一个连接会被accept 返回,即测试不出服务器端真正的并发容量。
  • 客户端结果:
  • 服务器结果:
  • 将 sleep(4); 注释掉,可以看到输出参杂着client close,且这次的count 达到了1021,原因就是服务器端前面已经有些套接字关闭了,所以accept 创建套接字不会出错,服务器进程也不会因为出错而退出,可以看到最后接收到的一个连接端口是33742,即不一定是客户端的最后一个连接。观察服务器端的输出如下:

  • 套接字I/超时设置方法
  • 用select实现超时
    • read_timeout 函数封装
    • write_timeout 函数封装
    • accept_timeout 函数封装
    • connect__timeout 函数封装
  • 三种设置方法:

    • 闹钟方式设置alarm(但是这种方法可能会与其他的闹钟产生冲突,不推荐使用)
      • 超时之后产生一个SIG_ALRM信号
    • 套接字选项(也不推荐使用,移植性不太好)

      • SO_SNDTIMEO
      • SO_RCVTIMEO
    • select (主要使用select来设置超时)

      • read_timeout 函数封装
 read_timeout - 读超时检测函数,不含读操作 
* fd:文件描述符 
* wait_seconds:等待超时秒数, 如果为0表示不检测超时; 
* 成功(未超时)返回0,失败返回-1,超时返回-1并且errno = ETIMEDOUT 
*/ 
int read_timeout( int fd, unsigned int wait_seconds) 
{ 
int ret = 0 ; 
if (wait_seconds > 0 ) 
{ 
    fd_set read_fdset; 
     struct  timeval timeout; 

    FD_ZERO(&read_fdset); 
    FD_SET(fd, &read_fdset); 

    timeout.tv_sec = wait_seconds; 
    timeout.tv_usec =  0 ; 

     do 
    { 
        ret = select(fd +  1 , &read_fdset,  NULL ,  NULL , &timeout);  //select会阻塞直到检测到事件或者超时 
         // 如果select检测到可读事件发送,则此时调用read不会阻塞 
    } 
     while  (ret <  0  && errno == EINTR); 

     if  (ret ==  0 ) 
    { 
        ret = - 1 ; 
        errno = ETIMEDOUT; 
    } 
     else   if  (ret ==  1 ) 
         return   0 ; 

} 

 return  ret;
}
  • write_timeout 函数封装
/* write_timeout - 写超时检测函数,不含写操作 
* fd:文件描述符 
* wait_seconds:等待超时秒数, 如果为0表示不检测超时; 
* 成功(未超时)返回0,失败返回-1,超时返回-1并且errno = ETIMEDOUT 
*/ 
int write_timeout( int fd, unsigned int wait_seconds) 
{ 
int ret = 0 ; 
if (wait_seconds > 0 ) 
{ 
    fd_set write_fdset; 
     struct  timeval timeout; 

    FD_ZERO(&write_fdset); 
    FD_SET(fd, &write_fdset); 

    timeout.tv_sec = wait_seconds; 
    timeout.tv_usec =  0 ; 

     do 
    { 
        ret = select(fd +  1 ,  NULL , &write_fdset,  NULL , &timeout); 
    } 
     while  (ret <  0  && errno == EINTR); 

     if  (ret ==  0 ) 
    { 
        ret = - 1 ; 
        errno = ETIMEDOUT; 
    } 
     else   if  (ret ==  1 ) 
         return   0 ; 

} 

 return  ret; 
}
  • accept_timeout 函数封装

/* accept_timeout - 带超时的accept 
* fd: 套接字 
* addr: 输出参数,返回对方地址 
* wait_seconds: 等待超时秒数,如果为0表示正常模式 
* 成功(未超时)返回已连接套接字,失败返回-1,超时返回-1并且errno = ETIMEDOUT 
*/ 
int accept_timeout( int fd, struct sockaddr_in *addr, unsigned int wait_seconds) 
{ 
int ret; 
socklen_t addrlen = sizeof ( struct sockaddr_in); 
 if  (wait_seconds >  0 ) 
{ 

    fd_set accept_fdset; 
     struct  timeval timeout; 
    FD_ZERO(&accept_fdset); 
    FD_SET(fd, &accept_fdset); 

    timeout.tv_sec = wait_seconds; 
    timeout.tv_usec =  0 ; 

     do 
    { 
        ret = select(fd +  1 , &accept_fdset,  NULL ,  NULL , &timeout); 
    } 
     while  (ret <  0  && errno == EINTR); 

     if  (ret == - 1 ) 
         return  - 1 ; 
     else   if  (ret ==  0 ) 
    { 
        errno = ETIMEDOUT; 
         return  - 1 ; 
    } 
} 

 if  (addr !=  NULL ) 
    ret = accept(fd, ( struct  sockaddr *)addr, &addrlen); 
 else 
    ret = accept(fd,  NULL ,  NULL ); 
 if  (ret == - 1 ) 
    ERR_EXIT( "accpet error" ); 

 return  ret
  • connect__timeout 函数封装
/* activate_nonblock - 设置IO为非阻塞模式 
* fd: 文件描述符 
*/ 
void activate_nonblock( int fd) 
{ 
int ret; 
int flags = fcntl(fd, F_GETFL); 
if (flags == - 1 ) 
ERR_EXIT( “fcntl error” ); 
flags |= O_NONBLOCK; 
ret = fcntl(fd, F_SETFL, flags); 
 if  (ret == - 1 ) 
    ERR_EXIT( "fcntl error" ); 
}

/* deactivate_nonblock - 设置IO为阻塞模式 
* fd: 文件描述符 
*/ 
void deactivate_nonblock( int fd) 
{ 
int ret; 
int flags = fcntl(fd, F_GETFL); 
if (flags == - 1 ) 
ERR_EXIT( “fcntl error” ); 
flags &= ~O_NONBLOCK; 
ret = fcntl(fd, F_SETFL, flags); 
 if  (ret == - 1 ) 
    ERR_EXIT( "fcntl error" ); 
}


/* connect_timeout - 带超时的connect 
* fd: 套接字 
* addr: 输出参数,返回对方地址 
* wait_seconds: 等待超时秒数,如果为0表示正常模式 
* 成功(未超时)返回0,失败返回-1,超时返回-1并且errno = ETIMEDOUT 
*/ 
int connect_timeout( int fd, struct sockaddr_in *addr, unsigned int wait_seconds) 
{ 
int ret; 
socklen_t addrlen = sizeof ( struct sockaddr_in); 
 if  (wait_seconds >  0 ) 
    activate_nonblock(fd); 

ret = connect(fd, ( struct  sockaddr *)addr, addrlen); 
 if  (ret <  0  && errno == EINPROGRESS) 
{ 

    fd_set connect_fdset; 
     struct  timeval timeout; 
    FD_ZERO(&connect_fdset); 
    FD_SET(fd, &connect_fdset); 

    timeout.tv_sec = wait_seconds; 
    timeout.tv_usec =  0 ; 

     do 
    { 
         /* 一旦连接建立,套接字就可写 */ 
        ret = select(fd +  1 ,  NULL , &connect_fdset,  NULL , &timeout); 
    } 
     while  (ret <  0  && errno == EINTR); 

     if  (ret ==  0 ) 
    { 
        errno = ETIMEDOUT; 
         return  - 1 ; 
    } 
     else   if  (ret <  0 ) 
         return  - 1 ; 

     else   if  (ret ==  1 ) 
    { 
         /* ret返回为1,可能有两种情况,一种是连接建立成功,一种是套接字产生错误 
         * 此时错误信息不会保存至errno变量中(select没出错),因此,需要调用 
         * getsockopt来获取 */ 
         int  err; 
        socklen_t socklen =  sizeof (err); 
         int  sockoptret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &socklen); 
         if  (sockoptret == - 1 ) 
             return  - 1 ; 
         if  (err ==  0 ) 
            ret =  0 ; 
         else 
        { 
            errno = err; 
            ret = - 1 ; 
        } 
    } 
} 

 if  (wait_seconds >  0 ) 
    deactivate_nonblock(fd); 

 return  ret; 
}

猜你喜欢

转载自blog.csdn.net/buhuiguowang/article/details/81590624