生产者与消费者模型
本文主要是继文章线程的同步与互斥,继续研究的线程同步问题,本文用到的互斥量、条件变量、信号量等知识在该博客中都有介绍。
一. 背景知识
生产者与消费者模型是一个著名的同步问题,它是基于等待/通知机制实现的。它描述的是:有一块缓冲区作为公共区域,生产者生产完产品放入该区域,消费者消费是从区域中拿走产品。它比较注意的是要实现以下几点:
(1)生产者生产时,消费者不能消费
(2)消费者消费时,生产者不能生产
(3)缓冲区空时,消费者不能消费
(4)缓冲区满时,生产者不能消费
其中,1和2说明生产者和消费者必须互斥的访问缓冲区;3和4则说明即生产者与消费者对缓冲区的访问必须同步。
所以,通过以上我们可以总结出生产者与消费者模型中的三种关系:
(1)生产者与生产者之间存在互斥关系
(2)消费者与消费者之间存在互斥关系
(3)生产者与消费者之间存在互斥与同步关系
可以概括为:三种关系,二种角色,一种场所,“三二一”规则。
二. 基于链表实现生产者与消费者模型
首先,该例中临界资源为链表,每次生产和每次消费是以链表的一个结点为单位的。
我们要满足生产者与消费者模型的基本要求,首先要实现互斥,这里我们利用互斥量实现互斥,具体互斥量的使用方法见点击打开链接一文,关于同步关系,我们利用条件变量实现,具体使用方法仍见点击打开链接一文。这里要注意的是我们利用基于链表实现,链表是动态存储的,容量可变,所以这里不考虑链表满时的情况,主要考虑链表为空的情况。
在该模型中,利用多个线程来模拟生产者与消费者,互斥量来保证生产者消费者任意两者之间的互斥关系,条件变量来保证生产者与消费者之间的同步关系。当链表为空时,使用条件变量使消费者线程挂起等待。具体代码如下:
1. 链表结构及互斥量与条件变量的申请
//基于条件变量实现生产者与消费者模型 //基于链表实现,空间可以动态分配 #include <stdio.h> #include <pthread.h> #include <stdlib.h> #include <unistd.h> typedef struct msg//链表一个结点的结构 { int num; struct msg* next; }msg; msg* head = NULL;//用一个空指针表示整个链表 pthread_mutex_t mutex;//互斥锁 pthread_cond_t cond;//条件变量 pthread_t threads[8];
2. 消费者消费
void* consumer(void* p)//消费者消费->删除一个结点 { int num = (int)p; msg* mp; while(1) { pthread_mutex_lock(&mutex);//加锁 //链表为空时,进行等待 while(head == NULL) { printf("%d begin wait consume\n", num); pthread_cond_wait(&cond, &mutex);//等待 } //链表不为空时,进行消费 printf("%d end of wait\n", num); printf("%d begin cousume\n", num); //进行消费动作,删除一个结点 mp = head; head = head->next; pthread_mutex_unlock(&mutex);//解锁 //打印并释放结点 printf("Consume %d\n", mp->num); free(mp); printf("%d end consume\n", num); printf("\n"); sleep(1); } }
3. 生产者进行生产
void* producer(void* p)//生产者生产->增加一个结点 { msg* mp; int num = (int)p; while(1) { //要先生产,才可以消费 printf("%d begin product\n", num); mp = (msg*)malloc(sizeof(msg)); mp->num = rand()%100+1; printf("product is %d\n", mp->num); pthread_mutex_lock(&mutex); //将生产好的结点连到链表上 mp->next = head; head = mp; printf("%d end of product\n", num); //唤醒消费者 pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); printf("\n"); sleep(5); } }
4. 主函数——创建实现生产者与消费者的线程
int main() { srand(time(NULL)); pthread_cond_init(&cond, NULL); pthread_mutex_init(&mutex, NULL); int i; for(i=0; i<5; i++)//消费者 { pthread_create(&threads[i], NULL, consumer, (void*)i); } for(i=5; i<8; i++)//生产者 { pthread_create(&threads[i], NULL, producer, (void*)i); } for(i=0; i<8; i++) { pthread_join(threads[i], NULL); } pthread_mutex_destroy(&mutex); pthread_cond_destroy(&cond); return 0;
运行结果为:
三. 基于定长的环形队列实现生产者与消费者模型
这里我们基于队列实现消费者与生产者模型,即队列中的元素个数为临界资源。生产者生产就是向队列中加一个元素,消费者消费就是删除队列中的一个元素。并且,这里仍然使用互斥量保证线程间的互斥,但是用信号量保证线程间的同步关系。
其中,我们会设置两个信号量sem_full和sum_empty。sem_full表示队列中结点的个数,sem_empty表示队列中空位置的元素个数。比如说,一个长度为10的数组,往数组中插入个3个元素,此时,sem_full为3,sem_empty为7。
当信号量sem_full的值为0,即队列为空时,生产者线程必须先运行。当sem_empty的值为0,即此时队列为满时,消费者必须先运行。具体实现代码如下:
1. 互斥量及信号量的定义
这里的队列相关操作实现我们直接调用了以前写过的顺序队列代码,具体见博客队列
//基于POSIX信号量实现生产者与消费者模型 //基于定长的循环队列实现 #include "seqqueue.c" #include <stdio.h> #include <pthread.h> #include <stdlib.h> #include <unistd.h> #include <semaphore.h> #define MAXSIZE 20 int buf[MAXSIZE]; pthread_mutex_t mutex;//互斥锁 sem_t sem_empty; sem_t sem_full; SeqQueue q;
2. 消费者消费
void* consumer(void* p)//消费者消费产品 { int num = (int)p; int i; while(1) { //队列为空时等待 printf("%d wait buf no empty\n"); sem_wait(&sem_empty); //不为空时进行操作 pthread_mutex_lock(&mutex); //删除队首元素 printf("%d consume %d\n", num, q.data[q.head]); SeqQueuePop(&q); pthread_mutex_unlock(&mutex); sem_post(&sem_full); sleep(1); } }
3. 生产者生产
void* producer(void* p)//生产者生产产品 { int num = (int)p; int i; while(1) { printf("%d wait buf not full\n", num); sem_wait(&sem_full);//满时就等待 pthread_mutex_lock(&mutex); SeqQueuePush(&q, rand()%20); if(q.size == 0) i = -1;//队列为空时 else i = q.tail - 1;//让i指向刚刚插入的结点 printf("%d product %d\n", num, q.data[i]); pthread_mutex_unlock(&mutex); sem_post(&sem_empty);//资源使用完毕,释放资源 sleep(5); } }
4. 主函数——创建生产者与消费者线程
int main() { srand((unsigned int)time(NULL)); int i; pthread_t threads[8]; //初始化 pthread_mutex_init(&mutex, NULL); sem_init(&sem_full, 0, MAXSIZE); sem_init(&sem_empty, 0, 0); for(i=0; i<3; i++)//0-2生产 { pthread_create(&threads[i], NULL, producer, (void*)i); } for(i=3; i<8; i++)//3-7消费 { pthread_create(&threads[i], NULL, consumer, (void*)i); } for(i=0; i<8; i++) { pthread_join(threads[i], NULL); } pthread_mutex_destroy(&mutex); sem_destroy(&sem_empty); sem_destroy(&sem_full); return 0; }
运行结果为: