c++ 通信演进level4 ----多线程异步非阻塞通信(AIO)

  今天,将最后一个流模型例子给记录一下,代码同样来自于网上。由于一些原因,导致心情不是很好,还是按照既定计划,将该demo的笔记记录一下。源码地址:地址

   它是基于  windows的iocp完成的,所以是异步非阻塞io。 最近看了很多的关于io说明的,各种帖子看的我头大,始终还是没彻底搞懂,估计是没接触过太多,境界还没到。但是毫无疑问的是,windows的iocp是属于异步非阻塞io的。 

  代码结构:

 服务端流程:

#include "main.h"
int main(){
    CServerSocket* serverSocket = new CServerSocket();
    serverSocket->prepareEnvironment();
    delete serverSocket;
}

  之前是想将里面的关键方法进行抽象来着,但是后面精力问题,就直接放到一个方法里面的了。 

  服务端关键代码:

......  
    //创建 IOCP的内核对象
    //参数说明:已经打开的文件句柄或者空句柄(一般是客户端的句柄),已经存在的IOCP句柄,完成键,真正并发同时执行最大线程数
    HANDLE  completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
......
  //基于处理器的核心数量创建线程
    for(DWORD i=0;i<(mySysInfo.dwNumberOfProcessors*2);++i){
        //创建服务器工作线程,并将完成端口传递到该线程
        HANDLE threadHandle = CreateThread(NULL,0,ServerWorkThread,completionPort,0,NULL);
        
        CloseHandle(threadHandle);
    }
......
int bindResult = bind(srvSocket,(SOCKADDR*)&srvAddr, sizeof(SOCKADDR));
......
 //将socket设置为监听模式
    int listenResult = listen(srvSocket,10);
......
//创建用于发送数据的线程
    HANDLE sendThread = CreateThread(NULL,0,ServerSendThread,0,0,NULL);
......

 主要作用是: 在程序启动之初,根据cpu的数量,创建相应的信息接收线程,信息接收线程要与一个完成端口进行绑定。  另外,启动一个信息发送线程,目前程序的实现是向所有连接的客户端发送信息。 但是也很容易做到向特定的 套接字客户端发送。 

  信息接收线程的核心代码如下:

 while(true){
        bRet = GetQueuedCompletionStatus(completionPort, &bytesTransferred, (PULONG_PTR)&perHandleData, (LPOVERLAPPED*)&ipOverLapped, INFINITE);

        LpperIOData perIoData=NULL;
        perIoData = (LpperIOData)CONTAINING_RECORD(ipOverLapped,PerIOData,overlapped);
......

  第二行中,是阻塞等待通知的。  它的通知机制是: 通过io完成端口,利用overlapped的结构。 其中,getQueuedCompletionStatus中,第一个参数为完成端口倒数第二个参数 为完成端口携带消息的介质。倒数带三个参数 相当于传递的消息类型,这里为包含socket的结构体。 

  此外要注意,重叠机制获取完数据后,需要设置重叠状态:

扫描二维码关注公众号,回复: 10657786 查看本文章
//为下一个重叠调用 建立单 I/O 操作数据
        perIoData = (LpperIOData)GlobalAlloc(GPTR, sizeof(LpperIOData));
        ZeroMemory(&(perIoData->overlapped), sizeof(OVERLAPPED)); //清空内存
        perIoData->databuff.len= 1024;
        perIoData->databuff.buf=perIoData->buffer;
        perIoData->operationType=0;//read;
        WSARecv(perHandleData->socket, &(perIoData->databuff), 1, (&recvBytes), &flags, &(perIoData->overlapped), NULL);

  代码中的顺序是不能乱序的,在我第一次写这个代码的时候,当时对c++指针,内存分配啥的还不是很清楚,结果导致这个项目中原来的代码运行不成功。  之后各种尝试,最后试成功了。 同时,当时遇到一个奇怪的现象,那就是运行是正常的,但是调试偶尔会出错。 现在想来,可能跟完成端口有关系。 

   我本地的cpu核心数是4核,所有在当前例子中,开了八个信息接收线程,这八个线程会同时阻塞在完成端口。根据我完成端口状态,一次只能允许一个并发,所以当完成端口有信号时,只会触发一个线程响应。 这个是可以控制的

   接着看看信息发送线程,比较简单:

//发送信息的线程执行函数
DWORD WINAPI ServerSendThread(LPVOID ipParam){
    while (1){
        char talk[200];
        gets(talk);
        if (talk ==""){
            return 0;
        }

        WaitForSingleObject(hMutex,INFINITE);
        for(int i=0;i<clientGroup.size();++i){
            send(clientGroup[i]->socket,talk,200,0);
        }
        ReleaseMutex(hMutex);
    }
    return 0;
}

  以上的信息接收时的打印,和 信息输入时的控制台录入,我最开始使用的时 std::cout,于std::cin的标准c++库函数。 但是经测试,并不能正确获取值。  当时网上查找说的是 ,它们不是线程安全的原因。  但是我现在想了下,可能恰恰因为它们是线程安全的才导致这样的问题。   控制台对一个程序来说,资源是唯一的,由于该资源被其它资源占用,所有导致当前使用线程不能正确的得到想要的结果。 

   客户端的代码也比较简单: 

int main(){
    CClientSocket* clientSocket= new CClientSocket();
    clientSocket->prepareEnvironment();
    delete clientSocket;
}

   同样的理由,并未对方法进行拆分。  

  HANDLE sendThread = CreateThread(NULL,0,SendMessageThread,NULL,0,NULL);
  HANDLE receiveThread = CreateThread(NULL,0,ReceiveMessageThread,NULL,0,NULL);

  此时客户端采用了 读写均单独开始一个线程,这样的好处在于,可以拜托类似于 http应用层协议那种 一收一发的固定通信模式。 当前,它也可支持原来的通信模式,这主要看使用者意愿。 

   发送线程代码:

DWORD WINAPI SendMessageThread(LPVOID ipParam){
    while (1){
        string  talk;
        getline(cin,talk);
        WaitForSingleObject(bufferMutex,INFINITE);
        if("quit"==talk){
            talk.push_back('\0');
            send(clientSocket,talk.c_str(),200,0);
            break;
        }
        std::cout<<"\nI Say:(\"quit\" to exit):"<<talk<<"\n";
        send(clientSocket,talk.c_str(),200,0);
        ReleaseSemaphore(bufferMutex,1,NULL);
        
        Sleep(5000);
    }
    return 0;
}

  接收线程代码:

DWORD WINAPI ReceiveMessageThread(LPVOID ipParam){
    while (1){
        char recvBuf[300];
        recv(clientSocket,recvBuf,200,0);
        WaitForSingleObject(bufferMutex,INFINITE);

        std::cout<<"Server Says:"<<recvBuf<<std::endl;

        ReleaseSemaphore(bufferMutex,1,NULL);
        if(recvBuf[0]=='\0'){
            cout<<"接收线程关闭!";
            break;
        }
    }
    return  0;
}

  注意到这两个线程,实际上是对线程退出时机做了预设的。 这个层面来说,它实际上便相当于一个特殊意义上的应用层协议了

运行效果:

发布了340 篇原创文章 · 获赞 159 · 访问量 13万+

猜你喜欢

转载自blog.csdn.net/qq_36285943/article/details/104806633