UNP卷一chapter30 客户与服务器程序设计范式

本章主要讲述9个不同的服务器程序设计范式。重点在于讲述服务器开僻进程池以及线程池

i、预先派生子进程是让服务器在启动阶段调用fork创建一个子进程池。每个客户请求由当前可用子进程池中的某个(闲置)子进程处理。

ii、预先创建线程是让服务器在启动阶段创建一个线程池,每个客户由当前可用线程池中的某个(闲置)线程处理。

回忆之前所学的客户/服务器程序代码编写,此处将详细罗列各种tcp客户/服务器设计范例(特别需要重点突出进程池和线程池服务器端的代码编写)。

1、tcp服务器程序设计范例

以下均为举例程序

i、迭代服务器程序

#include	"unp.h"
#include	<time.h>

int
main(int argc, char **argv)
{
	int					listenfd, connfd;
	struct sockaddr_in	servaddr;
	char				buff[MAXLINE];
	time_t				ticks;

	listenfd = Socket(AF_INET, SOCK_STREAM, 0);

	bzero(&servaddr, sizeof(servaddr));
	servaddr.sin_family      = AF_INET;
	servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
	servaddr.sin_port        = htons(13);	/* daytime server */

	Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));

	Listen(listenfd, LISTENQ);

	for ( ; ; ) {
		connfd = Accept(listenfd, (SA *) NULL, NULL);

        ticks = time(NULL);
        snprintf(buff, sizeof(buff), "%.24s\r\n", ctime(&ticks));
        Write(connfd, buff, strlen(buff));

		Close(connfd);
	}
}
ii、并发服务器程序,每个客户一个子进程
#include	"unp.h"

//相关函数声明
int tcp_listen(const char* host, const char* serv, socklen_t *addrlenp);//此函数参考p258,由于篇幅,不再给出
void sig_int(int signo);
void sig_chld(int signo);

int
main(int argc, char **argv)
{
	int					listenfd, connfd;
	pid_t				childpid;
	void				sig_chld(int), sig_int(int), web_child(int);
	socklen_t			clilen, addrlen;
	struct sockaddr		*cliaddr;

	if (argc == 2)
		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
	else if (argc == 3)
		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
	else
		err_quit("usage: serv01 [ <host> ] <port#>");
	cliaddr = Malloc(addrlen);

	Signal(SIGCHLD, sig_chld);
	Signal(SIGINT, sig_int);

	for (; ; ) {
		clilen = addrlen;
		if ((connfd = accept(listenfd, cliaddr, &clilen)) < 0) {
			if (errno == EINTR)
				continue;		/* back to for() */
			else
				err_sys("accept error");
		}

		if ((childpid = Fork()) == 0) {	/* child process */
			Close(listenfd);	/* close listening socket */
			web_child(connfd);	/* process request */
			exit(0);
		}
		Close(connfd);			/* parent closes connected socket */
	}
}

/* include sigint */
void
sig_int(int signo)
{
	void	pr_cpu_time(void);

	pr_cpu_time();//此函数参考P652,由于篇幅原因,此处不再给出
	exit(0);
}
/* include sigchld */
void
sig_chld(int signo) {
	pid_t pid;
	int stat;
	while ((pid = waitpid(-1, &stat, WNOHANG)) > 0);
	return;
}

iii、预先派生子进程服务器程序,accept无上锁保护(引入进程池概念啦!)

惊群现象:只有一个子进程获得连接,但其他所有子进程都被唤醒;(此现象会导致性能受损)

#include	"unp.h"

static int		nchildren;
static pid_t	*pids;

//相关函数声明
int tcp_listen(const char* host, const char* serv, socklen_t *addrlenp);//此函数参考p258,由于篇幅,不再给出
pid_t child_make(int i, int listenfd, int addrlen);
void child_main(int i, int listenfd, int addrlen);
void sig_int(int signo);


