最近都在做windows socket相关的东西,使用IOCP其中还是遇到了一些问题,当然遇到问题就要尝试解决问题,这也是一个学习的过程。
IOCP可以说是windows 上性能最好的网络模型了,具体IOCP,就不介绍了,Google,baidu一下,你就知道了。
玉哥在用我的网络接口,发大量数据时,发现,数据对不上,即收包量会增多,内容是不会丢的,由于我是严格控制服务器端的收包大小,和收包量,所以也感觉很奇怪。
先是怀疑WSASend连续调用,会有问题,但大致看了看这篇文章(http://bbs.pediy.com/showthread.php?p=826108),这文章写的很给力,所以又觉得不会有问题,于是我就自己开始写了一个测试程序。
当然也不是写了就马上能发现问题,调了一下午加一个晚上,发现了问题所在。
问题在WSARecv,这个东西它居然没填满我要求的缓存,就给完成端口发了通知,导致客户端发过来的数据,被截开了,所以包的数量就增加了,但是由于是TCP,所以内容,顺序都没问题。
接下来说说如何解决问题:
BOOL bIORet = GetQueuedCompletionStatus(CompPort,
&dwIoSize,
(LPDWORD)&pCompletionKey,
&lpOverlapped,
INFINITE);
关键看第二个参数,dwIoSize,它告诉你,你实际接收到了多少数据,通过它,你就应该知道还要继续接受多少数据了吧,明白人,应该就知道该如何做了,不明白的看我的代码吧。
注意:WSARecv,最好不要连续调用,特别是在多线程里连续调用,因为后果可能会无法预料,这个应该还是根据需求而定,也许有的情况下可以。
剩下的就是贴代码了,时间紧任务重,加上本人比较水,所以代码质量不高,多多包涵,同时还请各位朋友给出建议,提出批评,大家一起学习,共同进步。
client:
#include<iostream>
#include<fstream>
#include<WinSock2.h>
#include<Windows.h>
#include<tchar.h>
using namespace std;
#define BUF_TIMES 10
struct CompletionKey{SOCKET s;};
typedef struct io_operation_data
{
WSAOVERLAPPED overlapped; //重叠结构
WSABUF wsaBuf; //发送接收缓冲区
}IO_OPERATION_DATA, *PIO_OPERATION_DATA;
SOCKET sClient;
//char (*data)[1024];
DWORD WINAPI ServiceThreadProc(PVOID pParam);
int main()
{
HRESULT hr=CoInitializeEx(NULL,COINIT_MULTITHREADED);
if(FAILED(hr))
{
MessageBox(NULL,_T("Initialize COM failed!"),_T("error"),MB_OK);
//cout<<"Initialize COM failed!"<<endl;
//return false;
}
WORD wVersionRequest;
WSADATA wsaData;
wVersionRequest=MAKEWORD(2,2);
int nErrCode=WSAStartup(wVersionRequest,&wsaData);
if(nErrCode!=0)
{
//cout<<" start up error!"<<endl;
MessageBox(NULL,_T("start up error!"),_T("error"),MB_OK);
}
if(LOBYTE(wsaData.wVersion)!=2||HIBYTE(wsaData.wVersion)!=2)
{
//cout<<" lib is not 2.2!"<<endl;
MessageBox(NULL,_T("lib is not 2.2!"),_T("error"),MB_OK);
WSACleanup();
}
sClient=WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);//socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
if(INVALID_SOCKET==sClient)
{
MessageBox(NULL,_T("socket error!"),_T("error"),MB_OK);
}
//获取系统默认的发送数据缓冲区大小
unsigned int uiRcvBuf;
int uiRcvBufLen = sizeof(uiRcvBuf);
nErrCode= getsockopt(sClient, SOL_SOCKET, SO_SNDBUF,(char*)&uiRcvBuf, &uiRcvBufLen);
if (SOCKET_ERROR == nErrCode)
{
//cout<<"获取系统默认的发送数据缓冲区大小failed!"<<endl;
MessageBox(NULL,_T("获取系统默认的发送数据缓冲区大小failed!"),_T("error"),MB_OK);
//return false;
}
//设置系统发送数据缓冲区为默认值的BUF_TIMES倍
uiRcvBuf *= BUF_TIMES;
nErrCode = setsockopt(sClient, SOL_SOCKET, SO_SNDBUF,(char*)&uiRcvBuf, uiRcvBufLen);
if (SOCKET_ERROR == nErrCode)
{
//cout<<"修改系统发送数据缓冲区失败!"<<endl;
MessageBox(NULL,_T("修改系统发送数据缓冲区失败!"),_T("error"),MB_OK);
}
//检查设置系统发送数据缓冲区是否成功
unsigned int uiNewRcvBuf;
nErrCode=getsockopt(sClient, SOL_SOCKET, SO_SNDBUF,(char*)&uiNewRcvBuf, &uiRcvBufLen);
if (SOCKET_ERROR == nErrCode || uiNewRcvBuf != uiRcvBuf)
{
// cout<<"修改系统发送数据缓冲区失败!"<<endl;
MessageBox(NULL,_T("修改系统发送数据缓冲区失败!"),_T("error"),MB_OK);
}
HANDLE CompPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (CompPort == NULL)
{
MessageBox(NULL,_T("创建完成端口失败!"),_T("error"),MB_OK);
//WSACleanup();
//return ;
}
CompletionKey iCompletionKey;
iCompletionKey.s=sClient;
if (CreateIoCompletionPort((HANDLE)sClient,CompPort,(DWORD)&iCompletionKey,0) == NULL)
{
//出错处理。。
MessageBox(NULL,_T("关联完成端口失败!"),_T("error"),MB_OK);
}
//addrHost
SOCKADDR_IN addrHost;
addrHost.sin_family=AF_INET;
addrHost.sin_port=htons(8800);
addrHost.sin_addr.S_un.S_addr=inet_addr("192.168.0.137");
int retVal=connect(sClient,(sockaddr*)&addrHost,sizeof(sockaddr));
if(retVal==SOCKET_ERROR)
{
MessageBox(NULL,_T("connect error!"),_T("error"),MB_OK);
//return false;
}
cout<<"connect success"<<endl;
HANDLE hThread=CreateThread(NULL,0,ServiceThreadProc,NULL,0,NULL);
DWORD dwIoSize=-1; //传输字节数
LPOVERLAPPED lpOverlapped=NULL; //重叠结构指针
CompletionKey* pCompletionKey=NULL;
PIO_OPERATION_DATA pIO=NULL;
int count=0;
ofstream fos;
fos.open("c.txt");
while(1)
{
BOOL bIORet = GetQueuedCompletionStatus(CompPort,
&dwIoSize,
(LPDWORD)&pCompletionKey,
&lpOverlapped,
INFINITE);
//失败的操作完成
if (FALSE == bIORet && NULL != pCompletionKey)
{
//客户端断开
}
//成功的操作完成
if(bIORet && lpOverlapped && pCompletionKey)
{
cout<<"count: "<<count+1<<" "
<<"number: "<<dwIoSize<<endl;
pIO = CONTAINING_RECORD(lpOverlapped,IO_OPERATION_DATA,overlapped);
fos.write(pIO->wsaBuf.buf,10240*5);
fos<<endl;
if(count==99)
break;
count++;
}
}
WaitForSingleObject(hThread,INFINITE);
cout<<"main end: "<<count+1<<endl;
fos.close();
Sleep(5000);
CoUninitialize();
WSACleanup();
return 0;
}
DWORD WINAPI ServiceThreadProc(PVOID pParam)
{
char (*a)[10240*5]=new char[100][10240*5];
for(int i=0;i<100;i++)
{
if(i%3==0)
memset(a[i],'a',10240*5);
else if(i%3==1)
memset(a[i],'b',10240*5);
else if(i%3==2)
memset(a[i],'c',10240*5);
}
DWORD flags = 0; //标志
DWORD sendBytes =0; //发送字节数
int count=0;
while(count<100)
{
PIO_OPERATION_DATA pIO = new IO_OPERATION_DATA;
ZeroMemory(pIO,sizeof(IO_OPERATION_DATA));
pIO->wsaBuf.buf=a[count];
pIO->wsaBuf.len=10240*5;
//Sleep(30);
if(WSASend(sClient,&(pIO->wsaBuf),1,&sendBytes,flags,&(pIO->overlapped),NULL)== SOCKET_ERROR)
{
if(ERROR_IO_PENDING != WSAGetLastError())//发起重叠操作失败
{
cout<<"send failed"<<endl;
}
}
count++;
}
cout<<"thread func: "<<count<<endl;
Sleep(10000);
return 0;
}
server:
#include<iostream>
#include<vector>
#include<fstream>
#include<WinSock2.h>
#include<Windows.h>
#include<tchar.h>
using namespace std;
#define BUF_TIMES 10
struct CompletionKey{SOCKET s;};
typedef struct io_operation_data
{
WSAOVERLAPPED overlapped; //重叠结构
WSABUF wsaBuf; //发送接收缓冲区
}IO_OPERATION_DATA, *PIO_OPERATION_DATA;
int main()
{
HRESULT hr=CoInitializeEx(NULL,COINIT_MULTITHREADED);
if(FAILED(hr))
{
MessageBox(NULL,_T("Initialize COM failed!"),_T("error"),MB_OK);
//cout<<"Initialize COM failed!"<<endl;
//return false;
}
WORD wVersionRequest;
WSADATA wsaData;
wVersionRequest=MAKEWORD(2,2);
int nErrCode=WSAStartup(wVersionRequest,&wsaData);
if(nErrCode!=0)
{
//cout<<" start up error!"<<endl;
MessageBox(NULL,_T("start up error!"),_T("error"),MB_OK);
}
if(LOBYTE(wsaData.wVersion)!=2||HIBYTE(wsaData.wVersion)!=2)
{
//cout<<" lib is not 2.2!"<<endl;
MessageBox(NULL,_T("lib is not 2.2!"),_T("error"),MB_OK);
WSACleanup();
}
SOCKET sListen=WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);//socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
if(INVALID_SOCKET==sListen)
{
MessageBox(NULL,_T("socket error!"),_T("error"),MB_OK);
}
//获取系统默认的发送数据缓冲区大小
unsigned int uiRcvBuf;
int uiRcvBufLen = sizeof(uiRcvBuf);
nErrCode= getsockopt(sListen, SOL_SOCKET, SO_SNDBUF,(char*)&uiRcvBuf, &uiRcvBufLen);
if (SOCKET_ERROR == nErrCode)
{
//cout<<"获取系统默认的发送数据缓冲区大小failed!"<<endl;
MessageBox(NULL,_T("获取系统默认的发送数据缓冲区大小failed!"),_T("error"),MB_OK);
//return false;
}
//设置系统发送数据缓冲区为默认值的BUF_TIMES倍
uiRcvBuf *= BUF_TIMES;
nErrCode = setsockopt(sListen, SOL_SOCKET, SO_SNDBUF,(char*)&uiRcvBuf, uiRcvBufLen);
if (SOCKET_ERROR == nErrCode)
{
//cout<<"修改系统发送数据缓冲区失败!"<<endl;
MessageBox(NULL,_T("修改系统发送数据缓冲区失败!"),_T("error"),MB_OK);
}
//检查设置系统发送数据缓冲区是否成功
unsigned int uiNewRcvBuf;
nErrCode=getsockopt(sListen, SOL_SOCKET, SO_SNDBUF,(char*)&uiNewRcvBuf, &uiRcvBufLen);
if (SOCKET_ERROR == nErrCode || uiNewRcvBuf != uiRcvBuf)
{
// cout<<"修改系统发送数据缓冲区失败!"<<endl;
MessageBox(NULL,_T("修改系统发送数据缓冲区失败!"),_T("error"),MB_OK);
}
//cout<<uiNewRcvBuf<<endl;
HANDLE CompPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (CompPort == NULL)
{
MessageBox(NULL,_T("创建完成端口失败!"),_T("error"),MB_OK);
//WSACleanup();
//return ;
}
SOCKADDR_IN addrHost;
addrHost.sin_family=AF_INET;
addrHost.sin_port=htons(8800);
addrHost.sin_addr.S_un.S_addr=INADDR_ANY;
int retVal=bind(sListen,(LPSOCKADDR)&addrHost, sizeof(SOCKADDR_IN));
if(SOCKET_ERROR==retVal)
{
//cout<<"bind error!"<<endl;
MessageBox(NULL,_T("bind error!"),_T("error"),MB_OK);
//closesocket(this->m_sListen);
//return false;
}
retVal=listen(sListen,5);
if(SOCKET_ERROR==retVal)
{
MessageBox(NULL,_T("listen error!"),_T("error"),MB_OK);
}
sockaddr_in addrClient;
int addrLen=sizeof(sockaddr_in);
SOCKET sClient=accept(sListen,(sockaddr*)&addrClient,&addrLen);
cout<<"come in"<<endl;
CompletionKey iCompletionKey;
iCompletionKey.s=sClient;
//关联完成端口
if (CreateIoCompletionPort((HANDLE)sClient,CompPort,(DWORD)(&iCompletionKey),0) == NULL)
{
//出错处理。。
MessageBox(NULL,_T("关联完成端口失败!"),_T("error"),MB_OK);
}
DWORD dwIoSize=-1; //传输字节数
LPOVERLAPPED lpOverlapped=NULL; //重叠结构指针
CompletionKey* pCompletionKey=NULL;
DWORD flags = 0; //标志
DWORD recvBytes =0; //接收字节数
PIO_OPERATION_DATA pIO=NULL;
int count=0;
pIO=new IO_OPERATION_DATA;
ZeroMemory(pIO, sizeof(IO_OPERATION_DATA));
pIO->wsaBuf.buf=new char[10240*5];
pIO->wsaBuf.len=10240*5;
if (WSARecv(sClient,&(pIO->wsaBuf),1,&recvBytes,&flags,&(pIO->overlapped),NULL) == SOCKET_ERROR)
{
if(ERROR_IO_PENDING != WSAGetLastError())
{
cout<<"recv failed"<<endl;
}
}
vector<IO_OPERATION_DATA*> vecCache;
ofstream fos;
fos.open("s.txt");
while(1)
{
BOOL bIORet = GetQueuedCompletionStatus(CompPort,
&dwIoSize,
(LPDWORD)&pCompletionKey,
&lpOverlapped,
INFINITE);
//失败的操作完成
if (FALSE == bIORet && NULL != pCompletionKey)
{
//客户端断开
}
//成功的操作完成
if(bIORet && lpOverlapped && pCompletionKey)
{
pIO = CONTAINING_RECORD(lpOverlapped,IO_OPERATION_DATA,overlapped);
if(dwIoSize<pIO->wsaBuf.len)
{
vecCache.push_back(pIO);
int size=pIO->wsaBuf.len;
pIO->wsaBuf.len=dwIoSize;
pIO=new IO_OPERATION_DATA;
ZeroMemory(pIO, sizeof(IO_OPERATION_DATA));
pIO->wsaBuf.buf=new char[size-dwIoSize];
pIO->wsaBuf.len=size-dwIoSize;
if (WSARecv(sClient,&(pIO->wsaBuf),1,&recvBytes,&flags,&(pIO->overlapped),NULL) == SOCKET_ERROR)
{
if(ERROR_IO_PENDING != WSAGetLastError())
{
cout<<"recv failed"<<endl;
}
}
}
else
{
if(vecCache.size()!=0)
{
char*p=new char[10240*5];
int size=0;
for(vector<IO_OPERATION_DATA*>::iterator it=vecCache.begin();
it!=vecCache.end();it++)
{
memcpy(p+size,(*it)->wsaBuf.buf,(*it)->wsaBuf.len);
size+=(*it)->wsaBuf.len;
//清理资源
delete []((*it)->wsaBuf.buf);
delete (*it);
}
memcpy(p+size,pIO->wsaBuf.buf,pIO->wsaBuf.len);
count++;
fos.write(p,10240*5);
fos<<endl;
vecCache.clear();
cout<<"count: "<<count<<endl;
//清理资源
delete [](pIO->wsaBuf.buf);
delete pIO;
delete []p;
}
else
{
count++;
fos.write(pIO->wsaBuf.buf,10240*5);
fos<<endl;
cout<<"count: "<<count
<<" number: "<<dwIoSize<<endl;
//清理资源
delete [](pIO->wsaBuf.buf);
delete pIO;
}
pIO=new IO_OPERATION_DATA;
ZeroMemory(pIO, sizeof(IO_OPERATION_DATA));
pIO->wsaBuf.buf=new char[10240*5];
pIO->wsaBuf.len=10240*5;
if (WSARecv(sClient,&(pIO->wsaBuf),1,&recvBytes,&flags,&(pIO->overlapped),NULL) == SOCKET_ERROR)
{
if(ERROR_IO_PENDING != WSAGetLastError())
{
cout<<"recv failed"<<endl;
}
}
}
if(count==100)
break;
}
}
cout<<"main end: "<<count<<endl;
fos.close();
Sleep(10000);
CoUninitialize();
WSACleanup();
return 0;
}