C++ SOCKET通信模型(五)poll

在windows上绕了一大圈之后终于又回到linux,主要说一下poll和epoll。从这两个模型可以看出,linux始终没有采取"内核把收到的数据直接拷贝到用户缓冲区"的做法,数据仍需由应用层调用recv从内核缓冲区读出(网上流传epoll靠mmap在内核与用户态之间共享数据缓冲区,这其实是误解,epoll共享的只是就绪事件的管理结构),这么做可能是为了让开发人员自己处理传输过程中的问题,比如传输缓慢之类的处理策略。poll和select用起来非常相似,但select每次循环都要清空重设内核对象,poll通过标识位避免了每次的清空重设;尚未解决的是,仍然需要对整个pollfd集合进行遍历来找出需要处理的序号。不过可以和select的代码对比一下:单线程部分清空重设处的遍历已经没有了,剩下的遍历部分可以放到多线程中完成,也就是说很大程度上已经减轻了select中遍历造成的性能损失。另外poll没有描述符数量限制,可以比select同时监听更多的socket。

#include <cmath>
#include <condition_variable>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <list>
#include <mutex>
#include <vector>
#include <netinet/in.h>
#include <poll.h>
#include <pthread.h>
#include <sys/socket.h>
#include <unistd.h>
#include <memory.h>
using namespace std;
// Per-connection bookkeeping record, heap-allocated by the accept loop
// and freed by release().
struct Client
{
	int fd;	// connected client socket descriptor
	int id;	// current index of this client's slot in the _pollfd array
};

void* CreateServ(void* args);
void* Proc(void* args);
using namespace std;
const int _thread_count = 8;

pollfd* _pollfd;
list<Client*> _sockList;
list<int> _removeList;
mutex lock4cv;
mutex lock4cv2;
condition_variable cv;
condition_variable cv2;
int _thread_unfinish;
vector<int> _vec;
char* buf2 = "hello client";
int main()
{
	pthread_t tid;
	for (int i = 0; i < _thread_count; i++)
	{
		_vec.push_back(0);
	}
	pthread_create(&tid, 0, CreateServ, 0);
	for (int i = 0; i < _thread_count; i++)
	{
		int* temp = new int(i);
		pthread_create(&tid, 0, Proc, temp);
	}
	cin.get();
	cin.get();
	return 0;
}
// Predicate handed to cv2.wait(): true once every worker thread has
// reported completion of the current poll round.
bool _isFinish()
{
	bool allWorkersDone = (_thread_unfinish < 1);
	return allWorkersDone;
}

/*
 * Worker thread: each of the _thread_count workers handles an equal,
 * contiguous slice of the pollfd set once the poll thread signals that a
 * round of results is ready. Static partitioning is not true load
 * balancing, but since poll hands back the whole set each round it is a
 * reasonable fit; the epoll version will switch to a work-queue model.
 *
 * args: heap-allocated worker index (0 .. _thread_count-1), owned by the
 * thread for its whole lifetime.
 */
void* Proc(void* args)
{
	int* I = (int*)args;

	while (true)
	{
		{
			unique_lock<mutex> l(lock4cv);
			// Must be a loop, not an `if`: condition_variable::wait may wake
			// spuriously, and the flag tells us whether work really arrived.
			while (_vec[*I] <= 0)
			{
				cv.wait(l);
			}
			_vec.at(*I) = 0;
		} // scope ends here so l releases lock4cv before the heavy work

		// Slice [start, end) of the client list assigned to this worker.
		int start = ceil(_sockList.size() / (double)_thread_count) * *I;
		if (start < *I)
		{
			start = *I;
		}
		if (_sockList.size() <= start)
		{
			// Nothing in our slice this round; report done and wait again.
			lock4cv2.lock();
			_thread_unfinish -= 1;
			cv2.notify_all();
			lock4cv2.unlock();
			continue;
		}
		int end = start + ceil(_sockList.size() / (double)_thread_count);

		int i = 0;
		for (auto iter = _sockList.begin(); iter != _sockList.end(); ++iter, i++)
		{
			if (i >= start && i < end) {
				Client*& c = *iter;
				// Test the POLLIN bit instead of comparing for equality:
				// revents may carry POLLHUP/POLLERR alongside POLLIN, and an
				// exact-equality test would silently skip those sockets.
				if (_pollfd[i].revents & POLLIN)
				{
					cout << "proc by:" << *I << endl;
					char buf[128];
					int len = recv(c->fd, buf, 128, 0);
					if (len <= 0)
					{
						// Peer closed or error: close now, remove after the
						// round (the poll thread owns slot compaction).
						close(c->fd);
						cout << i << endl;
						_removeList.push_back(i);
						continue;
					}
					// Send only the actual payload (incl. NUL). The original
					// sent 128 bytes, reading far past the end of the
					// 13-byte string literal — undefined behavior.
					send(c->fd, buf2, strlen(buf2) + 1, 0);
					//cout << buf << endl;
				}
			}
		}
		lock4cv2.lock();
		_thread_unfinish -= 1;
		cv2.notify_all();
		lock4cv2.unlock();
	}
}
/**
 * Removes the client occupying slot `index` using swap-with-last: the
 * last client takes over the freed slot (in both _sockList and _pollfd)
 * so both structures stay dense and pollfd indices keep matching
 * Client::id. Called only by the poll thread, highest index first.
 */