int
main(int argc, char **argv)
{
	int			listenfd, i;
	socklen_t	addrlen;
	void		sig_int(int);
	pid_t		child_make(int, int, int);

	if (argc == 3)
		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
	else if (argc == 4)
		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
	else
		err_quit("usage: serv02 [ <host> ] <port#> <#children>");
	nchildren = atoi(argv[argc - 1]);//指派预先派生的子进程个数
	pids = Calloc(nchildren, sizeof(pid_t));//分配一个存放各子进程ID的数组

	for (i = 0; i < nchildren; i++)
		pids[i] = child_make(i, listenfd, addrlen);	/* parent returns */
	                                               //创建一个存放各个子进程,并在每个子进程中accept。
	Signal(SIGINT, sig_int);

	for (; ; )
		pause();	/* everything done by children */
}
/* include sigint */
void
sig_int(int signo)
{
	int		i;
	void	pr_cpu_time(void);

	/* 4terminate all children */
	for (i = 0; i < nchildren; i++)
		kill(pids[i], SIGTERM);//杀死相应进程
	while (wait(NULL) > 0)		/* wait for all children */
		;
	if (errno != ECHILD)
		err_sys("wait error");

	pr_cpu_time();//此函数参考P652,由于篇幅原因,此处不再给出
	exit(0);
}
/* end sigint */
/* include child_make */
pid_t
child_make(int i, int listenfd, int addrlen)
{
	pid_t	pid;
	void	child_main(int, int, int);

	if ((pid = Fork()) > 0)//只有父进程返回
		return(pid);		/* parent */

	child_main(i, listenfd, addrlen);	/* never returns */
}                                      //子进程不返回,用来监听accept等
/* end child_make */

/* include child_main */
void
child_main(int i, int listenfd, int addrlen)
{
	int				connfd;
	void			web_child(int);
	socklen_t		clilen;
	struct sockaddr	*cliaddr;

	cliaddr = Malloc(addrlen);

	printf("child %ld starting\n", (long)getpid());
	for (; ; ) {
		clilen = addrlen;
		connfd = Accept(listenfd, cliaddr, &clilen);

		web_child(connfd);		/* process the request */
		Close(connfd);
	}
}
/* end child_main */

iv、预先派生子进程服务器程序,accept使用文件上锁保护

让应用进程在调用accept前后安置某种形式的锁(lock),这样任意时刻只有一个子进程阻塞在accept调用中,其他子进程则阻塞在试图获取用于保护accpet的锁上。(此处代码参考P659)

v、预先派生子进程服务器程序,accept使用线程上锁保护

在不同进程之间使用线程上锁要求:(1)互斥锁变量必须存放在由所有进程共享的内存区中;(2)必须告知线程函数库这是在不同进程之间共享的互斥锁。(此处代码参考P662)

vi、预先派生子进程服务器程序,传递描述符

#include	"unp.h"

static int		nchildren;
static pid_t	*pids;

//相关函数声明
int tcp_listen(const char* host, const char* serv, socklen_t *addrlenp);//此函数参考p258,由于篇幅,不再给出
pid_t child_make(int i, int listenfd, int addrlen);
void child_main(int i, int listenfd, int addrlen);
void sig_int(int signo);
void my_lock_init(char* pathname);
void my_lock_wait();
void my_lock_release();


