IOCP全名:IO conplete port
可以用function来生成一些回调用函数让代码耦合度更小
WSAStartup:加载dll动态库
用signal(SIGINT, signalHandler);处理中断
CreateIoCompletionPort:创建操作系统控制的完成端口,绑定资源或设备或文件或socket
用WSAIoctl获得一个函数指针,得到连接进来的客户端
GetQueuedCompletionStatus:得到队列完成状态,当OK时表示有新的客户端连接进入有事件,否则通过最后一个参数设置停留时间
ServerSocket:接收和发送数据
通过startAccept()建立完成端口,一个接收消息,一个发送消息
handleRecvData和handleRemovePlayer不能算回调
lpfnAcceptEx异步还未生效,返回false是成功
发送数据会在多个线程中进行如果需要用lock_guard,而接收数据只在一个线程中所以不需要
程序实现IOCP模型的基本步骤,属于异步编程:
绑定回调事件
以下是简易服务器完整代码:
//Server.h
#ifndef YT_WIN_IOCP_SERVER_H
#define YT_WIN_IOCP_SERVER_H
#ifndef UNICODE
#define UNICODE
#endif
#define WIN32_LEAN_AND_MEAN
#include <winsock2.h>
#include <ws2tcpip.h>
#include <mswsock.h>
#include <cstdio>
#include <iostream>
#include <functional>
#include "ServerSocket.h"
class Server {
public:
Server(u_short port);
~Server();
bool startAccept();
void waitingForAccept();
void waitingForIo();
bool isRunning() const { return m_running;}
void stop() { m_running = false;}
typedef std::function<void(ServerSocket::pointer)> HandleNewConnect;
HandleNewConnect newConn;
ServerSocket::HandleRecvFunction socketRecv;
ServerSocket::HandleClose socketClose;
ServerSocket::HandleError socketError;
private:
u_short m_port;
SOCKET m_listenSocket;
HANDLE m_completePort;
LPFN_ACCEPTEX lpfnAcceptEx;
SOCKET m_currentAccetSocket;
WSAOVERLAPPED m_acceptUnit;
HANDLE m_ioCompletePort;
bool m_running;
std::vector<char> m_acceptBuffer;
bool tryNewConn();
};
#endif
//Server.cpp
#include "Server.h"
#include <string>
#include <cassert>
Server::Server(u_short p)
: m_port(p),
m_listenSocket(INVALID_SOCKET),
m_completePort(NULL),
lpfnAcceptEx(nullptr),
m_currentAccetSocket(INVALID_SOCKET),
m_ioCompletePort(NULL),
m_running(false),m_acceptBuffer(1024) {}
Server::~Server() {
if(m_listenSocket != INVALID_SOCKET) closesocket(m_listenSocket);
CloseHandle(m_ioCompletePort);
CloseHandle(m_completePort);
}
bool Server::startAccept() {
m_completePort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if(!m_completePort) {
return false;
}
m_ioCompletePort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if(!m_ioCompletePort) {
return false;
}
m_listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(m_listenSocket == INVALID_SOCKET) {
return false;
}
CreateIoCompletionPort((HANDLE)m_listenSocket, m_completePort, 0, 0);
hostent* localHost = gethostbyname("");
char ip[64];
inet_ntop(AF_INET, (struct in_addr*)*localHost->h_addr_list, ip, sizeof(ip));
sockaddr_in service;
service.sin_family = AF_INET;
service.sin_addr.s_addr = inet_addr(ip);
service.sin_port = htons(m_port);
BOOL reuse = TRUE;
setsockopt(m_listenSocket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse,
sizeof(reuse));
if(bind(m_listenSocket, (SOCKADDR*)&service, sizeof(service)) == SOCKET_ERROR) {
std::cout << "bind failed with error: " << WSAGetLastError() << "\n";
return false;
}
auto result = listen(m_listenSocket, 100);
if(result == SOCKET_ERROR) {
std::cout << "listen failed with error: " << WSAGetLastError() << "\n";
return false;
}
DWORD dwBytes = 0;
GUID guidAcceptEx = WSAID_ACCEPTEX;
result = WSAIoctl(m_listenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidAcceptEx, sizeof(guidAcceptEx), &lpfnAcceptEx,
sizeof(lpfnAcceptEx), &dwBytes, nullptr, nullptr);
if(result == SOCKET_ERROR) {
std::cout << "WSAIoctl failed with error: " << WSAGetLastError() << "\n";
return false;
}
m_currentAccetSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(m_currentAccetSocket == INVALID_SOCKET) return false;
//char lpOutputBuf[1024];
int outBufLen = 1024;
memset(&m_acceptUnit, 0, sizeof(m_acceptUnit));
//AcceptEx
auto ret = lpfnAcceptEx(
m_listenSocket, m_currentAccetSocket, m_acceptBuffer.data(), 0,
// outBufLen - (sizeof(sockaddr_in) + 16) * 2,
sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, nullptr, &m_acceptUnit);
if(ret == FALSE && WSAGetLastError() == ERROR_IO_PENDING) {
std::cout << "start listen " << ip << ":" << m_port << std::endl;
m_running = true;
}
return m_running;
}
bool Server::tryNewConn() {
//char lpOutputBuf[1024];
int outBufLen = 1024;
memset(&m_acceptUnit, 0, sizeof(WSAOVERLAPPED));
auto ret = lpfnAcceptEx(
m_listenSocket, m_currentAccetSocket, m_acceptBuffer.data(), 0,
// outBufLen - (sizeof(sockaddr_in) + 16) * 2,
sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, nullptr, &m_acceptUnit);
if(ret == FALSE && WSAGetLastError() == ERROR_IO_PENDING) {
return true;
}
return false;
}
void Server::waitingForAccept() {
int id = 1;
while(isRunning()) {
DWORD bytes = 0;
ULONG_PTR dwCompletionKey;
LPOVERLAPPED lpOverlapped = nullptr;
auto ok = GetQueuedCompletionStatus(m_completePort, &bytes,
&dwCompletionKey, &lpOverlapped, 1000);
if(ok) {
std::string info(m_acceptBuffer.begin(), m_acceptBuffer.begin() + bytes);
CreateIoCompletionPort((HANDLE)m_currentAccetSocket, m_ioCompletePort, 0,
0);
auto ns = std::make_shared<ServerSocket>(id, m_currentAccetSocket);
ns->handleClose = socketClose;
ns->handleError = socketError;
ns->handleRecv = socketRecv;
if(newConn){
newConn(ns);
}
++id;
m_currentAccetSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
ok = tryNewConn();
if(!ok) break;
} else {
auto errorCode = GetLastError();
if(errorCode == WAIT_TIMEOUT) continue;
std::cout << "has error " << errorCode << " in accept new client"
<< WSAGetLastError() << "\n";
break;
}
}
}
void Server::waitingForIo() {
while(isRunning()) {
DWORD bytes = 0;
ULONG_PTR dwCompletionKey;
LPOVERLAPPED lpOverlapped = nullptr;
auto ok = GetQueuedCompletionStatus(m_ioCompletePort, &bytes,
&dwCompletionKey, &lpOverlapped, 1000);
if(ok) {
auto unit = (OverUnit*)lpOverlapped;
assert(unit);
auto socket = unit->socket;
if(bytes == 0) {
socket->onClosed();
} else {
if(unit->type == 0) {
socket->onFinishedRecv(bytes);
} else if(unit->type == 1) {
socket->onFinishedSend(bytes);
socket->trySendMore();
}
}
} else {
auto errorCode = GetLastError();
if(errorCode == WAIT_TIMEOUT) continue;
std::cout << "has error " << errorCode << " in accept new client"
<< WSAGetLastError() << "\n";
if(!lpOverlapped) {
break;
} else {
OverUnit* unit = (OverUnit*)lpOverlapped;
auto socket = unit->socket;
if(socket) {
socket->onError(unit->type, WSAGetLastError());
}
}
}
}
}
//ServerSocket.h
#ifndef YT_SERVER_SOCKET_H
#define YT_SERVER_SOCKET_H
#ifndef UNICODE
#define UNICODE
#endif
#define WIN32_LEAN_AND_MEAN
#include <winsock2.h>
#include <ws2tcpip.h>
#include <mswsock.h>
#include <memory>
#include <deque>
#include <vector>
#include <functional>
#include <mutex>
class ServerSocket;
struct OverUnit : WSAOVERLAPPED {
ServerSocket* socket;
int type;
};
class ServerSocket : public std::enable_shared_from_this<ServerSocket> {
public:
using pointer = std::shared_ptr<ServerSocket>;
typedef std::function<void(const pointer&, const char* data, int size)>
HandleRecvFunction;
typedef std::function<void(const pointer&)> HandleClose;
HandleRecvFunction handleRecv;
HandleClose handleClose;
typedef std::function<void(const pointer&, int, int)> HandleError;
HandleError handleError;
ServerSocket(int id, SOCKET s);
~ServerSocket();
void onError(int errorType, int errorCode);
void onClosed();
void onFinishedSend(int transfered);
void onFinishedRecv(int transfered);
int id() const { return m_id; }
void startRecv();
void sendMessage(const std::string& info);
void trySendMore();
private:
int m_id;
SOCKET m_socket;
using Buffer = std::vector<char>;
Buffer m_currentRecv;
std::deque<Buffer> m_recvBuffers;
std::mutex m_mutex;
std::deque<std::string> m_sendBuffers;
OverUnit m_recvUnit;
OverUnit m_sendUnit;
void sendFrontBuffer();
WSABUF m_sendWSA;
WSABUF m_recvWSA;
};
#endif
//ServerSocket.cpp
#include "ServerSocket.h"
#include <cassert>
#include <iostream>
#include <stdexcept>
ServerSocket::ServerSocket(int id, SOCKET s)
: m_id(id), m_socket(s), m_currentRecv(8192) {
memset(&m_recvUnit, 0, sizeof(m_recvUnit));
m_recvUnit.socket = this;
m_recvUnit.type = 0; // recv
memset(&m_sendUnit, 0, sizeof(m_sendUnit));
m_sendUnit.socket = this;
m_sendUnit.type = 1;
}
ServerSocket::~ServerSocket() {
closesocket(m_socket);
}
void ServerSocket::onError(int errorType, int errorCode) {
if(handleError) {
auto self = shared_from_this();
handleError(self, errorType, errorCode);
}
}
void ServerSocket::onFinishedSend(int transfered) {
}
void ServerSocket::onFinishedRecv(int transfered) {
auto self = shared_from_this();
handleRecv(self, m_currentRecv.data(), transfered);
startRecv();
}
void ServerSocket::onClosed() {
if(handleClose) {
auto self = shared_from_this();
handleClose(self);
}
}
void ServerSocket::sendMessage(const std::string& info) {
std::lock_guard<std::mutex> lock(m_mutex);
bool sending = !m_sendBuffers.empty();
m_sendBuffers.push_back(info);
if(!sending) {
sendFrontBuffer();
}
}
void ServerSocket::sendFrontBuffer() {
// WSABUF buf;
m_sendWSA.len = m_sendBuffers.front().size();
m_sendWSA.buf = (char*)m_sendBuffers.front().data();
auto result = WSASend(m_socket, &m_sendWSA, 1, nullptr, 0, &m_sendUnit, nullptr);
if(result != 0) {
auto error = WSAGetLastError();
if(error != WSA_IO_PENDING) {
throw std::runtime_error("bad for send message");
}
}
}
void ServerSocket::trySendMore() {
std::lock_guard<std::mutex> lock(m_mutex);
assert(!m_sendBuffers.empty());
m_sendBuffers.pop_front();
if(!m_sendBuffers.empty()) {
sendFrontBuffer();
}
}
void ServerSocket::startRecv() {
//WSABUF buf;
m_recvWSA.len = m_currentRecv.size();
m_recvWSA.buf = (char*)m_currentRecv.data();
DWORD flag = 0;
auto ret = WSARecv(m_socket, &m_recvWSA, 1, nullptr, &flag,
static_cast<WSAOVERLAPPED*>(&m_recvUnit), nullptr);
if(ret != 0) {
auto code = WSAGetLastError();
if(code != WSA_IO_PENDING) {
std::cout << "error for " << code << "\n";
throw std::runtime_error("bad for start recv");
}
}
}
//main.cpp
#include <cassert>
#include <signal.h>
#include <thread>
#include <map>
#include <string>
#include "Server.h"
#pragma comment(lib, "Ws2_32.lib")
// global bad habbit
std::map<int, ServerSocket::pointer> players;
void handleRecvData(const ServerSocket::pointer& p,
const char* data,
int size) {
std::string info(data, data + size);
std::cout << "recv: " << info << std::endl;
//for (int i = 0; i < 10; ++i) {
//Sleep(1);
p->sendMessage("+<html>hello world</html>!");
//}
}
void handleConn(Server* s) {
try {
s->waitingForAccept();
} catch(...) {
std::cout << "has error \n";
}
}
std::function<void()> handler;
void signalHandler(int code) {
std::cout << "handle " << code << std::endl;
if(handler) {
handler();
handler = nullptr;
}
}
void updateClientIo(HANDLE io, bool& isOver) {
while(!isOver) {
DWORD bytes = 0;
ULONG_PTR dwCompletionKey;
LPOVERLAPPED lpOverlapped = nullptr;
auto ok = GetQueuedCompletionStatus(io, &bytes, &dwCompletionKey,
&lpOverlapped, 1000);
if(ok) {
auto unit = (OverUnit*)lpOverlapped;
assert(unit);
auto socket = unit->socket;
if(bytes == 0) {
socket->onClosed();
} else {
if(unit->type == 0) {
socket->onFinishedRecv(bytes);
} else if(unit->type == 1) {
socket->onFinishedSend(bytes);
socket->trySendMore();
}
}
} else {
auto errorCode = GetLastError();
if(errorCode == WAIT_TIMEOUT) continue;
std::cout << "has error " << errorCode << " in accept new client"
<< WSAGetLastError() << "\n";
if(!lpOverlapped) {
break;
} else {
OverUnit* unit = (OverUnit*)lpOverlapped;
auto socket = unit->socket;
if(socket) {
socket->onError(unit->type, WSAGetLastError());
}
}
}
}
}
void handleAddNewPlayer(const ServerSocket::pointer& p) {
std::cout << "hello " << p->id() << std::endl;
p->startRecv();
players.emplace(p->id(), p);
}
void handleRemovePlayer(const ServerSocket::pointer& p) {
std::cout << "bye " << p->id() << std::endl;
players.erase(p->id());
}
ServerSocket::pointer buildClientSocket(const char* addr, u_short port, int id,
HANDLE io) {
auto connectSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(connectSocket == INVALID_SOCKET) {
return nullptr;
}
sockaddr_in clientService;
clientService.sin_family = AF_INET;
clientService.sin_addr.s_addr = inet_addr(addr);
clientService.sin_port = htons(port);
auto iResult =
connect(connectSocket, (SOCKADDR*)&clientService, sizeof(clientService));
if(iResult == SOCKET_ERROR) {
std::cout << "connect function failed with error:" << WSAGetLastError()
<< std::endl;
iResult = closesocket(connectSocket);
if(iResult == SOCKET_ERROR)
std::cout << "closesocket function failed with error:"
<< WSAGetLastError() << std::endl;
return nullptr;
}
CreateIoCompletionPort((HANDLE)connectSocket, io, 0, 0);
return std::make_shared<ServerSocket>(id, connectSocket);
}
int main(int argc, char** argv) {
signal(SIGINT, signalHandler);
WSADATA data;
auto result = WSAStartup(MAKEWORD(2,2), &data);
if(result != NO_ERROR) {
std::cout << "error at WSAStartup\n";
return 1;
}
auto server = std::make_unique<Server>(8901);
if(!server) {
WSACleanup();
return 1;
}
server->socketRecv = handleRecvData;
server->socketClose = handleRemovePlayer;
auto clientIo = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
bool gameOver = false;
std::thread updateClientIo(
[clientIo, &gameOver] { updateClientIo(clientIo, gameOver); });
auto p = server.get();
handler = [p]{p->stop();};
auto ok = server->startAccept();
std::unique_ptr<std::thread> t;
std::unique_ptr<std::thread> io;
if(ok) {
std::cout << "ok\n";
server->newConn = handleAddNewPlayer;
t.reset(new std::thread([p]{p->waitingForAccept();}));
io.reset(new std::thread([p]{p->waitingForIo();}));
}
std::cout << "continue\n";
/*
auto client = buildClientSocket("192.168.1.200", 8901, 1, clientIo);
if(client) {
client->handleRecv =
[](const ServerSocket::pointer& p, const char* data, int size) {
p->sendMessage(std::string(data, data + size));
};
client->startRecv();
client->sendMessage("hello world");
std::cout << "\nclient connect to server\n";
}
*/
while(server->isRunning()) {
// in fact this should update main logic
Sleep(1);
}
std::cout << "ok and bye\n";
gameOver = true;
updateClientIo.join();
if(t && t->joinable()) {
t->join();
}
if(io && io->joinable()) {
io->join();
}
CloseHandle(clientIo);
players.clear();
WSACleanup();
return 0;
}