void release(int index)
{
	cout << "release" << endl;
	int last = _sockList.size() - 1;
	if (index < last)
	{
		// Initialize to null: the original left c uninitialized, so a
		// failed lookup made `delete c` undefined behavior.
		Client* c = nullptr;
		for (auto iter = _sockList.begin(); iter != _sockList.end(); ++iter)
		{
			if ((*iter)->id == index)
			{
				c = *iter;
				// Move the last client into the vacated slot and retag it.
				*iter = _sockList.back();
				(*iter)->id = index;
				break;
			}
		}
		_sockList.pop_back();
		delete c; // deleting nullptr is a safe no-op if index wasn't found
		_pollfd[index] = _pollfd[last];
		memset(&_pollfd[last], 0, sizeof(pollfd));
	}
	else {
		// Removing the last slot: no swap needed.
		delete _sockList.back();
		_sockList.pop_back();
		memset(&_pollfd[index], 0, sizeof(pollfd));
	}

}

/**
 * Accept/poll thread: owns the listening socket and the pollfd array.
 * Runs poll() in a loop; readiness on the listen socket means a new
 * connection; any other readiness wakes the workers to scan their
 * slices, after which connections collected in _removeList are reaped.
 */
void* CreateServ(void* args) {
	int sockSrv = socket(AF_INET, SOCK_STREAM, 0);

	int nSendBufLen = 16 * 1024 * 1024;
	setsockopt(sockSrv, SOL_SOCKET, SO_SNDBUF, (const char*)&nSendBufLen, sizeof(int));

	//	struct in_addr s;
	//	inet_pton(AF_INET, "127.0.0.1",(void*)&s);
	sockaddr_in addrSrv;
	addrSrv.sin_addr.s_addr = htonl(INADDR_ANY);
	addrSrv.sin_family = AF_INET;
	addrSrv.sin_port = htons(6001);

	// The original ignored bind's result; a port already in use would then
	// make listen/poll fail in confusing ways.
	if (::bind(sockSrv, (sockaddr*)&addrSrv, sizeof(sockaddr)) == -1) {
		cout << "bind failed" << endl;
		return 0;
	}

	int err = listen(sockSrv, 100);
	if (err == -1) {
		cout << "listen failed" << endl;
		return 0;
	}

	int base = 8;

	// Allocate with calloc, NOT new[]: the array is grown below with
	// realloc, which is undefined behavior on new[]-allocated memory.
	// calloc also zero-initializes (new pollfd[n] left the array garbage).
	pollfd* pfd4poll = (pollfd*)calloc(base, sizeof(pollfd));
	_pollfd = pfd4poll + 1; // client slots start after the listen slot
	pfd4poll[0].fd = sockSrv;
	pfd4poll[0].events = POLLIN;
	//accept loop
	int i = 0; // next free client slot in _pollfd
	while (true) {

		// Grow before we would overflow: slot 0 is the listen socket.
		if (base <= _sockList.size() + 1) {
			base *= 2;
			pfd4poll = (pollfd*)realloc(pfd4poll, sizeof(pollfd)*base);
			_pollfd = pfd4poll + 1;
			cout << "realloc" << endl;
		}

		if (poll(pfd4poll, _sockList.size() + 1, -1) == -1) {
			continue; // e.g. EINTR: just retry
		}

		// Bit test rather than equality: revents may carry extra flags.
		if (pfd4poll[0].revents & POLLIN)
		{
			int s = accept(sockSrv, 0, 0);
			if (s == -1) {
				continue; // connection aborted before we accepted it
			}
			Client* c = new Client;
			c->fd = s;
			c->id = i;
			_pollfd[i].fd = s;
			_pollfd[i].events = POLLIN;
			_sockList.push_back(c);
			i++;
		}
		else {
			// Client activity: hand the whole set to the workers and wait
			// for all of them to report back.
			_thread_unfinish = _thread_count;

			for (int i = 0; i < _thread_count; i++)
			{
				_vec.at(i) = 1;
			}
			cv.notify_all();

			unique_lock<mutex> l(lock4cv2);
			cv2.wait(l, _isFinish);

			// Reap highest indices first so release()'s swap-with-last never
			// invalidates an index still pending removal.
			_removeList.sort([](const int a1, const int a2)
			{
				return a1 > a2;
			});
			for (auto iter = _removeList.begin(); iter != _removeList.end(); ++iter)
			{
				release(*iter);
				i--;
			}
			_removeList.clear();
		}
	}
}



猜你喜欢

转载自blog.csdn.net/nightwizard2030/article/details/77750452