int
main(int argc, char **argv)
{
	int			listenfd, i;
	socklen_t	addrlen;
	void		sig_int(int);
	pid_t		child_make(int, int, int);

	if (argc == 3)
		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
	else if (argc == 4)
		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
	else
		err_quit("usage: serv02 [ <host> ] <port#> <#children>");
	nchildren = atoi(argv[argc - 1]);//指派预先派生的子进程个数
	pids = Calloc(nchildren, sizeof(pid_t));//分配一个存放各子进程ID的数组

	/*1、文件上锁*/
	my_lock_init("/tmp/lock.xxxx");/*one lock file for all children*/
	for (i = 0; i < nchildren; i++)
		pids[i] = child_make(i, listenfd, addrlen);	/* parent returns */
													//创建一个存放各个子进程,并在每个子进程中accept。
	Signal(SIGINT, sig_int);

	for (; ; )
		pause();	/* everything done by children */
}
/* include sigint */
void
sig_int(int signo)
{
	int		i;
	void	pr_cpu_time(void);

	/* 4terminate all children */
	for (i = 0; i < nchildren; i++)
		kill(pids[i], SIGTERM);//杀死相应进程
	while (wait(NULL) > 0)		/* wait for all children */
		;
	if (errno != ECHILD)
		err_sys("wait error");

	pr_cpu_time();
	exit(0);
}
/* end sigint */
/* include child_make */
pid_t
child_make(int i, int listenfd, int addrlen)
{
	pid_t	pid;
	void	child_main(int, int, int);

	if ((pid = Fork()) > 0)//只有父进程返回
		return(pid);		/* parent */

	child_main(i, listenfd, addrlen);	/* never returns */
}                                      //子进程不返回,用来监听accept等
									   /* end child_make */

									   /* include child_main */
void
child_main(int i, int listenfd, int addrlen)
{
	int				connfd;
	void			web_child(int);
	socklen_t		clilen;
	struct sockaddr	*cliaddr;

	cliaddr = Malloc(addrlen);

	printf("child %ld starting\n", (long)getpid());
	for (; ; ) {
		clilen = addrlen;
		/*2、等待获取文件锁*/
		my_lock_wait();
		connfd = Accept(listenfd, cliaddr, &clilen);
		/*3、释放文件锁*/
		my_lock_release();

		web_child(connfd);		/* process the request */
		Close(connfd);
	}
}
/* end child_main */


/*以下是上锁、解锁、等待获取文件锁函数*/
static struct flock	lock_it, unlock_it;
static int			lock_fd = -1;
/* fcntl() will fail if my_lock_init() not called */

void
my_lock_init(char *pathname)
{
	char	lock_file[1024];

	/* 4must copy caller's string, in case it's a constant */
	strncpy(lock_file, pathname, sizeof(lock_file));
	lock_fd = mkstemp(lock_file);//创建一个具备该路径名的文件

	Unlink(lock_file);			/* but lock_fd remains open */

	/*用于上锁文件*/
	lock_it.l_type = F_WRLCK;
	lock_it.l_whence = SEEK_SET;
	lock_it.l_start = 0;
	lock_it.l_len = 0;

	/*用于解锁文件*/
	unlock_it.l_type = F_UNLCK;
	unlock_it.l_whence = SEEK_SET;
	unlock_it.l_start = 0;
	unlock_it.l_len = 0;
}
/* end my_lock_init */

/* include my_lock_wait */
void
my_lock_wait()
{
	int		rc;

	while ((rc = fcntl(lock_fd, F_SETLKW, &lock_it)) < 0) {
		if (errno == EINTR)
			continue;
		else
			err_sys("fcntl error for my_lock_wait");
	}
}

void
my_lock_release()
{
	if (fcntl(lock_fd, F_SETLKW, &unlock_it) < 0)
		err_sys("fcntl error for my_lock_release");
}

vii、并发服务器程序,每个客户一个线程

#include	"unpthread.h"

//相关函数声明
int tcp_listen(const char* host, const char* serv, socklen_t* addrlenp);
void sig_int(int signo);
void *doit(void *arg);

int
main(int argc, char **argv)
{
	int				listenfd, connfd;
	void			sig_int(int);
	void			*doit(void *);
	pthread_t		tid;
	socklen_t		clilen, addrlen;
	struct sockaddr	*cliaddr;

	if (argc == 2)
		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
	else if (argc == 3)
		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
	else
		err_quit("usage: serv06 [ <host> ] <port#>");
	cliaddr = Malloc(addrlen);

	Signal(SIGINT, sig_int);
	for (; ; ) {
		clilen = addrlen;
		connfd = Accept(listenfd, cliaddr, &clilen);

		Pthread_create(&tid, NULL, &doit, (void *)connfd);//每返回一个客户连接,就调用pthread_create创建一个新线程
	}
}

