今天,将最后一个流模型例子给记录一下,代码同样来自于网上。由于一些原因,导致心情不是很好,还是按照既定计划,将该demo的笔记记录一下。源码地址:地址。
它是基于 windows的iocp完成的,所以是异步非阻塞io。 最近看了很多的关于io说明的,各种帖子看的我头大,始终还是没彻底搞懂,估计是没接触过太多,境界还没到。但是毫无疑问的是,windows的iocp是属于异步非阻塞io的。
代码结构:
服务端流程:
#include "main.h"
int main(){
CServerSocket* serverSocket = new CServerSocket();
serverSocket->prepareEnvironment();
delete serverSocket;
}
之前是想将里面的关键方法进行抽象来着,但是后面精力问题,就直接放到一个方法里面的了。
服务端关键代码:
......
//创建 IOCP的内核对象
//参数说明:已经打开的文件句柄或者空句柄(一般是客户端的句柄),已经存在的IOCP句柄,完成键,真正并发同时执行最大线程数
HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
......
//基于处理器的核心数量创建线程
for(DWORD i=0;i<(mySysInfo.dwNumberOfProcessors*2);++i){
//创建服务器工作线程,并将完成端口传递到该线程
HANDLE threadHandle = CreateThread(NULL,0,ServerWorkThread,completionPort,0,NULL);
CloseHandle(threadHandle);
}
......
int bindResult = bind(srvSocket,(SOCKADDR*)&srvAddr, sizeof(SOCKADDR));
......
//将socket设置为监听模式
int listenResult = listen(srvSocket,10);
......
//创建用于发送数据的线程
HANDLE sendThread = CreateThread(NULL,0,ServerSendThread,0,0,NULL);
......
主要作用是: 在程序启动之初,根据cpu的数量,创建相应的信息接收线程,信息接收线程要与一个完成端口进行绑定。 另外,启动一个信息发送线程,目前程序的实现是向所有连接的客户端发送信息。 但是也很容易做到向特定的 套接字客户端发送。
信息接收线程的核心代码如下:
while(true){
bRet = GetQueuedCompletionStatus(completionPort, &bytesTransferred, (PULONG_PTR)&perHandleData, (LPOVERLAPPED*)&ipOverLapped, INFINITE);
LpperIOData perIoData=NULL;
perIoData = (LpperIOData)CONTAINING_RECORD(ipOverLapped,PerIOData,overlapped);
......
第二行中,是阻塞等待通知的。 它的通知机制是: 通过io完成端口,利用overlapped的结构。 其中,getQueuedCompletionStatus中,第一个参数为完成端口,倒数第二个参数 为完成端口携带消息的介质。倒数带三个参数 相当于传递的消息类型,这里为包含socket的结构体。
此外要注意,重叠机制获取完数据后,需要设置重叠状态:
//为下一个重叠调用 建立单 I/O 操作数据
perIoData = (LpperIOData)GlobalAlloc(GPTR, sizeof(LpperIOData));
ZeroMemory(&(perIoData->overlapped), sizeof(OVERLAPPED)); //清空内存
perIoData->databuff.len= 1024;
perIoData->databuff.buf=perIoData->buffer;
perIoData->operationType=0;//read;
WSARecv(perHandleData->socket, &(perIoData->databuff), 1, (&recvBytes), &flags, &(perIoData->overlapped), NULL);
代码中的顺序是不能乱序的,在我第一次写这个代码的时候,当时对c++指针,内存分配啥的还不是很清楚,结果导致这个项目中原来的代码运行不成功。 之后各种尝试,最后试成功了。 同时,当时遇到一个奇怪的现象,那就是运行是正常的,但是调试偶尔会出错。 现在想来,可能跟完成端口有关系。
我本地的cpu核心数是4核,所有在当前例子中,开了八个信息接收线程,这八个线程会同时阻塞在完成端口。根据我完成端口状态,一次只能允许一个并发,所以当完成端口有信号时,只会触发一个线程响应。 这个是可以控制的。
接着看看信息发送线程,比较简单:
//发送信息的线程执行函数
DWORD WINAPI ServerSendThread(LPVOID ipParam){
while (1){
char talk[200];
gets(talk);
if (talk ==""){
return 0;
}
WaitForSingleObject(hMutex,INFINITE);
for(int i=0;i<clientGroup.size();++i){
send(clientGroup[i]->socket,talk,200,0);
}
ReleaseMutex(hMutex);
}
return 0;
}
以上的信息接收时的打印,和 信息输入时的控制台录入,我最开始使用的时 std::cout,于std::cin的标准c++库函数。 但是经测试,并不能正确获取值。 当时网上查找说的是 ,它们不是线程安全的原因。 但是我现在想了下,可能恰恰因为它们是线程安全的才导致这样的问题。 控制台对一个程序来说,资源是唯一的,由于该资源被其它资源占用,所有导致当前使用线程不能正确的得到想要的结果。
客户端的代码也比较简单:
int main(){
CClientSocket* clientSocket= new CClientSocket();
clientSocket->prepareEnvironment();
delete clientSocket;
}
同样的理由,并未对方法进行拆分。
HANDLE sendThread = CreateThread(NULL,0,SendMessageThread,NULL,0,NULL);
HANDLE receiveThread = CreateThread(NULL,0,ReceiveMessageThread,NULL,0,NULL);
此时客户端采用了 读写均单独开始一个线程,这样的好处在于,可以拜托类似于 http应用层协议那种 一收一发的固定通信模式。 当前,它也可支持原来的通信模式,这主要看使用者意愿。
发送线程代码:
DWORD WINAPI SendMessageThread(LPVOID ipParam){
while (1){
string talk;
getline(cin,talk);
WaitForSingleObject(bufferMutex,INFINITE);
if("quit"==talk){
talk.push_back('\0');
send(clientSocket,talk.c_str(),200,0);
break;
}
std::cout<<"\nI Say:(\"quit\" to exit):"<<talk<<"\n";
send(clientSocket,talk.c_str(),200,0);
ReleaseSemaphore(bufferMutex,1,NULL);
Sleep(5000);
}
return 0;
}
接收线程代码:
DWORD WINAPI ReceiveMessageThread(LPVOID ipParam){
while (1){
char recvBuf[300];
recv(clientSocket,recvBuf,200,0);
WaitForSingleObject(bufferMutex,INFINITE);
std::cout<<"Server Says:"<<recvBuf<<std::endl;
ReleaseSemaphore(bufferMutex,1,NULL);
if(recvBuf[0]=='\0'){
cout<<"接收线程关闭!";
break;
}
}
return 0;
}
注意到这两个线程,实际上是对线程退出时机做了预设的。 这个层面来说,它实际上便相当于一个特殊意义上的应用层协议了。
运行效果: