一个简单的进程池

 

//半同步/半异步进程池的实现
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <sys/epoll.h>
#include <stdbool.h>
#include <time.h>

#define PROCESS_NUM    5
/*进程池允许的最大子进程数量*/
static const int MAX_PROCESS_NUMBER = 16;
/*每个子进程最多能处理的客户端数量*/
static const int USER_PER_PROCESS = 65536;
/*epoll 最多能处理的时间数*/
static const int MAX_EVENT_NUMBER = 1024;
/*用于处理信号的管道,以实现统一事件源。后面称之为信号管道*/
static int sig_pipefd[2];

/*描述一个子进程的结构*/
typedef struct _process_
{
	/*子进程通过m_stop来决定是否停止*/
	int m_stop;
	/*子进程的编号,从0开始,父进程为-1*/
	int m_idx;
	/*每个进程都有一个epoll内核事件表,用m_epollfd标识*/
	int m_epollfd;
	pid_t m_pid;//子进程PID
	int m_pipefd[2];//父进程和子进程通信用的管道
}process;

typedef struct _process_pool_
{
	/*进程池中的进程总数*/
	int m_process_num;
	/*监听fd*/
	int m_listenfd;
	/*保存所有进程的描述信息*/
	bool m_stop;
	process *m_sub_process;
	
}process_pool;



static int setnonblocking(int fd)
{
	int old_option = fcntl(fd, F_GETFL);
	int new_option = old_option | O_NONBLOCK;
	fcntl(fd, F_SETFL, new_option);
	return old_option;
} 


static void addfd(int epollfd, int fd)
{
	struct epoll_event event;
	event.data.fd = fd;
	event.events = EPOLLIN | EPOLLET;
	epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
	setnonblocking(fd);
}

static void removefd(int epollfd, int fd)
{
	epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0);
	close(fd);
}

//当产生信号时,通过信号管道往父进程发送该信号
static void sig_handler(int sig)
{
	int save_errno = errno;
	int msg = sig;
	send(sig_pipefd[1], (char *)&msg, 1 ,0); 
	errno = save_errno;
}

static void addsig(int sig, void(handler)(int), bool restart)
{
	struct sigaction sa;
	memset(&sa, '\0', sizeof(sa));
	sa.sa_handler = handler;
	if(restart)
	{
		sa.sa_flags |= SA_RESTART;
	}
	sigfillset(&sa.sa_mask);
	if(sigaction(sig, &sa, NULL) == -1)
	{
		printf("sigcation errno!\n");
	}
}

//统一事件源
int setup_sig_pipe(process *pro) 
{
	int ret; 
	
	pro->m_epollfd = epoll_create(5);
	if(pro->m_epollfd < 0)
	{
		printf("epoll_create error!\n");
		return -1;
	}
	ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
	if(ret < 0)
	{
		printf("socketpair error \n");
		return -1;
	}
	setnonblocking(sig_pipefd[1]);
	addfd(pro->m_epollfd, sig_pipefd[0]);
	
	addsig(SIGCHLD, sig_handler, true);
	addsig(SIGTERM, sig_handler, true);
	addsig(SIGINT, sig_handler, true);
	addsig(SIGPIPE, SIG_IGN, true);
	
	return 0;
}



int worker_process_func(int clientfd)
{  
	char buff[256];
	char sendbuf[1024];
	int sendlen = 0;
    bool bError = false;
	int nRecv = 0, count = 0;
	
	//printf("in child pid= [%ld]\n",(long)getpid());
   
    memset(buff, 0, sizeof(buff));
	nRecv = recv(clientfd, buff, 256, 0);
	if (nRecv < 0)
	{
		if (errno != EAGAIN)//EWOULDBLOCK 用于非阻塞模式,不需要重新读或者写.EAGAIN 非阻塞模式下重新读
		{
			return -1;
		}
	}
	//对端关闭了socket,这端也关闭。
	else if (nRecv == 0)
	{
		printf("peer closed, client disconnected, fd = %d \n" ,clientfd);
		return -1;
	}
    
	printf("client msg:[%d] %s\n", count, buff);
 
    //将消息加上时间标签后发回
    time_t now = time(NULL);
    struct tm* nowstr = localtime(&now);

	memset(sendbuf, 0, sizeof(sendbuf));
	sprintf(sendbuf, "%04d-%02d-%02d %02d:%02d:%02d ", nowstr->tm_year + 1900, nowstr->tm_mon + 1, nowstr->tm_mday,	
	nowstr->tm_hour, nowstr->tm_min, nowstr->tm_sec);
    strcat(sendbuf + strlen(sendbuf), "server reply:");
	strcat(sendbuf, buff);
    sendlen  = strlen(sendbuf);
	int sendcount = 0;
    int nSent = 0;
	do
	{
		nSent = send(clientfd , sendbuf + sendcount, sendlen - sendcount, 0);
		if (nSent == -1)
		{
			if (errno == EWOULDBLOCK)
			{
				sleep(10);
				continue;
			}
			else
			{
				printf("send error, fd = [%d] \n", clientfd);
				close(clientfd);
				break;
			}   
		}          
		sendcount += nSent;
	}while(sendcount < sendlen);
		
    printf("send [%d]:%s\n", sendcount, sendbuf);
 
    return 0;
}

