线程池
首先按照国际惯例先上概念,概念来自维基百科:
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。
线程池模式一般分为两种:HS/HA半同步/半异步模式、L/F领导者与跟随者模式。
- 半同步/半异步模式又称为生产者消费者模式,是比较常见的实现方式,比较简单。分为同步层、队列层、异步层三层。同步层的主线程处理工作任务并存入工作队列,工作线程从工作队列取出任务进行处理,如果工作队列为空,则取不到任务的工作线程进入挂起状态。由于线程间有数据通信,因此不适于大数据量交换的场合。
- 领导者跟随者模式,在线程池中的线程可处在3种状态之一:领导者leader、追随者follower或工作者processor。任何时刻线程池只有一个领导者线程。事件到达时,领导者线程负责消息分离,并从处于追随者线程中选出一个来当继任领导者,然后将自身设置为工作者状态去处置该事件。处理完毕后工作者线程将自身的状态置为追随者。这一模式实现复杂,但避免了线程间交换任务数据,提高了CPU cache相似性。在ACE(Adaptive Communication Environment)中,提供了领导者跟随者模式实现。
我对线程池的理解
线程池就是有一堆线程的池子,在有任务的时候,线程池就会排队从任务队列中取任务并执行。那么为什么需要线程池呢?举个例子:
HTTP服务器中有很多连接请求,这些连接请求如果每次来,主线程都创建一个线程来执行任务,那么在这个过程中,线程不断被创建不断被销毁,而且一个HTTP请求往往会被分为多个部分请求,如果请求一多,创建销毁的过程尽管很快,但是当这个过程的数量级上来了,那么将会耗费大量资源,线程池的存在就是为了解决这个创建和销毁的过程中所消耗的大量资源问题。
也就是说如果有线程池,那么大量创建销毁线程的这个过程就可以省去,当并发极高的时候,线程池可以缓解一定压力。
线程池中线程数量的问题
之前面试的时候有问到一个问题就是线程池的数量一般有几个,当时没有答出来,维基百科给出了答案,线程数一般取 CPU数量+2 比较合适,但是光有式子没有说服力,不过查阅其他资料发现其并不全对。
一句两句讲不完整,主要说说我的理解,具体可以看魏小言的关于线程池数量讨论的这篇博客
一般一个程序可以分为CPU密集型(又叫计算密集型)和IO密集型,计算密集型就是CPU一直算的那种程序,而IO密集型则是需要经常读写的程序,这个时候线程池的数量不能一概而论:
- 计算密集型对CPU占用肯定是高于IO密集型的,计算密集型的程序在切换调度的时候需要保存更多的上下文数据
- 而IO密集型在IO操作的时候可以将CPU资源让出来调度其他线程
因此线程池中线程的数量不是固定的,有看到设置为2倍的,也有看到设置为8倍甚至更多的。
但可以肯定的是,这个数量是有上限的,如果线程池中线程数量过多,对CPU调度也是一种负担,CPU调度切换也需要时间,反而会引起性能下降的问题,还有一个问题就是线程太多,但是根本用不完,导致资源浪费严重。
生产者消费者模型
这篇文章主要是关于生产者和消费者模型的线程池,首先什么是生产者消费者模型:主要是一个场所,两个身份,三种关系。
- 一个场所——临界资源
- 两个身份——生产者和消费者
- 三种关系——生产者和生成者(互斥)、生产者和消费者(同步与互斥)、消费者和消费者(互斥)
线程池简单实现
以前没有学习线程池时,一般是通过生产者直接通知消费者资源已经足够,让消费者去获取的,现在线程池做的工作则是对这个过程进行解耦。
解耦的方式为,生产者不直接给消费者通知,而是将资源放入一个队列,而消费者也不直接问生产者要资源,只问队列要资源,解耦后的生产者消费者结构不像之前那么紧密结合,而是各自管各自的工作,中间介入了一个队列充当中介的身份收取和提供临界资源。
线程池中目前实现两个类,一个是Task类,也就是任务,任务很简单,就是打印一句话,另一个则是线程池的类ThreadPool类,负责线程的统一管理,由于实现的非常简单,需要注意的就是同步与互斥问题以及任务分配需要注意的问题。
直接上代码,代码中看的更清楚
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>
typedef void (*HandleFunction)(void*);
// 等会要做的任务
void print(void* arg) {
std::cout << *(int*)arg << std::endl;
}
class Task {
public:
Task(HandleFunction ff){f = ff;}
// 重载()实现仿函数
void operator()() {
int i = 100;
f((void*)&i);
}
private:
HandleFunction f;
};
class ThreadPool {
public:
ThreadPool(int num = 5) {
thread_nums = num;
pthread_mutex_init(&lock, NULL);
pthread_cond_init(&cond, NULL);
}
~ThreadPool() {
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
// 线程池初始化创建5个线程
void InitThreadPool() {
pthread_t tid;
for (size_t i = 0; i < thread_nums; i++) {
pthread_create(&tid, NULL, ThreadRotinue, this);
}
}
// 线程初始化完毕之后都要分离自己,并等待任务
static void *ThreadRotinue(void* arg) {
pthread_detach(pthread_self());
ThreadPool* tp = (ThreadPool*)arg;
for (;;) {
tp->LockQueue();
// 为什么不用if而用wihle
// 当队列不为空说明条件满足,但是有可能出现任务被别人取走的情况,这个时候就会出现
// 条件满足但是出来没任务,导致程序出错
// 因此当条件满足有任务时,出来之后还要再次判断,如果任务被他人取走,要重新等待
while (tp->QueueIsEmpty()) {
tp->ThreadIdle();
}
Task t = tp->GetTask();
tp->TaskPop();
tp->UnlockQueue();
t();
}
}
// 任务排队
void TaskPush(Task& t) {
LockQueue();
task_queue.push(t);
WakeUpThread();
UnlockQueue();
}
void LockQueue() {
pthread_mutex_lock(&lock);
}
void UnlockQueue() {
pthread_mutex_unlock(&lock);
}
void ThreadIdle() {
idle_nums++;
pthread_cond_wait(&cond, &lock);
idle_nums--;
}
void WakeUpThread() {
pthread_cond_signal(&cond);
}
bool QueueIsEmpty() {
return task_queue.empty();
}
Task GetTask() {
return task_queue.front();
}
void TaskPop() {
task_queue.pop();
}
private:
int thread_nums;
int idle_nums;
std::queue<Task> task_queue;
pthread_mutex_t lock;
pthread_cond_t cond;
};
int main() {
Task t(print);
ThreadPool tp(5);
tp.InitThreadPool();
while (1){
tp.TaskPush(t);
sleep(1);
}
return 0;
}
实现的比较简单,如果有错误或不足,还请及时指正。