6.完成端口模型
这篇博文对完成端口模型介绍很详细:完成端口模型
完成端口(Completion Port)是一种Windows系统的内核对象,利用完成端口,套接字应用程序能够管理数百甚至上千个套接字,而且可以使系统的性能达到最佳。使用完成端口模型之前,需要首先创建一个I/O完成端口对象,再使用该完成端口对象,可以面向任意数量的套接字句柄,管理多个I/O请求;然后,通过指定一定数量的工作线程,为已经完成的重叠I/O提供服务。
完成端口模型使用线程池对线程进行管理,通过事先创建好的多个线程,在处理多个异步并发I/O请求时避免了频繁的线程创建和注销,在没有或I/O请求很少时,不使用的线程会挂起,也不会占用CPU时间。
可以把完成端口看成是系统维护的一个队列,系统把重叠I/O操作完成的事件通知放在该队列中。当某项重叠I/O操作完成时,系统将向完成端口对象发送I/O完成数据包,在收到一个数据包后,完成端口对象中的一个工作线程将被唤醒,处理完后,线程会继续在完成端口上等待后续的通知。
6.1实现完成端口模型的流程:
- 调用CreateIoCompletionPort()创建完成端口对象;
- 创建工作线程;
- 创建一个用于监听的套接字;
- 调用accept()或WASAccept()接受客户端的连接请求;
- 调用CreateIoCompletionPort()将第4步函数返回的已连接套接字与完成端口对象关联(绑定);
- 在已连接套接字上提交重叠I/O操作(如:WSASend()或WSARecv(),当重叠异步I/O操作完成时,将会调用对应的工作者线程。
区分两个东西:1.一个是用于区分不同套接字连接的单句柄数据结构!PER_SOCKET_CONTEXT(每个socket的上下文环境)
每个已连接套接字都有自己的单句柄数据结构!
//==================================================================================== // // 单句柄数据结构体定义(用于每一个完成端口,也就是每一个Socket的参数) // //==================================================================================== typedef struct _PER_SOCKET_CONTEXT { SOCKET m_Socket; // 每一个客户端连接的Socket SOCKADDR_IN m_ClientAddr; // 客户端的地址 CArray<_PER_IO_CONTEXT*> m_arrayIoContext; // 客户端网络操作的上下文数据, // 也就是说对于每一个客户端Socket,是可以在上面同时投递多个IO请求的 // 初始化 _PER_SOCKET_CONTEXT() { m_Socket = INVALID_SOCKET; memset(&m_ClientAddr, 0, sizeof(m_ClientAddr)); } // 释放资源 ~_PER_SOCKET_CONTEXT() { if( m_Socket!=INVALID_SOCKET ) { closesocket( m_Socket ); m_Socket = INVALID_SOCKET; } // 释放掉所有的IO上下文数据 for( int i=0;i<m_arrayIoContext.GetCount();i++ ) { delete m_arrayIoContext.GetAt(i); } m_arrayIoContext.RemoveAll(); } // 获取一个新的IoContext _PER_IO_CONTEXT* GetNewIoContext() { _PER_IO_CONTEXT* p = new _PER_IO_CONTEXT; m_arrayIoContext.Add( p ); return p; } // 从数组中移除一个指定的IoContext void RemoveContext( _PER_IO_CONTEXT* pContext ) { ASSERT( pContext!=NULL ); for( int i=0;i<m_arrayIoContext.GetCount();i++ ) { if( pContext==m_arrayIoContext.GetAt(i) ) { delete pContext; pContext = NULL; m_arrayIoContext.RemoveAt(i); break; } } } } PER_SOCKET_CONTEXT, *PPER_SOCKET_CONTEXT;
2.是在每一个已连接套接字上用于区分多个不同的I/O操作的单I/O数据结构PER_IO_CONTEXT(每次I/O的上下文环境)
每次I/O操作类型的枚举类型:
////////////////////////////////////////////////////////////////// // 在完成端口上投递的I/O操作的类型 typedef enum _OPERATION_TYPE { ACCEPT_POSTED, // 标志投递的Accept操作 SEND_POSTED, // 标志投递的是发送操作 RECV_POSTED, // 标志投递的是接收操作 NULL_POSTED // 用于初始化,无意义 }OPERATION_TYPE;
//==================================================================================== // // 单IO数据结构体定义(用于每一个重叠操作的参数) // //==================================================================================== typedef struct _PER_IO_CONTEXT { OVERLAPPED m_Overlapped; // 每一个重叠网络操作的重叠结构(针对每一个Socket的每一个操作,都要有一个) SOCKET m_sockAccept; // 这个网络操作所使用的Socket WSABUF m_wsaBuf; // WSA类型的缓冲区,用于给重叠操作传参数的 char m_szBuffer[MAX_BUFFER_LEN]; // 这个是WSABUF里具体存字符的缓冲区 OPERATION_TYPE m_OpType; // 标识网络操作的类型(对应上面的枚举) // 初始化 _PER_IO_CONTEXT() { ZeroMemory(&m_Overlapped, sizeof(m_Overlapped)); ZeroMemory( m_szBuffer,MAX_BUFFER_LEN ); m_sockAccept = INVALID_SOCKET; m_wsaBuf.buf = m_szBuffer; m_wsaBuf.len = MAX_BUFFER_LEN; m_OpType = NULL_POSTED; } // 释放掉Socket ~_PER_IO_CONTEXT() { if( m_sockAccept!=INVALID_SOCKET ) { closesocket(m_sockAccept); } } // 重置缓冲区内容 void ResetBuffer() { ZeroMemory( m_szBuffer,MAX_BUFFER_LEN ); } } PER_IO_CONTEXT, *PPER_IO_CONTEXT;
另外,重叠I/O的核心数据结构是:OVERLAPPED结构:(用来标识每次重叠I/O操作的!)
typedef struct _OVERLAPPED { ULONG_PTR Internal; ULONG_PTR InternalHigh; union { struct { DWORD Offset; DWORD OffsetHigh; } DUMMYSTRUCTNAME; PVOID Pointer; } DUMMYUNIONNAME; HANDLE hEvent; } OVERLAPPED, *LPOVERLAPPED;
WASOVERLAPPED结构完全与OVERLAPPED结构完全相同:
#define WSAOVERLAPPED OVERLAPPED
先来看一个简单 的例子,再去看这篇博文对完成端口模型介绍很详细:完成端口模型里面封装得很好,稍有些复杂的例子:
例子1:链接:https://pan.baidu.com/s/1YSz5HWL2kWVL7fg1Ax-rEA 密码:y9lh
服务器端代码:
#include "stdafx.h" #include <winsock2.h> #define PORT 65432 //定义端口号常量 #define MSGSIZE 1000 //接收缓冲区大小 #pragma comment(lib, "ws2_32.lib") typedef struct _PerHandle { //该结构体用于保存套接字信息,以区分不同的套接字!//这个起始就类似于单句柄数据!PER_SOCKET_CONTEXT SOCKET m_sock; sockaddr_in m_addr; }PerHandle, *PtrPerHandle; /*在已连接套接字上提交一个重叠I/O操作,例如WSARecv(),此后工作线程将会启动进行相关处理。 在应用程序中通常声明一个核I/O操作相关的结构体,用以保存每一次I/O操作相关的信息! 该结构体参照WSAOVERLAPPED结构体! typedef struct _OVERLAPPED { ULONG_PTR Internal; ULONG_PTR InternalHigh; union { struct { DWORD Offset; DWORD OffsetHigh; } DUMMYSTRUCTNAME; PVOID Pointer; } DUMMYUNIONNAME; HANDLE hEvent; } OVERLAPPED, *LPOVERLAPPED; #define WSAOVERLAPPED OVERLAPPED */ //这个结构体就是保存每次I/O操作的信息,叫per_io_operation_data,//就是单IO数据结构信息! typedef struct _PerIO { WSAOVERLAPPED m_overlapped; //重叠I/O使用的OVERLAPPED结构体 char buf[512]; int m_operationType;//操作类型就是下面的1,2,3取值类型! #define OP_READ 1 #define OP_WRITE 2 #define OP_ACCEPT 3 }PerIO, *PtrPerIO; #include <iostream> #include <windows.h> #include <process.h> using namespace std; UINT WINAPI ServerThread(PVOID pvParam); int main(int argc, char **argv) { WSADATA wsaData; if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) { cout << "failed to load winsock !" << endl; return 0; } /*创建完成端口对象*/ HANDLE hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); /*创建服务线程,构建服务线程池*/ _beginthreadex(NULL, 0, ServerThread, (LPVOID)hCompletion, 0, 0);////hCompletion作为参数传递给线程函数! /*创建监听套接字,并监听连接*/ SOCKET myListen = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in localAddr; localAddr.sin_family = AF_INET; localAddr.sin_port = ntohs(65432); localAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);//可以使用本机的任何IP地址 bind(myListen, (sockaddr *)&localAddr, sizeof(localAddr)); listen(myListen, 5);//backlog=5:表示等待连接的最长队列! cout << "server is listening......" << endl; /*接受客户端的连接,并将读写操作投放到重叠IO中*/ //针对一个客户端的连接,仅发送和接受各一次信息! while (true) { struct sockaddr_in clientAddr; int clientAddrLen = sizeof(clientAddr); //accept是阻塞版本的!,循环服务器,每来一个连接的时候现场花时间建立一个新的socket(accept返回该socket)! SOCKET newClient = accept(myListen, (sockaddr *)&clientAddr, &clientAddrLen); /* 要注意区分监听套接字、和已连接套接字!,这里仅将已连接套接字以完成端口进行了绑定! 所以,只有经该已连接套接字收发的I/O操作会触发完成端口发送发出消息,然后调用线程函数来处理I/O操作完成后的事务! */ /*将套接字与完成端口关联*/ PtrPerHandle ptrPerHandle = new PerHandle(); ptrPerHandle->m_sock = newClient; ptrPerHandle->m_addr = clientAddr; CreateIoCompletionPort((HANDLE)ptrPerHandle->m_sock, hCompletion, (ULONG_PTR)ptrPerHandle, 0); //WSASend,调用了几次就发送几次信息!,这里针对一个连接仅调用了一次! /*投放异步重叠IO1:发生数据给客户端*/ PtrPerIO ptrPerIO1 = new PerIO(); strcpy(ptrPerIO1->buf, "message send to client from server!"); ptrPerIO1->m_operationType = OP_WRITE; WSABUF buf1; buf1.buf = ptrPerIO1->buf; buf1.len = 512; DWORD dwSent; DWORD dwFlag1 = 0; ::WSASend(ptrPerHandle->m_sock, &buf1, 1, &dwSent, dwFlag1, &ptrPerIO1->m_overlapped, NULL); //WSARecv()调用了几次就收几次信息!,这个while循环一次循环仅针对一个连接请求,仅发送一次信息和接受一次来自客户端的信息! /*投放异步重叠IO2:从客户端接收数据!*/ PtrPerIO ptrPerIO = new PerIO(); ptrPerIO->m_operationType = OP_READ; WSABUF buf; buf.buf = ptrPerIO->buf; buf.len = 512; DWORD dwRecv=0; DWORD dwFlag = 0; int nBytesRecv=::WSARecv(ptrPerHandle->m_sock, &buf, 1, &dwRecv, &dwFlag, &ptrPerIO->m_overlapped, NULL); if (nBytesRecv == SOCKET_ERROR) { int errorCode = GetLastError(); if (errorCode == WSA_IO_PENDING) { cout <<"in main function"<< "数据暂时还没收到!,等待通知!" << endl;//这里只会执行一次啊!,因为accept是阻塞版本啊,来第二的客户端请求的时候,这里就能执行第二次啊! /*server is listening...... message send to client from server! 数据暂时还没收到!,等1秒钟!in thread function! message send to client from server! in thread function! 数据暂时还没收到!,等1秒钟! 来自第一个客户端的消息! 来自第二个客户端的消息! */ } } if (nBytesRecv==0&&dwRecv > 0) { cout << "buf.buf=" << buf.buf << endl; } /* #if INCL_WINSOCK_API_PROTOTYPES WINSOCK_API_LINKAGE int WSAAPI WSARecv( _In_ SOCKET s, _In_reads_(dwBufferCount) __out_data_source(NETWORK) LPWSABUF lpBuffers, _In_ DWORD dwBufferCount, _Out_opt_ LPDWORD lpNumberOfBytesRecvd, _Inout_ LPDWORD lpFlags, _Inout_opt_ LPWSAOVERLAPPED lpOverlapped, _In_opt_ LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine ); #endif //INCL_WINSOCK_API_PROTOTYPES */ } } /* 这些工作线程(服务线程)就是用来处理各种操作系统的通知的,比如有新客户端连接成功了、数据收好了、数据发送好了等等。 */ /******************************************************************************** * 函数介绍:服务线程函数,平常处于等待状态,当IO操作完成后,完成数据处理。 * 输入参数: PVOID pvParam:完成端口对象。 * 输出参数:无 * 返回值 :用于符合线程函数的返回值要求,成功返回0,否则返回-1。 *********************************************************************************/ UINT WINAPI ServerThread(PVOID pvParam) { HANDLE hCompletion = (HANDLE)pvParam; DWORD dwTrans; PtrPerHandle ptrPerHandle;//保存套接字信息,以区分不同的套接字! PtrPerIO ptrPerIO;//保存每次I/O操作的信息,以区分不同的I/O操作 while (true) { /* WINBASEAPI BOOL WINAPI GetQueuedCompletionStatus( _In_ HANDLE CompletionPort, _Out_ LPDWORD lpNumberOfBytesTransferred, _Out_ PULONG_PTR lpCompletionKey, _Out_ LPOVERLAPPED * lpOverlapped, _In_ DWORD dwMilliseconds /若为WSA_INFINITE,函数将一直阻塞,直到有完成通知到来! ); */ /* 在没有事件发生时,函数GetQueuedCompletionStatus()会让工作线程挂起,不然不会占用cpu时间片。 如果一直没有请求到来,就要考虑怎么关闭阻塞的工作线程了! */ /*阻塞线程直到有IO操作完成,并通过参数返回操作结果。*/ bool ret = ::GetQueuedCompletionStatus(hCompletion, &dwTrans, (LPDWORD)&ptrPerHandle, (LPOVERLAPPED *)&ptrPerIO, WSA_INFINITE); if (!ret) { closesocket(ptrPerHandle->m_sock); delete(ptrPerHandle); delete(ptrPerIO); continue; } /*读或写数据为空*/ if (dwTrans == 0 && (ptrPerIO->m_operationType == OP_READ || ptrPerIO->m_operationType == OP_WRITE)) { closesocket(ptrPerHandle->m_sock); delete(ptrPerHandle); delete(ptrPerIO); continue; } switch (ptrPerIO->m_operationType) { case OP_READ: { cout <<"in thread function!"<< "客户端发送来的数据已经收到,现在在线程函数里!" << endl; ptrPerIO->buf[dwTrans] = '/0';//dwTrans=512,就等于buffer的字节数,理论上是数组越界了! ptrPerIO->buf; cout <<"in thread function!"<< ptrPerIO->buf << endl; //WSABUF buf; //buf.buf = ptrPerIO->buf; //buf.len = 512; //DWORD dwRecv; //DWORD dwFlag = 0; //::WSARecv(ptrPerHandle->m_sock, &buf, 1, &dwRecv, &dwFlag, &ptrPerIO->m_overlapped, NULL); } break; case OP_WRITE: { cout << "in thread function!"<<"服务器发送客户端的数据已经发送,现在在线程函数里!" << endl; ptrPerIO->buf[dwTrans] = '/0';//dwTrans=512,就等于buffer的字节数,理论上是数组越界了! //ptrPerIO->buf; cout << "in thread function!"<< ptrPerIO->buf << endl; //WSABUF buf; //buf.buf = "message send to client from server!,and in server thread!";//这里就发不了了!,只有在主线程main函数里调用WSASend()函数完成数据发送以后,才会调用该子线程函数! //buf.len = sizeof(buf.buf); //DWORD dwSent = sizeof(buf.buf); //DWORD dwFlag = 0; //::WSASend(ptrPerHandle->m_sock, &buf, 1, &dwSent, dwFlag, &ptrPerIO->m_overlapped, NULL); } break; case OP_ACCEPT: { cout << "接受了一个连接请求,现在在线程函数里面!" << endl; } break; } } return 0; } /* server is listening...... in thread function!服务器发送客户端的数据已经发送,现在在线程函数里! in main function数据暂时还没收到!,等待通知! in thread function!message send to client from server! in thread function!客户端发送来的数据已经收到,现在在线程函数里! in thread function!你好! */
客户端:就普通的阻塞式套接字:这里对发送数据搞了一个循环,然而服务器端并不会接受多次数据,服务器端针对一个客户端套接字连接,仅收一次数据;但可以启动多个客户端程序,对于每一个客户端,都可以收发一次数据!
// client.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include "iostream" #include "winsock2.h" #define PORT 65432 //定义要访问的服务器端口常量 #pragma comment(lib, "ws2_32.lib") using namespace std; int main(int argc, char **argv) { /***定义相关的变量***/ int sock_client; //定义客户端套接字 struct sockaddr_in server_addr; //定义存放服务器端地址的结构变量 int addr_len = sizeof(struct sockaddr_in); //地址结构变量长度 char msgbuffer[1000]; //接收/发送信息的缓冲区 /***初始化winsock DLL***/ WSADATA wsaData; WORD wVersionRequested=MAKEWORD(2,2); //生成版本号2.2 if(WSAStartup(wVersionRequested,&wsaData)!=0) { cout<<"加载winsock.dll失败!\n"; return 0; } /***创建套接字***/ if ((sock_client = socket(AF_INET,SOCK_STREAM,0))<0) { cout<<"创建套接字失败!错误代码:"<<WSAGetLastError()<<endl; WSACleanup(); return 0; } /***填写服务器地址***/ char IP[20]="127.0.0.1"; //cout<<"请输入服务器IP地址:"; //cin>>IP; memset((void *)&server_addr,0,addr_len);//地址结构清0 server_addr.sin_family =AF_INET; server_addr.sin_port = htons(PORT); server_addr.sin_addr.s_addr = inet_addr(IP);//填写服务器IP地址 /***与服务器建立连接***/ if(connect(sock_client,(struct sockaddr *)&server_addr,addr_len)!=0) { cout<<"连接失败!错误代码:"<<WSAGetLastError()<<endl; closesocket(sock_client); WSACleanup(); return 0; } ///***接收信息并显示***/ int size; if((size=recv(sock_client,msgbuffer,sizeof(msgbuffer),0))<0) { cout<<"接收信息失败!错误代码:"<<WSAGetLastError()<<endl; closesocket(sock_client);//关闭已连接套接字 WSACleanup(); //注销WinSock动态链接库 return 0; } else if(size==0) { cout<<"对方已关闭连接!\n"; closesocket(sock_client);//关闭已连接套接字 WSACleanup(); //注销WinSock动态链接库 return 0; } else cout<<"The message from Server: "<<msgbuffer<<endl; while (1) { /***从键盘输入一行文字发送给服务器***/ cout << "从键盘输入发给服务器的信息!\n"; cin >> msgbuffer; //输入hello,打断点,查看msgbuffer数组每个元素,就有msgbuffer[5]='\0' ,而用strlen求字符串长度时,会自动忽略字符串结尾符'\0',只计算实际输入的字符数! //输入你好,打断点,查看msgbuffer数组每个元素,就有msgbuffer[4]='\0' int sizeofmsgbuffer = sizeof(msgbuffer);//1000 int strlenmsgbuffer = strlen(msgbuffer);//5 if ((size = send(sock_client, msgbuffer, sizeof(msgbuffer), 0))<0) cout << "发送信息失败!错误代码:" << WSAGetLastError() << endl; else if (size == 0) cout << "对方已关闭连接!\n"; else cout << "信息发送成功!\n"; } /***结束处理***/ closesocket(sock_client); //关闭socket WSACleanup(); //注销WinSock动态链接库 system("pause"); return 0; } /* The message from Server: message send to client from server! 从键盘输入发给服务器的信息! 你好! 信息发送成功! 从键盘输入发给服务器的信息! */
例子2:参考:完成端口模型
服务器端:
IOCPModel.h
/* ========================================================================== Purpose: * 这个类CIOCPModel是本代码的核心类,用于说明WinSock服务器端编程模型中的 完成端口(IOCP)的使用方法,并使用MFC对话框程序来调用这个类实现了基本的 服务器网络通信的功能。 * 其中的PER_IO_DATA结构体是封装了用于每一个重叠操作的参数 PER_HANDLE_DATA 是封装了用于每一个Socket的参数,也就是用于每一个完成端口的参数 * 详细的文档说明请参考 http://blog.csdn.net/PiggyXP Notes: * 具体讲明了服务器端建立完成端口、建立工作者线程、投递Recv请求、投递Accept请求的方法, 所有的客户端连入的Socket都需要绑定到IOCP上,所有从客户端发来的数据,都会实时显示到 主界面中去。 Author: * PiggyXP【小猪】 Date: * 2009/10/04 ========================================================================== */ #pragma once // winsock 2 的头文件和库 #include <winsock2.h> #include <MSWSock.h> #pragma comment(lib,"ws2_32.lib") // 缓冲区长度 (1024*8) // 之所以为什么设置8K,也是一个江湖上的经验值 // 如果确实客户端发来的每组数据都比较少,那么就设置得小一些,省内存 #define MAX_BUFFER_LEN 8192 // 默认端口 #define DEFAULT_PORT 12345 // 默认IP地址 #define DEFAULT_IP _T("127.0.0.1") ////////////////////////////////////////////////////////////////// // 在完成端口上投递的I/O操作的类型 typedef enum _OPERATION_TYPE { ACCEPT_POSTED, // 标志投递的Accept操作 SEND_POSTED, // 标志投递的是发送操作 RECV_POSTED, // 标志投递的是接收操作 NULL_POSTED // 用于初始化,无意义 }OPERATION_TYPE; //==================================================================================== // // 单IO数据结构体定义(用于每一个重叠操作的参数) // //==================================================================================== typedef struct _PER_IO_CONTEXT { OVERLAPPED m_Overlapped; // 每一个重叠网络操作的重叠结构(针对每一个Socket的每一个操作,都要有一个) SOCKET m_sockAccept; // 这个网络操作所使用的Socket WSABUF m_wsaBuf; // WSA类型的缓冲区,用于给重叠操作传参数的 char m_szBuffer[MAX_BUFFER_LEN]; // 这个是WSABUF里具体存字符的缓冲区 OPERATION_TYPE m_OpType; // 标识网络操作的类型(对应上面的枚举) // 初始化 _PER_IO_CONTEXT() { ZeroMemory(&m_Overlapped, sizeof(m_Overlapped)); ZeroMemory( m_szBuffer,MAX_BUFFER_LEN ); m_sockAccept = INVALID_SOCKET; m_wsaBuf.buf = m_szBuffer; m_wsaBuf.len = MAX_BUFFER_LEN; m_OpType = NULL_POSTED; } // 释放掉Socket ~_PER_IO_CONTEXT() { if( m_sockAccept!=INVALID_SOCKET ) { closesocket(m_sockAccept); m_sockAccept = INVALID_SOCKET; } } // 重置缓冲区内容 void ResetBuffer() { ZeroMemory( m_szBuffer,MAX_BUFFER_LEN ); } } PER_IO_CONTEXT, *PPER_IO_CONTEXT; //==================================================================================== // // 单句柄数据结构体定义(用于每一个完成端口,也就是每一个Socket的参数) // //==================================================================================== typedef struct _PER_SOCKET_CONTEXT { SOCKET m_Socket; // 每一个客户端连接的Socket SOCKADDR_IN m_ClientAddr; // 客户端的地址 CArray<_PER_IO_CONTEXT*> m_arrayIoContext; // 客户端网络操作的上下文数据, // 也就是说对于每一个客户端Socket,是可以在上面同时投递多个IO请求的 // 初始化 _PER_SOCKET_CONTEXT() { m_Socket = INVALID_SOCKET; memset(&m_ClientAddr, 0, sizeof(m_ClientAddr)); } // 释放资源 ~_PER_SOCKET_CONTEXT() { if( m_Socket!=INVALID_SOCKET ) { closesocket( m_Socket ); m_Socket = INVALID_SOCKET; } // 释放掉所有的IO上下文数据 for( int i=0;i<m_arrayIoContext.GetCount();i++ ) { delete m_arrayIoContext.GetAt(i); } m_arrayIoContext.RemoveAll(); } // 获取一个新的IoContext _PER_IO_CONTEXT* GetNewIoContext() { _PER_IO_CONTEXT* p = new _PER_IO_CONTEXT; m_arrayIoContext.Add( p ); return p; } // 从数组中移除一个指定的IoContext void RemoveContext( _PER_IO_CONTEXT* pContext ) { ASSERT( pContext!=NULL ); for( int i=0;i<m_arrayIoContext.GetCount();i++ ) { if( pContext==m_arrayIoContext.GetAt(i) ) { delete pContext; pContext = NULL; m_arrayIoContext.RemoveAt(i); break; } } } } PER_SOCKET_CONTEXT, *PPER_SOCKET_CONTEXT; //==================================================================================== // // CIOCPModel类定义 // //==================================================================================== // 工作者线程的线程参数 class CIOCPModel;//类声明! typedef struct _tagThreadParams_WORKER { CIOCPModel* pIOCPModel; // 类指针,用于调用类中的函数 int nThreadNo; // 线程编号 } THREADPARAMS_WORKER,*PTHREADPARAM_WORKER; // CIOCPModel类 class CIOCPModel { public: CIOCPModel(void); ~CIOCPModel(void); public: // 启动服务器 bool Start(); // 停止服务器 void Stop(); // 加载Socket库 bool LoadSocketLib(); // 卸载Socket库,彻底完事 void UnloadSocketLib() { WSACleanup(); } // 获得本机的IP地址 CString GetLocalIP(); // 设置监听端口 void SetPort( const int& nPort ) { m_nPort=nPort; } // 设置主界面的指针,用于调用显示信息到界面中 void SetMainDlg( CDialog* p ) { m_pMain=p; } protected: // 初始化IOCP bool _InitializeIOCP(); // 初始化Socket bool _InitializeListenSocket(); // 最后释放资源 void _DeInitialize(); // 投递Accept请求 bool _PostAccept( PER_IO_CONTEXT* pAcceptIoContext ); // 投递接收数据请求 bool _PostRecv( PER_IO_CONTEXT* pIoContext ); // 在有客户端连入的时候,进行处理 bool _DoAccpet( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext ); // 在有接收的数据到达的时候,进行处理 bool _DoRecv( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext ); // 将客户端的相关信息存储到数组中 void _AddToContextList( PER_SOCKET_CONTEXT *pSocketContext ); // 将客户端的信息从数组中移除 void _RemoveContext( PER_SOCKET_CONTEXT *pSocketContext ); // 清空客户端信息 void _ClearContextList(); // 将句柄绑定到完成端口中 bool _AssociateWithIOCP( PER_SOCKET_CONTEXT *pContext); // 处理完成端口上的错误 bool HandleError( PER_SOCKET_CONTEXT *pContext,const DWORD& dwErr ); // 线程函数,为IOCP请求服务的工作者线程 static DWORD WINAPI _WorkerThread(LPVOID lpParam); // 获得本机的处理器数量 int _GetNoOfProcessors(); // 判断客户端Socket是否已经断开 bool _IsSocketAlive(SOCKET s); // 在主界面中显示信息 void _ShowMessage( const CString szFormat,...) const; private: HANDLE m_hShutdownEvent; // 用来通知线程系统退出的事件,为了能够更好的退出线程 HANDLE m_hIOCompletionPort; // 完成端口的句柄 HANDLE* m_phWorkerThreads; // 工作者线程的句柄指针 int m_nThreads; // 生成的线程数量(CPU核心数*2) CString m_strIP; // 服务器端的IP地址 int m_nPort; // 服务器端的监听端口 CDialog* m_pMain; // 主界面的界面指针,用于在主界面中显示消息 CRITICAL_SECTION m_csContextList; // 用于Worker线程同步的互斥量(临界区) CArray<PER_SOCKET_CONTEXT*> m_arrayClientContext; // 客户端Socket的Context信息 PER_SOCKET_CONTEXT* m_pListenContext; // 用于监听的Socket的Context信息 LPFN_ACCEPTEX m_lpfnAcceptEx; // AcceptEx 和 GetAcceptExSockaddrs 的函数指针,用于调用这两个扩展函数 LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockAddrs; };
IOCPModel.cpp:
#include "StdAfx.h" #include "IOCPModel.h" #include "MainDlg.h" // 每一个处理器上产生多少个线程(为了最大限度的提升服务器性能,详见配套文档) #define WORKER_THREADS_PER_PROCESSOR 2 // 同时投递的Accept请求的数量(这个要根据实际的情况灵活设置) #define MAX_POST_ACCEPT 10 // 传递给Worker线程的退出信号 #define EXIT_CODE NULL // 释放指针和句柄资源的宏 // 释放指针宏 #define RELEASE(x) {if(x != NULL ){delete x;x=NULL;}} // 释放句柄宏 #define RELEASE_HANDLE(x) {if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}} // 释放Socket宏 #define RELEASE_SOCKET(x) {if(x !=INVALID_SOCKET) { closesocket(x);x=INVALID_SOCKET;}} CIOCPModel::CIOCPModel(void): m_nThreads(0), m_hShutdownEvent(NULL), m_hIOCompletionPort(NULL), m_phWorkerThreads(NULL), m_strIP(DEFAULT_IP), m_nPort(DEFAULT_PORT), m_pMain(NULL), m_lpfnAcceptEx( NULL ), m_pListenContext( NULL ) { } CIOCPModel::~CIOCPModel(void) { // 确保资源彻底释放 this->Stop(); } /////////////////////////////////////////////////////////////////// // 工作者线程: 为IOCP请求服务的工作者线程 // 也就是每当完成端口上出现了完成数据包,就将之取出来进行处理的线程 /////////////////////////////////////////////////////////////////// DWORD WINAPI CIOCPModel::_WorkerThread(LPVOID lpParam) { THREADPARAMS_WORKER* pParam = (THREADPARAMS_WORKER*)lpParam; CIOCPModel* pIOCPModel = (CIOCPModel*)pParam->pIOCPModel; int nThreadNo = (int)pParam->nThreadNo; pIOCPModel->_ShowMessage("工作者线程启动,ID: %d.",nThreadNo); OVERLAPPED *pOverlapped = NULL; PER_SOCKET_CONTEXT *pSocketContext = NULL; DWORD dwBytesTransfered = 0; // 循环处理请求,知道接收到Shutdown信息为止 while (WAIT_OBJECT_0 != WaitForSingleObject(pIOCPModel->m_hShutdownEvent, 0)) { BOOL bReturn = GetQueuedCompletionStatus( pIOCPModel->m_hIOCompletionPort, &dwBytesTransfered, (PULONG_PTR)&pSocketContext, &pOverlapped, INFINITE); // 如果收到的是退出标志,则直接退出 if ( EXIT_CODE==(DWORD)pSocketContext ) { break; } // 判断是否出现了错误 if( !bReturn ) { DWORD dwErr = GetLastError(); // 显示一下提示信息 if( !pIOCPModel->HandleError( pSocketContext,dwErr ) ) { break; } continue; } else { // 读取传入的参数 PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped); // 判断是否有客户端断开了 if((0 == dwBytesTransfered) && ( RECV_POSTED==pIoContext->m_OpType || SEND_POSTED==pIoContext->m_OpType)) { pIOCPModel->_ShowMessage( _T("客户端 %s:%d 断开连接."),inet_ntoa(pSocketContext->m_ClientAddr.sin_addr), ntohs(pSocketContext->m_ClientAddr.sin_port) ); // 释放掉对应的资源 pIOCPModel->_RemoveContext( pSocketContext ); continue; } else { switch( pIoContext->m_OpType ) { // Accept case ACCEPT_POSTED: { // 为了增加代码可读性,这里用专门的_DoAccept函数进行处理连入请求 pIOCPModel->_DoAccpet( pSocketContext, pIoContext ); } break; // RECV case RECV_POSTED: { // 为了增加代码可读性,这里用专门的_DoRecv函数进行处理接收请求 pIOCPModel->_DoRecv( pSocketContext,pIoContext ); } break; // SEND // 这里略过不写了,要不代码太多了,不容易理解,Send操作相对来讲简单一些 case SEND_POSTED: { } break; default: // 不应该执行到这里 TRACE(_T("_WorkThread中的 pIoContext->m_OpType 参数异常.\n")); break; } //switch }//if }//if }//while TRACE(_T("工作者线程 %d 号退出.\n"),nThreadNo); // 释放线程参数 RELEASE(lpParam); return 0; } //==================================================================================== // // 系统初始化和终止 // //==================================================================================== //////////////////////////////////////////////////////////////////// // 初始化WinSock 2.2 bool CIOCPModel::LoadSocketLib() { WSADATA wsaData; int nResult; nResult = WSAStartup(MAKEWORD(2,2), &wsaData); // 错误(一般都不可能出现) if (NO_ERROR != nResult) { this->_ShowMessage(_T("初始化WinSock 2.2失败!\n")); return false; } return true; } ////////////////////////////////////////////////////////////////// // 启动服务器 bool CIOCPModel::Start() { // 初始化线程互斥量 InitializeCriticalSection(&m_csContextList); // 建立系统退出的事件通知 m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL); /*CreateEvent是一个Windows API函数。它用来创建或打开一个命名的或无名的事件对象。如果想为对象指定一个访问掩码,应当使用CreateEventEx函数。 WINBASEAPI _Ret_maybenull_ HANDLE WINAPI CreateEventA( _In_opt_ LPSECURITY_ATTRIBUTES lpEventAttributes, _In_ BOOL bManualReset, _In_ BOOL bInitialState, _In_opt_ LPCSTR lpName ); WINBASEAPI _Ret_maybenull_ HANDLE WINAPI CreateEventW( _In_opt_ LPSECURITY_ATTRIBUTES lpEventAttributes, _In_ BOOL bManualReset, _In_ BOOL bInitialState, _In_opt_ LPCWSTR lpName ); #ifdef UNICODE #define CreateEvent CreateEventW #else #define CreateEvent CreateEventA #endif // !UNICODE */ // 初始化IOCP if (false == _InitializeIOCP()) { this->_ShowMessage(_T("初始化IOCP失败!\n")); return false; } else { this->_ShowMessage("\nIOCP初始化完毕\n."); } // 初始化Socket if( false==_InitializeListenSocket() ) { this->_ShowMessage(_T("Listen Socket初始化失败!\n")); this->_DeInitialize(); return false; } else { this->_ShowMessage("Listen Socket初始化完毕."); } this->_ShowMessage(_T("系统准备就绪,等候连接....\n")); return true; } //////////////////////////////////////////////////////////////////// // 开始发送系统退出消息,退出完成端口和线程资源 void CIOCPModel::Stop() { if( m_pListenContext!=NULL && m_pListenContext->m_Socket!=INVALID_SOCKET ) { // 激活关闭消息通知 SetEvent(m_hShutdownEvent); for (int i = 0; i < m_nThreads; i++) { // 通知所有的完成端口操作退出 PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (DWORD)EXIT_CODE, NULL); } // 等待所有的客户端资源退出 WaitForMultipleObjects(m_nThreads, m_phWorkerThreads, TRUE, INFINITE); // 清除客户端列表信息 this->_ClearContextList(); // 释放其他资源 this->_DeInitialize(); this->_ShowMessage("停止监听\n"); } } //////////////////////////////// // 初始化完成端口 bool CIOCPModel::_InitializeIOCP() { // 建立第一个完成端口 m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 ); if ( NULL == m_hIOCompletionPort) { this->_ShowMessage(_T("建立完成端口失败!错误代码: %d!\n"), WSAGetLastError()); return false; } // 根据本机中的处理器数量,建立对应的线程数 m_nThreads = WORKER_THREADS_PER_PROCESSOR * _GetNoOfProcessors(); // 为工作者线程初始化句柄 m_phWorkerThreads = new HANDLE[m_nThreads]; // 根据计算出来的数量建立工作者线程 DWORD nThreadID; for (int i = 0; i < m_nThreads; i++) { THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER; pThreadParams->pIOCPModel = this; pThreadParams->nThreadNo = i+1; m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread, (void *)pThreadParams, 0, &nThreadID); } TRACE(" 建立 _WorkerThread %d 个.\n", m_nThreads ); return true; } ///////////////////////////////////////////////////////////////// // 初始化Socket bool CIOCPModel::_InitializeListenSocket() { // AcceptEx 和 GetAcceptExSockaddrs 的GUID,用于导出函数指针 GUID GuidAcceptEx = WSAID_ACCEPTEX; GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS; // 服务器地址信息,用于绑定Socket struct sockaddr_in ServerAddress; // 生成用于监听的Socket的信息 m_pListenContext = new PER_SOCKET_CONTEXT; // 需要使用重叠IO,必须得使用WSASocket来建立Socket,才可以支持重叠IO操作 m_pListenContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); if (INVALID_SOCKET == m_pListenContext->m_Socket) { this->_ShowMessage("初始化Socket失败,错误代码: %d.\n", WSAGetLastError()); return false; } else { TRACE("WSASocket() 完成.\n"); } // 将Listen Socket绑定至完成端口中 if( NULL== CreateIoCompletionPort( (HANDLE)m_pListenContext->m_Socket, m_hIOCompletionPort,(DWORD)m_pListenContext, 0)) { this->_ShowMessage("绑定 Listen Socket至完成端口失败!错误代码: %d/n", WSAGetLastError()); RELEASE_SOCKET( m_pListenContext->m_Socket ); return false; } else { TRACE("Listen Socket绑定完成端口 完成.\n"); } // 填充地址信息 ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress)); ServerAddress.sin_family = AF_INET; // 这里可以绑定任何可用的IP地址,或者绑定一个指定的IP地址 //ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY); ServerAddress.sin_addr.s_addr = inet_addr(m_strIP.GetString()); ServerAddress.sin_port = htons(m_nPort); // 绑定地址和端口 if (SOCKET_ERROR == bind(m_pListenContext->m_Socket, (struct sockaddr *) &ServerAddress, sizeof(ServerAddress))) { this->_ShowMessage("bind()函数执行错误.\n"); return false; } else { TRACE("bind() 完成.\n"); } // 开始进行监听 if (SOCKET_ERROR == listen(m_pListenContext->m_Socket,SOMAXCONN)) { this->_ShowMessage("Listen()函数执行出现错误.\n"); return false; } else { TRACE("Listen() 完成.\n"); } // 使用AcceptEx函数,因为这个是属于WinSock2规范之外的微软另外提供的扩展函数 // 所以需要额外获取一下函数的指针, // 获取AcceptEx函数指针 DWORD dwBytes = 0; if(SOCKET_ERROR == WSAIoctl( m_pListenContext->m_Socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof(GuidAcceptEx), &m_lpfnAcceptEx, sizeof(m_lpfnAcceptEx), &dwBytes, NULL, NULL)) { this->_ShowMessage("WSAIoctl 未能获取AcceptEx函数指针。错误代码: %d\n", WSAGetLastError()); this->_DeInitialize(); return false; } // 获取GetAcceptExSockAddrs函数指针,也是同理 if(SOCKET_ERROR == WSAIoctl( m_pListenContext->m_Socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidGetAcceptExSockAddrs, sizeof(GuidGetAcceptExSockAddrs), &m_lpfnGetAcceptExSockAddrs, sizeof(m_lpfnGetAcceptExSockAddrs), &dwBytes, NULL, NULL)) { this->_ShowMessage("WSAIoctl 未能获取GuidGetAcceptExSockAddrs函数指针。错误代码: %d\n", WSAGetLastError()); this->_DeInitialize(); return false; } // 为AcceptEx 准备参数,然后投递AcceptEx I/O请求 for( int i=0;i<MAX_POST_ACCEPT;i++ ) { // 新建一个IO_CONTEXT PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext(); if( false==this->_PostAccept( pAcceptIoContext ) ) { m_pListenContext->RemoveContext(pAcceptIoContext); return false; } } this->_ShowMessage( _T("投递 %d 个AcceptEx请求完毕"),MAX_POST_ACCEPT ); return true; } //////////////////////////////////////////////////////////// // 最后释放掉所有资源 void CIOCPModel::_DeInitialize() { // 删除客户端列表的互斥量 DeleteCriticalSection(&m_csContextList); // 关闭系统退出事件句柄 RELEASE_HANDLE(m_hShutdownEvent); // 释放工作者线程句柄指针 for( int i=0;i<m_nThreads;i++ ) { RELEASE_HANDLE(m_phWorkerThreads[i]); } RELEASE(m_phWorkerThreads); // 关闭IOCP句柄 RELEASE_HANDLE(m_hIOCompletionPort); // 关闭监听Socket RELEASE(m_pListenContext); this->_ShowMessage("释放资源完毕.\n"); } //==================================================================================== // // 投递完成端口请求 // //==================================================================================== ////////////////////////////////////////////////////////////////// // 投递Accept请求 bool CIOCPModel::_PostAccept( PER_IO_CONTEXT* pAcceptIoContext ) { ASSERT( INVALID_SOCKET!=m_pListenContext->m_Socket ); // 准备参数 DWORD dwBytes = 0; pAcceptIoContext->m_OpType = ACCEPT_POSTED; WSABUF *p_wbuf = &pAcceptIoContext->m_wsaBuf; OVERLAPPED *p_ol = &pAcceptIoContext->m_Overlapped; // 为以后新连入的客户端先准备好Socket( 这个是与传统accept最大的区别 ) pAcceptIoContext->m_sockAccept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); if( INVALID_SOCKET==pAcceptIoContext->m_sockAccept ) { _ShowMessage("创建用于Accept的Socket失败!错误代码: %d", WSAGetLastError()); return false; } // 投递AcceptEx if(FALSE == m_lpfnAcceptEx( m_pListenContext->m_Socket, pAcceptIoContext->m_sockAccept, p_wbuf->buf, p_wbuf->len - ((sizeof(SOCKADDR_IN)+16)*2), sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, &dwBytes, p_ol)) { if(WSA_IO_PENDING != WSAGetLastError()) { _ShowMessage("投递 AcceptEx 请求失败,错误代码: %d", WSAGetLastError()); return false; } } return true; } //////////////////////////////////////////////////////////// // 在有客户端连入的时候,进行处理 // 流程有点复杂,你要是看不懂的话,就看配套的文档吧.... // 如果能理解这里的话,完成端口的机制你就消化了一大半了 // 总之你要知道,传入的是ListenSocket的Context,我们需要复制一份出来给新连入的Socket用 // 原来的Context还是要在上面继续投递下一个Accept请求 // bool CIOCPModel::_DoAccpet( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext ) { SOCKADDR_IN* ClientAddr = NULL; SOCKADDR_IN* LocalAddr = NULL; int remoteLen = sizeof(SOCKADDR_IN), localLen = sizeof(SOCKADDR_IN); /////////////////////////////////////////////////////////////////////////// // 1. 首先取得连入客户端的地址信息 // 这个 m_lpfnGetAcceptExSockAddrs 不得了啊~~~~~~ // 不但可以取得客户端和本地端的地址信息,还能顺便取出客户端发来的第一组数据,老强大了... this->m_lpfnGetAcceptExSockAddrs(pIoContext->m_wsaBuf.buf, pIoContext->m_wsaBuf.len - ((sizeof(SOCKADDR_IN)+16)*2), sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, (LPSOCKADDR*)&LocalAddr, &localLen, (LPSOCKADDR*)&ClientAddr, &remoteLen); /* typedef VOID (PASCAL FAR * LPFN_GETACCEPTEXSOCKADDRS)( _In_reads_bytes_(dwReceiveDataLength+dwLocalAddressLength+dwRemoteAddressLength) PVOID lpOutputBuffer, _In_ DWORD dwReceiveDataLength, _In_ DWORD dwLocalAddressLength, _In_ DWORD dwRemoteAddressLength, _Outptr_result_bytebuffer_(*LocalSockaddrLength) struct sockaddr **LocalSockaddr, _Out_ LPINT LocalSockaddrLength, _Outptr_result_bytebuffer_(*RemoteSockaddrLength) struct sockaddr **RemoteSockaddr, _Out_ LPINT RemoteSockaddrLength ); */ this->_ShowMessage( _T("客户端 %s:%d 连入."), inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port) ); this->_ShowMessage( _T("客户额 %s:%d 信息:%s."),inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port),pIoContext->m_wsaBuf.buf ); ////////////////////////////////////////////////////////////////////////////////////////////////////// // 2. 这里需要注意,这里传入的这个是ListenSocket上的Context,这个Context我们还需要用于监听下一个连接 // 所以我还得要将ListenSocket上的Context复制出来一份为新连入的Socket新建一个SocketContext PER_SOCKET_CONTEXT* pNewSocketContext = new PER_SOCKET_CONTEXT; pNewSocketContext->m_Socket = pIoContext->m_sockAccept; memcpy(&(pNewSocketContext->m_ClientAddr), ClientAddr, sizeof(SOCKADDR_IN)); // 参数设置完毕,将这个Socket和完成端口绑定(这也是一个关键步骤) if( false==this->_AssociateWithIOCP( pNewSocketContext ) ) { RELEASE( pNewSocketContext ); return false; } /////////////////////////////////////////////////////////////////////////////////////////////////// // 3. 继续,建立其下的IoContext,用于在这个Socket上投递第一个Recv数据请求 PER_IO_CONTEXT* pNewIoContext = pNewSocketContext->GetNewIoContext(); pNewIoContext->m_OpType = RECV_POSTED; pNewIoContext->m_sockAccept = pNewSocketContext->m_Socket; // 如果Buffer需要保留,就自己拷贝一份出来 //memcpy( pNewIoContext->m_szBuffer,pIoContext->m_szBuffer,MAX_BUFFER_LEN ); // 绑定完毕之后,就可以开始在这个Socket上投递完成请求了 if( false==this->_PostRecv( pNewIoContext) ) { pNewSocketContext->RemoveContext( pNewIoContext ); return false; } ///////////////////////////////////////////////////////////////////////////////////////////////// // 4. 如果投递成功,那么就把这个有效的客户端信息,加入到ContextList中去(需要统一管理,方便释放资源) this->_AddToContextList( pNewSocketContext ); //////////////////////////////////////////////////////////////////////////////////////////////// // 5. 使用完毕之后,把Listen Socket的那个IoContext重置,然后准备投递新的AcceptEx pIoContext->ResetBuffer(); return this->_PostAccept( pIoContext ); } //////////////////////////////////////////////////////////////////// // 投递接收数据请求 bool CIOCPModel::_PostRecv( PER_IO_CONTEXT* pIoContext ) { // 初始化变量 DWORD dwFlags = 0; DWORD dwBytes = 0; WSABUF *p_wbuf = &pIoContext->m_wsaBuf; OVERLAPPED *p_ol = &pIoContext->m_Overlapped; pIoContext->ResetBuffer(); pIoContext->m_OpType = RECV_POSTED; // 初始化完成后,,投递WSARecv请求 int nBytesRecv = WSARecv( pIoContext->m_sockAccept, p_wbuf, 1, &dwBytes, &dwFlags, p_ol, NULL ); // 如果返回值错误,并且错误的代码并非是Pending的话,那就说明这个重叠请求失败了 if ((SOCKET_ERROR == nBytesRecv) && (WSA_IO_PENDING != WSAGetLastError())) { this->_ShowMessage("投递第一个WSARecv失败!"); return false; } return true; } ///////////////////////////////////////////////////////////////// // 在有接收的数据到达的时候,进行处理 bool CIOCPModel::_DoRecv( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext ) { // 先把上一次的数据显示出现,然后就重置状态,发出下一个Recv请求 SOCKADDR_IN* ClientAddr = &pSocketContext->m_ClientAddr; this->_ShowMessage( _T("收到 %s:%d 信息:%s"),inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port),pIoContext->m_wsaBuf.buf ); // 然后开始投递下一个WSARecv请求 return _PostRecv( pIoContext ); } ///////////////////////////////////////////////////// // 将句柄(Socket)绑定到完成端口中 bool CIOCPModel::_AssociateWithIOCP( PER_SOCKET_CONTEXT *pContext ) { // 将用于和客户端通信的SOCKET绑定到完成端口中 HANDLE hTemp = CreateIoCompletionPort((HANDLE)pContext->m_Socket, m_hIOCompletionPort, (DWORD)pContext, 0); if (NULL == hTemp) { this->_ShowMessage(("执行CreateIoCompletionPort()出现错误.错误代码:%d"),GetLastError()); return false; } return true; } //==================================================================================== // // ContextList 相关操作 // //==================================================================================== ////////////////////////////////////////////////////////////// // 将客户端的相关信息存储到数组中 void CIOCPModel::_AddToContextList( PER_SOCKET_CONTEXT *pHandleData ) { EnterCriticalSection(&m_csContextList); m_arrayClientContext.Add(pHandleData); LeaveCriticalSection(&m_csContextList); } //////////////////////////////////////////////////////////////// // 移除某个特定的Context void CIOCPModel::_RemoveContext( PER_SOCKET_CONTEXT *pSocketContext ) { EnterCriticalSection(&m_csContextList); for( int i=0;i<m_arrayClientContext.GetCount();i++ ) { if( pSocketContext==m_arrayClientContext.GetAt(i) ) { RELEASE( pSocketContext ); m_arrayClientContext.RemoveAt(i); break; } } LeaveCriticalSection(&m_csContextList); } //////////////////////////////////////////////////////////////// // 清空客户端信息 void CIOCPModel::_ClearContextList() { EnterCriticalSection(&m_csContextList); for( int i=0;i<m_arrayClientContext.GetCount();i++ ) { delete m_arrayClientContext.GetAt(i); } m_arrayClientContext.RemoveAll(); LeaveCriticalSection(&m_csContextList); } //==================================================================================== // // 其他辅助函数定义 // //==================================================================================== //////////////////////////////////////////////////////////////////// // 获得本机的IP地址 CString CIOCPModel::GetLocalIP() { // 获得本机主机名 char hostname[MAX_PATH] = {0};//"DESKTOP-UO6NCCO" gethostname(hostname,MAX_PATH); struct hostent FAR* lpHostEnt = gethostbyname(hostname);//根据主机名获得本地IP if(lpHostEnt == NULL) { return DEFAULT_IP; } // 取得IP地址列表中的第一个为返回的IP(因为一台主机可能会绑定多个IP) LPSTR lpAddr = lpHostEnt->h_addr_list[0]; // 将IP地址转化成字符串形式 struct in_addr inAddr; memmove(&inAddr,lpAddr,4); m_strIP = CString( inet_ntoa(inAddr) ); return m_strIP; } /* C:\Users\yangbo>ipconfig Windows IP 配置 无线局域网适配器 本地连接* 3: 媒体状态 . . . . . . . . . . . . : 媒体已断开连接 连接特定的 DNS 后缀 . . . . . . . : 无线局域网适配器 本地连接* 6: 连接特定的 DNS 后缀 . . . . . . . : 本地链接 IPv6 地址. . . . . . . . : fe80::7028:c4ff:7c5e:f3a6%22 IPv4 地址 . . . . . . . . . . . . : 192.168.191.1 子网掩码 . . . . . . . . . . . . : 255.255.255.0 默认网关. . . . . . . . . . . . . : 以太网适配器 以太网: 连接特定的 DNS 后缀 . . . . . . . : IPv6 地址 . . . . . . . . . . . . : 2001:250:4000:8160:3d05:f41e:e3f7:d57f 临时 IPv6 地址. . . . . . . . . . : 2001:250:4000:8160:44b2:3e04:bd90:def8 本地链接 IPv6 地址. . . . . . . . : fe80::3d05:f41e:e3f7:d57f%8 IPv4 地址 . . . . . . . . . . . . : 222.20.119.217 //(本机IP: 222.20.119.217湖北省武汉市 教育网) 子网掩码 . . . . . . . . . . . . : 255.255.255.0 默认网关. . . . . . . . . . . . . : fe80::1614:4bff:fe7d:4cbd%8 222.20.119.254 无线局域网适配器 WLAN: 媒体状态 . . . . . . . . . . . . : 媒体已断开连接 连接特定的 DNS 后缀 . . . . . . . : 以太网适配器 蓝牙网络连接: 媒体状态 . . . . . . . . . . . . : 媒体已断开连接 连接特定的 DNS 后缀 . . . . . . . : */ /////////////////////////////////////////////////////////////////// // 获得本机中处理器的数量 int CIOCPModel::_GetNoOfProcessors() { SYSTEM_INFO si; GetSystemInfo(&si);//8核 i7-7700 return si.dwNumberOfProcessors; } ///////////////////////////////////////////////////////////////////// // 在主界面中显示提示信息 void CIOCPModel::_ShowMessage(const CString szFormat,...) const { // 根据传入的参数格式化字符串 CString strMessage; va_list arglist; // 处理变长参数 va_start(arglist, szFormat); strMessage.FormatV(szFormat,arglist); va_end(arglist); // 在主界面中显示 CMainDlg* pMain = (CMainDlg*)m_pMain; if( m_pMain!=NULL ) { pMain->AddInformation(strMessage); TRACE( strMessage+_T("\n") ); } } ///////////////////////////////////////////////////////////////////// // 判断客户端Socket是否已经断开,否则在一个无效的Socket上投递WSARecv操作会出现异常 // 使用的方法是尝试向这个socket发送数据,判断这个socket调用的返回值 // 因为如果客户端网络异常断开(例如客户端崩溃或者拔掉网线等)的时候,服务器端是无法收到客户端断开的通知的 bool CIOCPModel::_IsSocketAlive(SOCKET s) { int nByteSent=send(s,"",0,0); if (-1 == nByteSent) return false; return true; } /////////////////////////////////////////////////////////////////// // 显示并处理完成端口上的错误 bool CIOCPModel::HandleError( PER_SOCKET_CONTEXT *pContext,const DWORD& dwErr ) { // 如果是超时了,就再继续等吧 if(WAIT_TIMEOUT == dwErr) { // 确认客户端是否还活着... if( !_IsSocketAlive( pContext->m_Socket) ) { this->_ShowMessage( _T("检测到客户端异常退出!") ); this->_RemoveContext( pContext ); return true; } else { this->_ShowMessage( _T("网络操作超时!重试中...") ); return true; } } // 可能是客户端异常退出了 else if( ERROR_NETNAME_DELETED==dwErr ) { this->_ShowMessage( _T("检测到客户端异常退出!") ); this->_RemoveContext( pContext ); return true; } else { this->_ShowMessage( _T("完成端口操作出现错误,线程退出。错误代码:%d"),dwErr ); return false; } }
参考教程:[1]]杨传栋, 张焕远. Windows网络编程基础教程[M]. 清华大学出版社, 2015.P179
[2]教程资料:链接:https://pan.baidu.com/s/1BxrhYnP5pOpG0ZkECWcMrg 密码:ow1e
[3]完成端口模型
链接:https://pan.baidu.com/s/1rqu6OAX9NDhpUofwikufyw 密码:ok0q
WinSock 完成端口 Windows套接字编程