void *
doit(void *arg)
{
	void	web_child(int);

	Pthread_detach(pthread_self());//doitb函数先让自己脱离,使得主线程不必等待它
	web_child((int)arg);
	Close((int)arg);
	return(NULL);
}

void
sig_int(int signo)
{
	void	pr_cpu_time(void);

	pr_cpu_time();
	exit(0);
}

viii、预先创建线程服务器程序,每个线程各自accept分配一个线程池哈

#include<unp.h>
/*start 此部分可以写成一个头文件*/
typedef struct {
	pthread_t		thread_tid;		/* thread ID */
	long			thread_count;	/* # connections handled */
} Thread;
Thread	*tptr;		/* array of Thread structures; calloc'ed */

int				listenfd, nthreads;
socklen_t		addrlen;
pthread_mutex_t	mlock;
/*end 此部分可以写成一个头文件*/

/*start 服务器的主main部分*/
pthread_mutex_t	mlock = PTHREAD_MUTEX_INITIALIZER;//定义并初始化一个互斥锁

//相关函数声明
int tcp_listen(const char* host, const char* serv, socklen_t *addrlen);//此函数参考p258,由于篇幅,不再给出
void sig_int(int);
void thread_make(int);
//void thread_main(void* arg);

int
main(int argc, char **argv)
{
	int		i;
	void	sig_int(int), thread_make(int);

	if (argc == 3)
		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
	else if (argc == 4)
		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
	else
		err_quit("usage: serv07 [ <host> ] <port#> <#threads>");
	nthreads = atoi(argv[argc - 1]);
	tptr = Calloc(nthreads, sizeof(Thread));

	for (i = 0; i < nthreads; i++)
		thread_make(i);			/* only main thread returns */

	Signal(SIGINT, sig_int);
	for (; ; )
		pause();	/* everything done by threads */
}
/*end 服务器的主main部分*/

void
sig_int(int signo)
{
	int		i;
	void	pr_cpu_time(void);
	pr_cpu_time();
	for (i = 0; i < nthreads; i++)
		printf("thread %d, %ld connections\n", i, tptr[i].thread_count);
	exit(0);
}

void thread_make(int i) {
	void * thread_main(void*);
	pthread_create(&tptr[i].thread_count, NULL, &thread_main, (void*)i);
	return;//main thread returns
}

void* thread_main(void* arg) {
	int connfd;
	void web_child(int);
	socklen_t clilen;
	struct sockaddr *cliaddr;
	cliaddr = malloc(addrlen);

	printf("thread %d starting\n", (int)arg);
	for (;;) {
		clilen = addrlen;
		pthread_mutex_lock(&mlock);//上锁
		connfd = accept(listenfd, cliaddr, &clilen);//通过上锁,每个时候只有一个线程的accept处于阻塞之中
		pthread_mutex_unlock(&mlock);//解锁
		tptr[(int)arg].thread_count++;
		web_child(connfd);/*process request*/
		close(connfd);
	}
}