int  create_server_listener(const char* ip, short port)
{
	int g_listenfd;
    g_listenfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (g_listenfd == -1)
        return -1;

    int on = 1;
    setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));
    setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on));
 
    struct sockaddr_in servaddr;
    bzero(&servaddr, sizeof(servaddr)); 
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = inet_addr(ip);
    servaddr.sin_port = htons(port);
    if (bind(g_listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1)
        return -1;

    if (listen(g_listenfd, 50) == -1)
        return -1;
	
	return g_listenfd;
}


int run_child(process_pool *pools, int n)
{
	int pipefd, epollfd, number, ret;
	struct epoll_event events[USER_PER_PROCESS];
	
	printf("run child [%d]!\n", getpid());
	setup_sig_pipe(&pools->m_sub_process[n]);
	pipefd = pools->m_sub_process[n].m_pipefd[1];
	epollfd = pools->m_sub_process[n].m_epollfd;
	addfd(epollfd, pipefd);
	
	while(!pools->m_sub_process[n].m_stop)
	{
		number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
		if((number < 0) && (errno != EINTR))
		{
			printf("child epoll errno , %s !\n", strerror(errno));
			break;
		}
		
		for(int i = 0; i < number; i++)
		{
			int sockfd = events[i].data.fd;
			if((sockfd == pipefd) && (events[i].events & EPOLLIN))
			{//父进程通知,接收新的客户端连接
				int client = 0;
				/*从父子进程之间的管道中读取数据并将结果保存着变量client中,如果读取成功
				表示有新的客户端连接到来*/
				printf("new client!ret = [%d] \n", ret);
				ret = recv(sockfd, (char *)&client, sizeof(client), 0);
				if(((ret < 0) && (errno != EAGAIN)) || ret == 0)
				{
					continue;
				}
				else 
				{
					struct sockaddr_in client_addr;
					socklen_t client_len = sizeof(struct sockaddr_in);
					int connfd = accept(pools->m_listenfd,(struct sockaddr*)&client_addr, &client_len);
					if(connfd < 0)
					{
						printf("child accept errno , %s !\n", strerror(errno));
						continue;
					}
					printf("new client from [%s] [%d]\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
					addfd(epollfd, connfd);
				}
			}
			else if((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN))
			{//有信号产生
				int sig;
				char signals[1024];
				
				ret = recv(sockfd, signals, 1024, 0);
				if(ret <= 0)
				{
					continue;
				}
				else
				{
					for(int i= 0; i < ret; i++)
					{
						switch(signals[i])
						{
							case SIGCHLD:
							{
								pid_t pid;
								int stat;
								while((pid = waitpid(-1, &stat, WNOHANG)) > 0)
								{
									continue;
								}
								break;
							}
							case SIGTERM:
							case SIGINT:
							{
								pools->m_sub_process[n].m_stop = true;
								pools->m_sub_process[n].m_pid = -1;
								break;
							}
							default:
								break;
						}
					}
				}
			}
			else if(events[i].events & EPOLLIN)
			{//有数据要来
				if(worker_process_func(sockfd) < 0)
				{
					removefd(epollfd, sockfd);
				}
			}
			else 
			{
				continue;
			}
		}
	}
	close(pipefd);
	close(epollfd);
	exit(0);
}

int run_parent(process_pool *pools)
{
	printf("run parent [%d]!\n", getpid());
	int pipefd, epollfd, number;
	int ret;
	int new_conn = 1, sub_process_counter = 0;
	struct epoll_event events[USER_PER_PROCESS];
	
	epollfd = epoll_create(5);
	if(epollfd < 0)
	{
		printf(" parent epoll_create error!\n");
		return -1;
	}
	ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
	if(ret < 0)
	{
		printf("socketpair error \n");
		return -1;
	}
	setnonblocking(sig_pipefd[1]);
	addfd(epollfd, sig_pipefd[0]);
	
	addsig(SIGCHLD, sig_handler, true);
	addsig(SIGTERM, sig_handler, true);
	addsig(SIGINT, sig_handler, true);
	addsig(SIGPIPE, SIG_IGN, true);
	
	addfd(epollfd, pools->m_listenfd);
	while(!pools->m_stop)
	{
		number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
		if((number < 0) && (errno != EINTR))
		{
			printf("parent epoll errno , %s !\n", strerror(errno));
			break;
		}
		for(int i = 0; i < number; i++)
		{
			int sockfd = events[i].data.fd;
			if(sockfd == pools->m_listenfd)
			{
				/*如果有新的连接到来,就采用Round Robin 方式将其分配给一个子进程处理*/
				int num = sub_process_counter;
				do
				{//检查正常的子进程
					if(pools->m_sub_process[num].m_pid != -1)
					{
						break;
					}
					num = (num + 1) % pools->m_process_num;
				}while(num != pools->m_process_num);
				
				if(pools->m_sub_process[num].m_pid == -1)
				{
					pools->m_sub_process[num].m_stop = true;
					break;
				}
				sub_process_counter = (num + 1) % pools->m_process_num;
				send(pools->m_sub_process[num].m_pipefd[0], (char*)&new_conn, sizeof(new_conn), 0);
				printf("send request to child %d\n", num);
			}
			else if((sockfd == sig_pipefd[0]) && (events[i].events &EPOLLIN))
			{
				int sig;
				char signals[1024];
				ret = recv(sig_pipefd[0], signals, 1024, 0);
				if(ret <= 0)
				{
					continue;
				}
				else
				{
					printf("parent recv signals = [%d][%s]\n", ret, signals);
					for(int i= 0; i < ret; i++)
					{
						switch(signals[i])
						{
							case SIGCHLD:
							{
								pid_t pid;
								int stat;
								while((pid = waitpid(-1, &stat, WNOHANG)) > 0)
								{
									for(int j = 0; j < pools->m_process_num; j++)
									{
										/*如果进程池中的第j个子进程退出了,
										应关闭响应的通信管道,并设置相应的m_pid为-1,
										以标记该子进程已经退出*/
										if(pools->m_sub_process[j].m_pid == pid)
										{
											printf("child %d join\n", j);
											close(pools->m_sub_process[j].m_pipefd[0]);
											pools->m_sub_process[j].m_pid = -1;
											pools->m_sub_process[j].m_stop = true;
										}
									}
									
								}
								/*如果所有子进程都已经退出了,则父进程也退出*/
								pools->m_sub_process[i].m_stop = true;
								
								for(int i = 0; i < pools->m_process_num; i++)
								{
									if(pools->m_sub_process[i].m_pid != -1)
									{
										pools->m_sub_process[i].m_stop = true;
									}
								}
								break;
							}
							case SIGTERM:
							case SIGINT:
							{
								/*如果父进程接收到终止信号,那么就杀死所有子进程
								并等待他们全部结束。当然通知子进程结束更好的方法是
								向父子进程之间的通信管道发送特殊数据,可以自己实现*/
								printf("kill all the child now\n");
								for(int i = 0; i < pools->m_process_num; i++)
								{
									int pid = pools->m_sub_process[i].m_pid;
									if(pid != -1)
									{
										if(kill(pid, SIGTERM) == 0)
											printf("kill child [%d]\n", pid);
									}
									
								}
								pools->m_stop = true;
								break;
							}
							default:
								break;
						}
					}
				}
			}
			else 
			{
				continue;
			}
		}
	}
	close(pools->m_listenfd);
	close(epollfd);
}

int process_pool_init(int listenfd, int process_num, process_pool *pools)
{
	int i, ret;
	if(process_num <= 0)
	{
		process_num = PROCESS_NUM;
	}
	
	pools->m_listenfd = listenfd;
	pools->m_sub_process = (process*)malloc(sizeof(process) * PROCESS_NUM);
	if(pools->m_sub_process == NULL)
	{
		printf("malloc error!\n");
		return -1;
	}
	memset(pools->m_sub_process, 0, sizeof(process) * PROCESS_NUM);
	pools->m_process_num = process_num;
	
	for(i = 0; i < process_num; i++)
	{
		ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pools->m_sub_process[i].m_pipefd);
		if(ret != 0)
		{
			printf("socketpair error!\n");
			return -1;
		}
		pools->m_sub_process[i].m_pid = fork();
		if(pools->m_sub_process[i].m_pid < 0)
		{
			printf("fork error!\n");
			return -1;
		}
		else if(pools->m_sub_process[i].m_pid > 0)
		{
			close(pools->m_sub_process[i].m_pipefd[1]);
			printf("create child [%d]\n", pools->m_sub_process[i].m_pid);
		}
		else
		{
			pools->m_sub_process[i].m_pid = getpid();
			close(pools->m_sub_process[i].m_pipefd[0]);
			pools->m_sub_process[i].m_idx = i;
			pools->m_sub_process[i].m_stop = false;
			run_child(pools, i);
		}
	}
	return 0;
}

int main()
{
	char *ip = "127.0.0.1";
	short port = 10000;
	int listenfd, ret, i;
	process_pool pools;
	
	listenfd = create_server_listener(ip, port);
	if(listenfd < 0)
	{
		printf("create_server_listener error!\n");
		return -1;
	}
	
	ret = process_pool_init(listenfd, 0, &pools);
	if(ret < 0)
	{
		printf("process_pool_init error!\n");
		return -1;
	}
	pools.m_stop = false;
	run_parent(&pools);
	if(pools.m_sub_process != NULL)
	{
		free(pools.m_sub_process);
		pools.m_sub_process = NULL;
	}
	return 0;
}

参考:《linux 高性能服务器编程》

猜你喜欢

转载自blog.csdn.net/u014608280/article/details/86003250