在windows上绕了一大圈之后终于又回到linux,主要就说下poll和epoll。从这两个模型上可以看出,linux始终都没有采取将内核的收到数据直接拷贝到用户缓冲区,epoll用了更加灵活的办法:mmap,这么做可能是为了让开发人员去处理传输过程中的问题,比如像传输缓慢之类的处理策略。poll和select用起来非常相似,但select每次循环都要清空重设内核对象,poll通过标识的办法避免了每次的清空重设,但没有解决的是,仍然需要对整个pollfd集合进行遍历来判断获取到需要处理的序号,不过可以和select的代码对比下,单线程部分的清空重设那的遍历已经没了,剩下遍历那部分,可以在多线程中完成,也就是说很大程度上已经减轻了select中遍历造成的性能损失。还有就是poll没什么限制,可以比select同时监听更多的socket
#include <iostream> #include<list> #include<vector> #include<mutex> #include<cmath> #include<condition_variable> #include<sys/socket.h> #include<netinet/in.h> #include <pthread.h> #include<poll.h> #include<unistd.h> #include<memory.h> using namespace std; struct Client { int fd; int id; }; void* CreateServ(void* args); void* Proc(void* args); using namespace std; const int _thread_count = 8; pollfd* _pollfd; list<Client*> _sockList; list<int> _removeList; mutex lock4cv; mutex lock4cv2; condition_variable cv; condition_variable cv2; int _thread_unfinish; vector<int> _vec; char* buf2 = "hello client"; int main() { pthread_t tid; for (int i = 0; i < _thread_count; i++) { _vec.push_back(0); } pthread_create(&tid, 0, CreateServ, 0); for (int i = 0; i < _thread_count; i++) { int* temp = new int(i); pthread_create(&tid, 0, Proc, temp); } cin.get(); cin.get(); return 0; } bool _isFinish() { return _thread_unfinish <= 0; } /* * 在这采用的线程均分策略,其实并不能做到真正意义上的负载均衡,但由于poll每次都要处理整个集合,所以这还是就先采用这种模式。下篇用epoll会写成队列争抢模式 */ void* Proc(void* args) { int* I = (int*)args; while (true) { { unique_lock<mutex> l(lock4cv); if (_vec[*I] <= 0) { cv.wait(l); } _vec.at(*I) = 0; }//让l释放lock4cv int start = ceil(_sockList.size() / (double)_thread_count)* *I; if (start < *I) { start = *I; } if (_sockList.size() <= start) { lock4cv2.lock(); _thread_unfinish -= 1; cv2.notify_all(); lock4cv2.unlock(); continue; } int end = start + ceil(_sockList.size() / (double)_thread_count); int i = 0; for (auto iter = _sockList.begin(); iter != _sockList.end(); ++iter,i++) { if (i >= start&&i < end) { Client * & c = *iter; if (_pollfd[i].revents == POLLIN) { cout << "proc by:" << *I << endl; char buf[128]; int len = recv(c->fd, buf, 128, 0); if (len <= 0) { close(c->fd); cout << i << endl; _removeList.push_back(i); continue; } send(c->fd, buf2, 128, 0); //cout << buf << endl; } } } lock4cv2.lock(); _thread_unfinish -= 1; cv2.notify_all(); lock4cv2.unlock(); } } void release(int index) { cout << "release" << endl; int last = _sockList.size() - 1; if (index < last) { Client* c; for (auto iter = _sockList.begin(); iter != _sockList.end(); ++iter) { if ((*iter)->id == index) { c = *iter; *iter = _sockList.back(); (*iter)->id = index; break; } } _sockList.pop_back(); delete c; _pollfd[index] = _pollfd[last]; memset(&_pollfd[last], 0, sizeof(pollfd)); } else { delete _sockList.back(); _sockList.pop_back(); memset(&_pollfd[index], 0, sizeof(pollfd)); } } void* CreateServ(void* args) { int sockSrv = socket(AF_INET, SOCK_STREAM, 0); int nSendBufLen = 16 * 1024 * 1024; setsockopt(sockSrv, SOL_SOCKET, SO_SNDBUF, (const char*)&nSendBufLen, sizeof(int)); // struct in_addr s; // inet_pton(AF_INET, "127.0.0.1",(void*)&s); sockaddr_in addrSrv; addrSrv.sin_addr.s_addr = htonl(INADDR_ANY); addrSrv.sin_family = AF_INET; addrSrv.sin_port = htons(6001); ::bind(sockSrv, (sockaddr*)&addrSrv, sizeof(sockaddr)); int err = listen(sockSrv, 100); if (err == -1) { cout << "listen failed" << endl; return 0; } int base = 8; pollfd* pfd4poll; pfd4poll = new pollfd[base]; _pollfd = pfd4poll + 1; pfd4poll[0].fd = sockSrv; pfd4poll[0].events = POLLIN; //accept loop int i = 0; while (true) { if (base <= _sockList.size() + 1) { base *= 2; pfd4poll = (pollfd*)realloc(pfd4poll, sizeof(pollfd)*base); _pollfd = pfd4poll + 1; cout << "realloc" << endl; } poll(pfd4poll, _sockList.size() + 1, -1); if (pfd4poll[0].revents == POLLIN) { int s = accept(sockSrv, 0, 0); Client* c = new Client; c->fd = s; c->id = i; _pollfd[i].fd = s; _pollfd[i].events = POLLIN; _sockList.push_back(c); i++; } else { _thread_unfinish = _thread_count; for (int i = 0; i < _thread_count; i++) { _vec.at(i) = 1; } cv.notify_all(); unique_lock<mutex> l(lock4cv2); cv2.wait(l, _isFinish); _removeList.sort([](const int a1, const int a2) { return a1 > a2; }); for (auto iter = _removeList.begin(); iter != _removeList.end(); ++iter) { release(*iter); i--; } _removeList.clear(); } } }