并发模型第肆讲-pre threaded模型
前言
一鼓作气,讲完了prefork之后,接下来继续总结一波pre threaded模型(即线程池模型)。 这种模型其实和前面说的prefork在形式上很类似,只不过前者事先创建的是进程,而这里的pre threaded创建的是线程。 当然由于进程和线程之前的区别,比如说线程可以共享进程中的大部分资源,造成了其在具体的实现上有点小differences.
一、 pre threaded模型
1.1 介绍
这里面其实没啥好说的了,既然我们能够想到用进程池的方式来提供服务端的性能提升,为什么不能使用线程池呢?
同样的,和prefork相对应,pre threaded 模型也有两种具体的实现方式,一种是
(1)、各个子线程各自accept
(2)、主线程统一accept,。
下面分别介绍。
1.1.1 pre threaded 模型1
整个模型可以用上图来表示。
和prefork 的模型很类似,这里简要说一点:对于上图中的红色圈圈,其代表的是端口号,需要注意的是进程里的所有线程共用的端口都是一样的,但是套接字是不一样的(这个在前几篇文章中有过论述,这里就不多说了)。
1.1.2 pre threaded 模型2
同样的,一幅图搞定。
不要和我说看不懂哦。
1.2 代码实现部分
pre threaded 模型框架上和prefork 类似,不同的就是在具体的某些点上因为进程之间与线程之间通信方式的不同所造成的差异(进程之间的通信需要使用IPC, 线程之间一般可以共享全局数据,但是一般得上锁)。
1.2.1 pre threaded 模型1
服务器端main函数部分:
#include <pthread.h>
#include "com.h"
#include "lib.h"
#include "str_echo.h"
#include "childThread.h"
#define LISTENQ 1024
pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER; //初始化全局共享的互斥锁
int listenFd; //全局共享的监听描述符
int main(int argc, char**argv){
//connFd;
pid_t childPid;
socklen_t cliLen;
struct sockaddr_in cliAddr,servAddr;
//socket
if((listenFd = socket(AF_INET,SOCK_STREAM,0))<0){
printf("socket error\n");
exit(-1);
}
bzero(&servAddr,sizeof(servAddr));
servAddr.sin_family=AF_INET;
servAddr.sin_addr.s_addr=htonl(INADDR_ANY);
servAddr.sin_port=htons(SERV_PORT);
//bind
if(bind(listenFd,(struct sockaddr*)&servAddr,sizeof(servAddr))<0){
printf("bind error\n");
exit(-1);
}
//listen
if((listen(listenFd,LISTENQ)) <0){
printf("listen error\n");
exit(-1);
}
int numOfChild = atoi(argv[1]); //第一个参数为线程池数量
ChildThread childs[numOfChild];
for(int i = 0;i<numOfChild;++i){
MakeChildThread(i,childs); //创建子线程池
}
while(1){
pause();
}
return 0;
}
服务器端中子线程的部分:
#include "com.h"
#include "lib.h"
#include <pthread.h>
#include <stdio.h>
#include "str_echo.h"
#include "childThread.h"
extern int listenFd;
extern pthread_mutex_t mlock;
//子线程的任务函数
void* MainThread(void *arg){
struct sockaddr_in cliAddr;
int connFd = -1;
pthread_detach(pthread_self()); //线程脱离
ChildThread *pchild = (ChildThread*) arg;
int index = pchild->num;
printf("childThread %d starting\n",index);
while(1){
int cliLen = sizeof(cliAddr);
pthread_mutex_lock(&mlock); //加锁
if((connFd = accept(listenFd,(struct sockaddr*)&cliAddr,&cliLen))<0){
printf("accept error\n");
exit(-1);
}
pthread_mutex_unlock(&mlock); //解锁
pchild->thread_count++;
str_echo(connFd); //响应客户端请求
close(connFd); //关闭连接
}
}
//创建子线程池部分
void MakeChildThread(int i,ChildThread * childThread){
childThread[i].num = i;
int ret = pthread_create(&childThread[i].thread_tid,NULL,&MainThread,(void*) (&(childThread[i]))); //创建线程
if(ret != 0){
printf("Create pthread error!\n");
exit(0);
}
}
代码分析:简要分析一点点。 和prefork的模型1 很像每个子线程分别阻塞在accept上。 这里的实现使用了锁,但是按照前面所讨论的,有些系统在accept上已经实现了加锁过程,所以上面的可以改成无锁的版本。
1.2.2 pre threaded 模型2
服务器端代码主函数部分:
/**
* prethread 并发服务器2 - 主线程统一accept
* note: accept 是否需要互斥,还是内核已经实现了原子操作
* to do:
*/
#include <pthread.h>
#include "com.h"
#include "lib.h"
#include "str_echo.h"
#include "childThread.h"
#define LISTENQ 1024
int cliFd[MAXCLI];
int iget,iput;
pthread_mutex_t cliFdMutex = PTHREAD_MUTEX_INITIALIZER; //初始化全局共享的互斥锁
pthread_cond_t cliFdCond = PTHREAD_COND_INITIALIZER; //全局共享的条件变量
int listenFd; //全局共享的监听描述符
int main(int argc, char**argv){
//connFd;
pid_t childPid;
socklen_t cliLen;
int connFd = -1;
struct sockaddr_in cliAddr, servAddr;
//socket
if((listenFd = socket(AF_INET,SOCK_STREAM,0))<0){
printf("socket error\n");
exit(-1);
}
bzero(&servAddr,sizeof(servAddr));
servAddr.sin_family=AF_INET;
servAddr.sin_addr.s_addr=htonl(INADDR_ANY);
servAddr.sin_port=htons(SERV_PORT);
//bind
if(bind(listenFd,(struct sockaddr*)&servAddr,sizeof(servAddr))<0){
printf("bind error\n");
exit(-1);
}
//listen
if((listen(listenFd,LISTENQ)) <0){
printf("listen error\n");
exit(-1);
}
int numOfChild = atoi(argv[1]); //第一个参数为线程池数量
ChildThread childs[numOfChild];
iput = 0;
iget = 0;
for(int i = 0;i<numOfChild;++i){
MakeChildThread(i,childs);
}
while(1){
int cliLen = sizeof(cliAddr);
if((connFd = accept(listenFd,(struct sockaddr*)&cliAddr,&cliLen))<0){
printf("accept error\n");
exit(-1);
}
//以下部分为向已建立的连接互斥放入缓冲区中
pthread_mutex_lock(&cliFdMutex);
cliFd[iput]=connFd;
++iput;
if(iput == MAXCLI){
iput = 0;
}
if(iput == iget){
printf("iput == iget = %d\n",iput);
}
pthread_cond_signal(&cliFdCond); //条件变量唤醒
pthread_mutex_unlock(&cliFdMutex);
}
return 0;
}
子线程相关的部分代码:
#include "com.h"
#include "lib.h"
#include <pthread.h>
#include <stdio.h>
#include "str_echo.h"
#include "childThread.h"
extern int listenFd;
extern pthread_mutex_t mlock;
extern int iget;
extern int iput;
extern int cliFd[MAXCLI];
extern pthread_mutex_t cliFdMutex; //初始化全局共享的互斥锁
extern pthread_cond_t cliFdCond ; //全局共享的条件变量
void* MainThread(void *arg){
struct sockaddr_in cliAddr;
int connFd = -1;
pthread_detach(pthread_self()); //线程脱离
ChildThread *pchild = (ChildThread*) arg;
int index = pchild->num;
printf("childThread %d starting\n",index);
while(1){
int cliLen = sizeof(cliAddr);
//以下部分为子进程互斥从已建立连接的缓冲区中获取connFd
pthread_mutex_lock(&cliFdMutex);
//等待有连接
while(iget == iput){
pthread_cond_wait(&cliFdCond,&cliFdMutex);
}
connFd=cliFd[iget];
++iget;
if(iget == MAXCLI){
iget = 0;
}
pthread_mutex_unlock(&cliFdMutex);
pchild->thread_count++;
str_echo(connFd); //响应客户端请求
close(connFd); //关闭连接
}
}
//创建子线程池
void MakeChildThread(int i,ChildThread * childThread){
childThread[i].num = i;
int ret = pthread_create(&childThread[i].thread_tid,NULL,&MainThread,(void*) (&(childThread[i]))); //创建线程
if(ret != 0){
printf("Create pthread error!\n");
exit(0);
}
}
代码分析: 由于各个线程之间可以共享数据,包括建立的套接字连接,所以这里不需要向prefork那般 主进程把建立的套接字传递给各个子进程。
相反的,这里只需要通过建立一个互斥的消息队列系统, 主线程把建立的连接connFd加入到消息队列中,然后唤醒等待的子线程。而子线程只需要阻塞等待,直到通知来临告诉其有新的连接到来(任务到来),它才互斥的从消息队列中取出连接,然后开始任务处理。
上述代码中的消息队列是个简易版的,互斥手段采用的是条件变量(也可以根据需求采用信号量来实现)。
具体的代码也不难,这里就不详述了。
二、总结
同样的这里先讨论几个小问题,然后在总结一波小问题。
2.1 几个小问题
2.1.1 条件变量和信号量的区别。
条件变量和信号量都是在多线程中实现同步的机制。
不同的是,条件变量的判断逻辑是一个bool值(即条件是否成立),如果成立,则要么唤醒一个线程(signal),要么唤醒所有线程(broadcast) 。
而信号量的内部是一个资源变量(可用资源的个数N),可以同时有指定K个线程获取到资源,然后运行。
看起来好像说,信号量的功能比条件变量更为强大一些(当然,明显的条件变量可以唤醒所有等待着的线程,这是信号量不具备的),但按照陈硕大佬【1】中所建议的,尽量不使用信号量,互斥锁和条件变量应该能完成我们所遇到的大部分需求。 这里【1】中讲了两个原因:
- semaphore has no notion of ownership
- 信号量有自己的计数值(资源计数值),而通常我们自己的数据结构中也具有长度,这可能就造成了同样的信息存储了两份,需要时刻保持一致,增加了程序员的负担。
总的来说,这两点,我理解的还不是很深。有小伙伴了解的可以和我讨论讨论。 (* ^▽ ^ *)
2.1.2 条件变量的使用为什么要带着一个互斥锁?
按照大多数官方的说法, 条件变量的使用要带着一个互斥锁其主要的原因是 为了保证调用cond.wait()的线程A原子性的进入阻塞队列(阻塞在条件变量不成立的队列),进而防止另一个线程B“见缝插针”在中间的某个时刻(A线程正准备进入但是还未进入阻塞队列的时刻),发出signal信号,造成signal信号丢失的现象。
在我理解,不同线程修改相同资源都需要进行加锁(或者说是其他类型的加锁,如关闭中断),条件变量底层是一个bool值,信号量底层是一个整型的资源计数类型(在其内部实现中应该会有加锁、解锁等相关的处理),如果要在不同线程中使用,本质上都需要锁来控制访问来着。
2.1.3 阻塞在read、write等阻塞函数之上和阻塞在互斥锁和信号量等同步工具上有什么区别, 换句话说,能不能利用阻塞的IO操作来达到条件变量的同步效果?
这里,我搜了百度和谷歌,都没找到满意的答案。
按照我自己的理解回答一波。 要知道,一般来说,linux中的每个文件的read、write都是原子性的,也就是说file或者其类似的结构体中都包含了锁,所以我的推断是可以使用linux阻塞的IO操作来达到一定的同步效果。 就像UNP在“在不同进程之中传递描述符”中显示的那样-子进程阻塞在Read_Fd上等待主进程分配任务,如下图所示。
不过暗自推测使用阻塞IO来达到某种同步的目的,enen…, 效率不是很高。
2.2 pre threaded小总结
就我目前的专业水平来说,prethreaded模型在实际中用的要比prefork要多,因为相对来说thead更加的轻量、效率更高。 即使现在比较流行的reactor模型,也是用的thread而不是 process。
关于缺点,目前感觉和prefork 差不多,它们本质上都是阻塞的IO操作,如果一个线程内有多种的IO请求,就不得不阻塞等待了。
参考
【1】、 Linux 多线程服务端编程
【2】、注意信号量与条件变量的区别
【3】、用条件变量实现事件等待器的正确与错误做法