生产消费模型是多线程诞生的产物,这么说也可以表明和多线程编程密不可分。
它是一种服务器设计方式,可以同时让服务器处理多个客户端请求,并且很好管理服务器内部的资源。
说到这个, 我们就先说说多线程的缺点吧, 这也是促进生产、消费模型的诞生的原因。
1、多线程共享进程的数据段、代码段、等。 由于共享, 因此在对数据段中临界资源进行访问时就会存在二义性。
2、多线程是操作系统调度的最小单位。 它是会竞争cpu资源的。 也是会被调度、切换的。
那么我们再说说生产消费模型。
它可以说是是由 (队列 + 生产者线程 + 消费者线程)组成的。
为了解决到多线程存在的问题。
我们还需要使用到互斥锁和条件变量。
1、我们采用锁对队列资源进行保护,保证对队列中的临界资源的安全访问。
2、采用条件变量对生产者线程和消费者线程进行阻塞 ,只有被唤醒才会接着运行。
当一个线程被阻塞后, 它是不会竞争CPU资源的,和进程的三种状态之一的阻塞状态一样。
我们接下来说说生产、消费模型执行的逻辑:
1、生产者线程: 它负责将要处理的数据添加到线程安全队列当中去。如果队列满了则不能添加,进入阻塞状态, 如果队列没满, 则添加成功, 同时唤醒消费线程去消费。
2、消费者线程: 它负责从队列中取出要处理的数据。 如果队列中没有数据,则不能取, 进入阻塞状态, 如果队列中有数据, 则取成功,同时唤醒生产线程去生产。
3、线程安全队列: 对队列中添加要处理的数据或取出数据, 都必须进行加锁操作。
这样就仅仅有条,又能够很好的管理生产、消费线程, 需要时唤醒它们执行, 不需要时将他们阻塞。而且又能又能防止对线程带来的线程安全问题。
1)下面的模型的安全队列使用STL的queue实现的。STL中所有容器不支持线程安全!
代码:
1 #include<stdio.h>
2 #include<iostream>
3 #include<pthread.h>
4 #include<queue>
5 #include<unistd.h>
6
7 #define THREADNUM 4
8 int i = 0;
9 //生产、消费模型类
10 class blockqueue{
11
12 public:
13 blockqueue(int capacity = 4)
14 :Capacity_(capacity)
15 { //初始化锁、生产条件变量、消费条件变量
16 pthread_mutex_init(&Queue_Mux_, NULL);
17 pthread_cond_init(&Consume_cond, NULL);
18 pthread_cond_init(&Product_cond, NULL);
19 }
20
21 ~blockqueue(){ //释放锁、条件变量资源
22 pthread_mutex_destroy(&Queue_Mux_);
23 pthread_cond_destroy(&Consume_cond);
24 pthread_cond_destroy(&Product_cond);
25 }
26 //shanchu
27 int Pop(int& data){
28 //消费者线程
29
30 pthread_mutex_lock(&Queue_Mux_);
31
32 while(Queue_.empty()){
33 pthread_cond_wait(&Consume_cond, &Queue_Mux_);
34 }
35
36 data = Queue_.front();
37 Queue_.pop();
38 pthread_mutex_unlock(&Queue_Mux_);
39 pthread_cond_signal(&Product_cond);
40
41 return data;
42 }
43 //添加
44 int Push(int data){
45
46 pthread_mutex_lock(&Queue_Mux_);
47 //这块一定要采用while,不能用if
48 while(Queue_ISFULL()){
49 pthread_cond_wait(&Product_cond, &Queue_Mux_);
50 }
51
52 Queue_.push(data);
53 pthread_mutex_unlock(&Queue_Mux_);
54 pthread_cond_signal(&Consume_cond);
55 return data;
56 }
57 //判断队列是否为空
58 bool Queue_ISFULL(){
59 if(Queue_.size() == Capacity_){
60 return true;
61 }
62 return false;
63 }
64 private:
65 std::queue<int> Queue_;
66 size_t Capacity_;
67 pthread_mutex_t Queue_Mux_;
68 pthread_cond_t Consume_cond;
69 pthread_cond_t Product_cond;
70 };
以上就是一个简单的管理生产、消费模型类, 它里面主要有2个对外接口,分别是push、pop接口, 一个对应生产线程使用、一个对应消费线程使用。
使用流程:
1、生产者线程得到要处理的数据(客户端的请求)就调用push接口,向队列中添加一个要处理的信息, 同时唤醒消费线程去从队列当中拿数据去执行处理函数返回给用户。
2、消费线程拿走要处理数据调用pop接口, 取走队列当中的数据, 同时唤醒生产线程往队列当中添加数据。
从而形成一个良性的循环。
test接口
//消费线程入口函数
74 void* Consume_pthread_start(void* arg){
75
76 blockqueue* bq = (blockqueue*)arg;
77 int data = 0;
78 while(1){
79 printf("线程[%p], 消费了资源[%d]\n", pthread_self(), bq->Pop(data));
80 sleep(1);
81 }
82 return NULL;
83 }
84
85 void* Product_pthread_start(void* arg){
86
87 blockqueue* bq = (blockqueue*)arg;
88 while(1){
89 printf("线程[%p], 生产了资源[%d]\n", pthread_self(), bq->Push(i));
90 ++i;
91 sleep(1);
92 }
93
94 return NULL;
95 }
96
97 int main(){
98
99 pthread_t Con_tid[THREADNUM];
100 pthread_t Pro_tid[THREADNUM];
101 blockqueue* bq = new blockqueue(10);
102
103 for(int i = 0; i < THREADNUM; ++i){
104 int ret = 0;
105 ret = pthread_create(&Con_tid[i], NULL, Consume_pthread_start, (void*)bq);
106 if(ret){
107 perror("Consume_pthread_create");
108 return 2;
109 }
110 ret = pthread_create(&Pro_tid[i], NULL, Product_pthread_start, (void*)bq);
111 if(ret){
112 perror("Product_pthread_create");
113 return 3;
114 }
115 }
116
117 for(int i = 0; i < THREADNUM; ++i){
118 pthread_join(Con_tid[i], NULL);
119 pthread_join(Pro_tid[i], NULL);
120 }
121
122
123 delete bq;
124 bq = NULL;
125 return 0;
126 }
使用两生产、两消费线程去测试生产、消费模型。
如下是测试结果图:
2)使用posix信号量实现的模型
概念:posix信号量是一个资源计数器+PCB等待队列+唤醒、阻塞接口
作用:主要是完成线程、进程的互斥、同步。
由此可见, 我们可以采用posix信号量里面的计数器作为线程安全队列替换锁。里面的阻塞、唤醒接口代替条件变量。
特别提醒: 当posix信号量调用sem_wait阻塞接口时里面的计数器-1。调用sem_post唤醒接口时计数器+1。(不懂这块可以先查阅posix使用流程和内部概念)
代码:
1 #include<iostream>
2 #include<pthread.h>
3 #include<semaphore.h>
4 #include<unistd.h>
5 #include<vector>
6 #include<cstdio>
7
8 #define THREAD_NUM 1
9 #define QUEUESIZE 1
10
11 class RingQueue{
12 public:
13 RingQueue(int capacity = 1)
14 :Capacity_(capacity)
15 ,pos_write_(0)
16 ,pos_read_(0)
17 {
18 Vec_.resize(Capacity_ + 1);
19 sem_init(&Con_sem_, 0, 0);
20 sem_init(&Pro_sem_, 0, Capacity_);
21 sem_init(&Common_sem_, 0, 1);
22 }
23
24 ~RingQueue(){
25 sem_destroy(&Con_sem_);
26 sem_destroy(&Pro_sem_);
27 sem_destroy(&Common_sem_);
28 }
29
30 void Pop(int* data){
31 //加锁
32 sem_wait(&Con_sem_);
33
34 sem_wait(&Common_sem_);
35
36 *data = Vec_[pos_read_];
37 pos_read_ = (pos_read_ + 1) % Capacity_;
38
39 sem_post(&Common_sem_);
40 sem_post(&Pro_sem_);
41 sleep(1);
42 }
43
44 void Push(int& data){
45
46 sem_wait(&Pro_sem_);
47
48 sem_wait(&Common_sem_);
49 Vec_[pos_write_] = data;
50
51 pos_write_ = (pos_write_ + 1) % Capacity_;
52 sem_post(&Common_sem_);
53
54 sem_post(&Con_sem_);
55 sleep(1);
56 }
57
58 private:
59 std::vector<int> Vec_;
60 size_t Capacity_;
61 int pos_write_;
62 int pos_read_;
63
64 //定义三个消息量, 前两个是为了存放等待队列, 后一个公用的计数器
65 sem_t Con_sem_;
66 sem_t Pro_sem_;
67 sem_t Common_sem_;
68 };
test代码
70 void* Consum_pthread_start(void* arg){
71
72 RingQueue* bq = (RingQueue*)arg;
73 int data = 0;
74 while(1){
75
76 bq->Pop(&data);
77 printf("Consume_thread[%p], 消费了资源[%d]\n", (int*)pthread_self(), data);
78
79 }
80
81 return NULL;
82 }
83
84 void* Product_pthread_start(void* arg){
85 RingQueue* bq = (RingQueue*)arg;
86 int i = 10;
87 while(1){
88 bq->Push(i);
89 printf("Product_thread[%p], 生产了资源[%d]\n", (int*)pthread_self(), i);
90 ++i;
91 }
92
93 return NULL;
94 }
95
96 int main(){
97
98 pthread_t Con_tid[THREAD_NUM], Pro_tid[THREAD_NUM];
99
100 //RingQueue* rq = new RingQueue(4);
101 RingQueue rq(QUEUESIZE);
102
103 for(int i = 0; i < THREAD_NUM; ++i){
104 int ret = pthread_create(&Con_tid[i], NULL, Consum_pthread_start, (void*)&rq);
105 if(ret){
106 perror("Consume_pthread_create()\n");
107 return 2;
108 }
109 ret = pthread_create(&Pro_tid[i], NULL, Product_pthread_start, (void*)&rq);
110 if(ret){
111 perror("Product_pthread_create()\n");
112 return 3;
113 }
114 }
115
116 for(int i = 0; i < THREAD_NUM; ++i){
117 pthread_join(Con_tid[i], NULL);
118 pthread_join(Pro_tid[i], NULL);
119 }
120
94 }
95
96 int main(){
97
98 pthread_t Con_tid[THREAD_NUM], Pro_tid[THREAD_NUM];
99
100 //RingQueue* rq = new RingQueue(4);
101 RingQueue rq(QUEUESIZE);
102
103 for(int i = 0; i < THREAD_NUM; ++i){
104 int ret = pthread_create(&Con_tid[i], NULL, Consum_pthread_start, (void*)&rq);
105 if(ret){
106 perror("Consume_pthread_create()\n");
107 return 2;
108 }
109 ret = pthread_create(&Pro_tid[i], NULL, Product_pthread_start, (void*)&rq);
110 if(ret){
111 perror("Product_pthread_create()\n");
112 return 3;
113 }
114 }
115
116 for(int i = 0; i < THREAD_NUM; ++i){
117 pthread_join(Con_tid[i], NULL);
118 pthread_join(Pro_tid[i], NULL);
119 }
120
94 }
95
96 int main(){
97
98 pthread_t Con_tid[THREAD_NUM], Pro_tid[THREAD_NUM];
99
100 //RingQueue* rq = new RingQueue(4);
101 RingQueue rq(QUEUESIZE);
102
103 for(int i = 0; i < THREAD_NUM; ++i){
104 int ret = pthread_create(&Con_tid[i], NULL, Consum_pthread_start, (void*)&rq);
105 if(ret){
106 perror("Consume_pthread_create()\n");
107 return 2;
108 }
109 ret = pthread_create(&Pro_tid[i], NULL, Product_pthread_start, (void*)&rq);
110 if(ret){
111 perror("Product_pthread_create()\n");
112 return 3;
113 }
114 }
115
116 for(int i = 0; i < THREAD_NUM; ++i){
117 pthread_join(Con_tid[i], NULL);
118 pthread_join(Pro_tid[i], NULL);
119 }
120 return 0;
121 }
测试图:
最后我谈下我对生产、消费模型的认知, 毕竟初学,认识薄浅,请见谅!
作为服务器后台开发, 采用生产、消费模型作为框架,个人感觉还不错, 生产线程接受到用户的请求, 最终消费线程取出用户的请求并执行处理接口返回给用户。既能保障服务器内部的资源合理管制, 又能达到并发处理业务的功能。