ix、预先创建线程服务器程序,主线程统一accept(分配一个线程池哈

#include<unp.h>
#include	"unpthread.h"

/*start 此部分可以写成一个头文件*/
typedef struct {
	pthread_t		thread_tid;		/* thread ID */
	long			thread_count;	/* # connections handled */
} Thread;
Thread	*tptr;		/* array of Thread structures; calloc'ed */

#define	MAXNCLI	32
int					clifd[MAXNCLI], iget, iput;//iget是线程池中某个线程将从该数组中取出的下一个元素的下标
                                               //iput是主线程将往该数组中存入的下一个元素的下标
                                               //当iput下标没有赶上iget下标(若赶上则说明该数组不够大),
                                               //并发送信号到条件变量信号 ,然后释放互斥锁,以允许线程池中某个线程为这个客户服务
pthread_mutex_t		clifd_mutex;
pthread_cond_t		clifd_cond;
/*end 此部分可以写成一个头文件*/


static int			nthreads;
pthread_mutex_t		clifd_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t		clifd_cond = PTHREAD_COND_INITIALIZER;

//相关函数声明
void sig_int(int signo);
void thread_make(int i);
void * thread_main(void *arg);

int
main(int argc, char **argv)
{
	int			i, listenfd, connfd;
	void		sig_int(int), thread_make(int);
	socklen_t	addrlen, clilen;
	struct sockaddr	*cliaddr;

	if (argc == 3)
		listenfd = Tcp_listen(NULL, argv[1], &addrlen);
	else if (argc == 4)
		listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
	else
		err_quit("usage: serv08 [ <host> ] <port#> <#threads>");
	cliaddr = Malloc(addrlen);

	nthreads = atoi(argv[argc - 1]);
	tptr = Calloc(nthreads, sizeof(Thread));
	iget = iput = 0;

	/* 4create all the threads */
	for (i = 0; i < nthreads; i++)
		thread_make(i);		/* only main thread returns */

	Signal(SIGINT, sig_int);

	for (; ; ) {
		clilen = addrlen;
		connfd = Accept(listenfd, cliaddr, &clilen);

		Pthread_mutex_lock(&clifd_mutex);
		clifd[iput] = connfd;
		if (++iput == MAXNCLI)
			iput = 0;
		if (iput == iget)
			err_quit("iput = iget = %d", iput);
		Pthread_cond_signal(&clifd_cond);
		Pthread_mutex_unlock(&clifd_mutex);
	}
}

void
sig_int(int signo)
{
	int		i;
	void	pr_cpu_time(void);
	pr_cpu_time();
	for (i = 0; i < nthreads; i++)
		printf("thread %d, %ld connections\n", i, tptr[i].thread_count);
	exit(0);
}

void
thread_make(int i)
{
	void	*thread_main(void *);

	Pthread_create(&tptr[i].thread_tid, NULL, &thread_main, (void *)i);
	return;		/* main thread returns */
}

void *
thread_main(void *arg)
{
	int		connfd;
	void	web_child(int);

	printf("thread %d starting\n", (int)arg);
	for (; ; ) {
		Pthread_mutex_lock(&clifd_mutex);
		while (iget == iput)
			Pthread_cond_wait(&clifd_cond, &clifd_mutex);
		connfd = clifd[iget];	/* connected socket to service */
		if (++iget == MAXNCLI)
			iget = 0;
		Pthread_mutex_unlock(&clifd_mutex);
		tptr[(int)arg].thread_count++;

		web_child(connfd);		/* process request */
		Close(connfd);
	}
}

小结上述范式特点:

i、当系统负载较轻时,每来一个客户请求现场派生一个子进程为之服务的传统并发服务器即可;

ii、相比传统的每个客户fork一个设计范式,预先创建一个子进程池或一个线程池的设计范式能够把进程控制CPU时间降低10倍以上;(但此处没有给出,监视闲置子进程个数,随着所服务客户数的动态变化而增加或减少)

iii、让所有子进程或线程自行调用accept通常比父进程或主线程独自调用accept并把描述符传递给子进程或线程来得简单而快速;

iv、由于潜在select冲突的原因,让所有子进程或线程阻塞在同一个accept调用中比让它们阻塞在同一个select调用中更可取;

v、使用线程通常远快于使用进程。

2、tcp客户程序设计范例

main函数代码如下,

#include	"unp.h"
void str_cli(FILE *fp, int sockfd);

int
main(int argc, char **argv)
{
	int					sockfd;
	struct sockaddr_in	servaddr;

	if (argc != 2)
		err_quit("usage: tcpcli <IPaddress>");

	sockfd = Socket(AF_INET, SOCK_STREAM, 0);

	bzero(&servaddr, sizeof(servaddr));
	servaddr.sin_family = AF_INET;
	servaddr.sin_port = htons(SERV_PORT);
	Inet_pton(AF_INET, argv[1], &servaddr.sin_addr);

	Connect(sockfd, (SA *)&servaddr, sizeof(servaddr));

	str_cli(stdin, sockfd);		/* do it all */

	exit(0);
}

i、基本的tcp客户程序(书上P100)(存在的问题:首先,进程在被阻塞以等待用户输入期间,看不到诸如对端关闭连接等网络事件。其次,以停等模式运作,批处理效率极低。)

void
str_cli(FILE *fp, int sockfd)
{
	char	sendline[MAXLINE], recvline[MAXLINE];

	while (Fgets(sendline, MAXLINE, fp) != NULL) {

		Writen(sockfd, sendline, strlen(sendline));

		if (Readline(sockfd, recvline, MAXLINE) == 0)
			err_quit("str_cli: server terminated prematurely");

		Fputs(recvline, stdout);
	}
}
ii、迭代客户程序(书上P137)
void
str_cli(FILE *fp, int sockfd)
{
	int			maxfdp1, stdineof;
	fd_set		rset;
	char		buf[MAXLINE];
	int		n;

	stdineof = 0;
	FD_ZERO(&rset);
	for ( ; ; ) {
		if (stdineof == 0)
			FD_SET(fileno(fp), &rset);
		FD_SET(sockfd, &rset);
		maxfdp1 = max(fileno(fp), sockfd) + 1;
		Select(maxfdp1, &rset, NULL, NULL, NULL);

		if (FD_ISSET(sockfd, &rset)) {	/* socket is readable */
			if ( (n = Read(sockfd, buf, MAXLINE)) == 0) {
				if (stdineof == 1)
					return;		/* normal termination */
				else
					err_quit("str_cli: server terminated prematurely");
			}

			Write(fileno(stdout), buf, n);
		}

		if (FD_ISSET(fileno(fp), &rset)) {  /* input is readable */
			if ( (n = Read(fileno(fp), buf, MAXLINE)) == 0) {
				stdineof = 1;
				Shutdown(sockfd, SHUT_WR);	/* send FIN */
				FD_CLR(fileno(fp), &rset);
				continue;
			}

			Writen(sockfd, buf, n);
		}
	}
}

iii、使用非阻塞式I/O实现客户程序(参考书上P343)

iv、使用fork的str_cli函数

void
str_cli(FILE *fp, int sockfd)
{
	pid_t	pid;
	char	sendline[MAXLINE], recvline[MAXLINE];

	if ( (pid = Fork()) == 0) {		/* child: server -> stdout */
		while (Readline(sockfd, recvline, MAXLINE) > 0)
			Fputs(recvline, stdout);

		kill(getppid(), SIGTERM);	/* in case parent still running */
		exit(0);
	}

		/* parent: stdin -> server */
	while (Fgets(sendline, MAXLINE, fp) != NULL)
		Writen(sockfd, sendline, strlen(sendline));

	Shutdown(sockfd, SHUT_WR);	/* EOF on stdin, send FIN */
	pause();
	return;
}
v、使用线程的str_cli函数
void	*copyto(void *);

static int	sockfd;		/* global for both threads to access */
static FILE	*fp;

void
str_cli(FILE *fp_arg, int sockfd_arg)
{
	char		recvline[MAXLINE];
	pthread_t	tid;

	sockfd = sockfd_arg;	/* copy arguments to externals */
	fp = fp_arg;

	Pthread_create(&tid, NULL, copyto, NULL);

	while (Readline(sockfd, recvline, MAXLINE) > 0)
		Fputs(recvline, stdout);
}

void *
copyto(void *arg)
{
	char	sendline[MAXLINE];

	while (Fgets(sendline, MAXLINE, fp) != NULL)
		Writen(sockfd, sendline, strlen(sendline));

	Shutdown(sockfd, SHUT_WR);	/* EOF on stdin, send FIN */

	return(NULL);
		/* 4return (i.e., thread terminates) when EOF on stdin */
}

以上知识点来均来自steven先生所著UNP卷一(version3),刚开始学习网络编程,如有不正确之处请大家多多指正。

猜你喜欢

转载自blog.csdn.net/tt_love9527/article/details/80779111