在“进程间通信----信号量”一文中,有简单介绍过生产者消费者模型的基本概念。在下文中将使用有关线程的互斥与同步的相关概念来实现两种不同类型的生产者消费者模型。在本文中侧重于线程间同步的实现。有关线程互斥与同步的相关概念见线程的互斥与同步一文:
首先介绍该模型的背景知识:
生产者消费者问题
该问题是一个著名的同步问题。它描述的是:一群生产者进程正在生产产品,并将这些产品提供给消费者进程去消费。为使生产者和消费者能够并发执行。在两者之间设置了一个公共区域,生产者进入公共区域生产产品并放入其中。消费者进入公共区域并取走产品进行消费。
当一个生产者进入公共区域生产产品时,其他生产者和消费者不能同时进入公共区域生产产品或消费产品。当一个消费者进入公共区域消费产品的时候,其它消费者和生产者不能同时进入该区域消费产品或生产产品。也就是说,任意时刻,最多只允许一个生产者或一个消费者进入公共区域。即生产者和消费者必须互斥的访问公共区域。
当产品放满公共区域时,生产者必须等待,使消费者先消费。当公共区域为空时,消费者必须等待,使生产者先生产。即在公共区域为空或为满时,生产者或消费者在执行时要满足一定的先后顺序。即生产者与消费者对公共区域的访问必须同步。
根据以上描述,可以得出如下结论:
(1)生产者与生产者之间存在竞争即互斥关系
(2)消费者与消费者之间存在竞争即互斥关系
(3)生产者与消费者之间存在互斥与同步关系
以上三个结论可以概括为:三种关系,两种角色,一个临界区。即“三二一”规则。
下面根据具体研究不同类型的生产者消费者模型:
利用链表做临界资源的生产者消费者模型:
在该模型中临界资源为一带头节点的单向链表。多个生产者在向链表中头插节点,多个消费者在对链表进行头删结点。
在一个生产者头插结点或一个消费者在头删结点的过程中,其他生产者或消费者不能对该链表进行操作。因此:生产者与生产者之间,消费者与消费者之间,生产者与消费者之间存在互斥关系。在这里,通过互斥量来保证互斥关系。
因为链表是动态插入结点,所以可以说链表没有满的时候(不考虑内存满的情况)。所以这里只需考虑链表为空的时候。当链表为空时,生产者必须先执行头插结点。此时,便需要生产者与消费者在执行时满足一定的顺序要求。即生产者与消费者必须同步的访问链表。
通过以上分析,生产者与消费者之间满足上述的三个结论。
1. 利用条件变量保证线程间的同步
在该模型中,利用多个线程来模拟生产者与消费者,互斥量来保证生产者消费者任意两者之间的互斥关系,条件变量来保证生产者与消费者之间的同步关系。当链表为空时,使用条件变量使消费者线程挂起等待。有关互斥量与条件变量的使用见本文开头的博客链接。
下面模拟该模型:
定义互斥量和条件变量:
pthread_mutex_t mutex; pthread_cond_t cond;
消费者消费产品:
//消费者进行头删节点
void* consume(void* arg)
{
while(1)//消费者一直进行头删结点
{
pthread_mutex_lock(&mutex);//在头删结点时,先进入临界区获得临界资源,此时其他生产者或消费者不能在进入临界区,所以用互斥量
while(head->next == NULL)
//若while为if,可能会被异常唤醒。唤醒后,可能还是没有数据,就会对空链表进行操作,从而非法访问内存,所以即使被异常唤醒还要判断是否为空
{
printf("空链表\n");
pthread_cond_wait(&cond,&mutex);//当链表为空的时候,消费者必须挂起等待使生产者先运行,此时用条件变量来实现二者之间的同步
}
printf("%d consume %d\n",(int)arg,head->next->data);//链表不为空时,消费者头删结点
LinkListPopFront();
pthread_mutex_unlock(&mutex);//当该消费者消费结束时,退出临界区,解锁互斥量
usleep(5000000);//一个消费者0.5s消费一个
}
}
生产者生产产品:
//生产者进行头插节点
void* product(void* arg)
{
while(1)//生产者一直头插结点
{
//生产者在对链表进行操作时,必须先获取维护临界资源的互斥量。申请成功之后,才能对链表进行操作,同时其他人不能进入该临界区
pthread_mutex_lock(&mutex);
LinkListPushFront();//生产者头插结点
printf("%d product %d\n",(int)arg,head->next->data);
pthread_cond_signal(&cond);//在插入节点后唤醒等待挂起中的消费者,如果没有挂起的消费者,则该语句不执行任何操作
pthread_mutex_unlock(&mutex);//解锁互斥量,退出临界区
usleep(1000000);//一个生产者1s中生产一个
}
}
主函数创建生产者消费者线程:
int main() { LinkListInit(); pthread_mutex_init(&mutex,NULL);//初始化互斥量 pthread_cond_init(&cond,NULL);//初始化条件变量 srand((unsigned int)time(NULL)); pthread_t tid[8]; int i = 0; for(;i < 3;++i) { pthread_create(&tid[i],NULL,product,(void*)i);//创建3个生产者线程 } for(i = 3;i < 8;++i) { pthread_create(&tid[i],NULL,consume,(void*)i);//创建5个生产者线程 } for(i = 0;i < 8;i++) { pthread_join(tid[i],NULL);//线程等待 } pthread_mutex_destroy(&mutex);//销毁互斥量 pthread_cond_destroy(&cond);//销毁条件变量 return 0; }
上述有关头插结点和头删结点的操作见“单链表的基本操作”一文。
当去掉上述两个代码段中的红色字体标注的条件变量的相关操作时,出现如下结果:
空链表 空链表 空链表 空链表 空链表^C
出现上述结果的原因是,在上述代码中,消费者的速度比生产者快。当消费者将结点都删除完时,由于消费者比生产者优先级高,每次竞争互斥量时,都是消费者竞争到,但消费者此时什么也做不了。即占着锁而做着无用的事情,但消费者想做有用的事情却申请不到互斥量,从而造成了消费者的死锁问题,使二者之间不能高效运行。
加上上述两个代码段中的红色字体标注的条件变量的相关操作时,出现如下结果:
空链表 空链表 空链表 2 product 32 1 product 35 5 consume 35 3 consume 32 0 product 100 7 consume 100 空链表 空链表 空链表 ^C
上述结果可以看到,当链表为空时,消费者并没有一直占着锁而做无谓的等待,而是使生产者先生产完产品之后,在去消费。这样二者之间便能高效的合作。
因为,条件变量变保证了线程之间的同步问题。
2. 利用信号量保证线程间的同步
对于上述的链表做临界资源的生产者消费者模型,还可以使用信号量来保证线程间的同步问题。在本例中,可以设置一个信号量size来表示链表中结点的个数,初始设置为0。当生产者头插一个结点,size加1,消费者头删一个结点,size减1。
当信号量size的值为0时,消费者挂起等待。确保生产者先运行。这样,便可以保证生产者与消费者之间的同步问题。
有关信号量保证线程间同步的具体实现方法参考见下一个模型。
利用循环队列做临界资源的生产者消费者模型
在该模型中临界资源为一循环队列。多个生产者从队尾向队列中插入节点,多个消费者从队首删除队列中的结点。
在一个生产者插入结点或一个消费者在删除结点的过程中,其他生产者或消费者不能对该队列进行操作。因此:生产者与生产者之间,消费者与消费者之间,生产者与消费者之间存在互斥关系。在这里,通过互斥量来保证互斥关系。
该循环队列是由数组模拟实现的。所以队列的长度是有限的。当队列为空时,生产者必须先执行向队列中插入节点。当队列为满时,消费者必须先执行删除节点。在这两种情况下,便需要生产者与消费者在执行时满足一定的顺序要求。即生产者与消费者必须同步的访问链表。
1. 利用信号量来保证线程间的同步
与上例类似,用多个线程来分别模拟生产者和消费者。互斥量来保证任意二者之间的互斥关系。利用信号量来保证生产者与消费者之间的同步关系。
设置两个信号量full和empty。full表示队列中结点的个数。empty表示队列中没有节点的数组元素个数。比如说,一个长度为10的数组,现在往数组中插入个3个元素,此时,full为3,empty即为7。
当信号量full的值为0即此时队列为空时,生产者线程必须先运行。当empty的值为0即此时队列为满时,消费者必须先运行。
有关信号量的使用接口见线程的同步与互斥一文。
下面利用信号量来演示该模型(这里假设队列中的数组最大长度为20):
设置互斥量和信号量:
pthread_mutex_t mutex; sem_t full; sem_t empty;
生产者向队列中插入节点:
//生产者生产产品 void* product(void* arg) { while(1) { sem_wait(&empty); //首先:empty减1,即empty维护的临界资源个数减1 //然后:如果empty值小于0,则将该线程挂起等待,使消费者先运行。否则什么也不做 pthread_mutex_lock(&mutex);//生产者插入节点时要互斥的访问临界资源 QueuePush();//插入节点 int pre = 0; if(queue.tail == 0) { pre = 19; } else { pre = queue.tail - 1; } printf("%d product %d\n",(int)arg,queue.data[pre]); pthread_mutex_unlock(&mutex);//对临界资源操作完毕后,解锁互斥量,退出临界区 sem_post(&full); //full加1,即full维护的临界资源个数加1,即元素个数加1 //如果有消费者在挂起等待,则唤醒他。如果没有,则什么也不做 usleep(1); } }
消费者消费产品:
//消费者消费产品 void* consume(void* arg) { while(1) { sem_wait(&full); //首先:full减1,即full维护的临界资源个数减1 //然后:如果full值小于1,则将该线程挂起等待,使生产者先运行。否则什么也不做 pthread_mutex_lock(&mutex);//消费者删除节点时要互斥的访问临界资源 printf("%d consume %d\n",(int)arg,queue.data[queue.head]); QueuePop();//删除节点 pthread_mutex_unlock(&mutex);//对临界资源操作结束,解锁互斥量,退出临界区 sem_post(&empty); //首先:empty加1,即empty维护的临界资源个数加1000000 //然后:如果有生产者在挂起等待,则唤醒他。如果没有,则什么也不做 usleep(1000000); } }
注意:在上述两段代码中申请互斥量和申请信号量的顺序不能交换。如果先申请互斥量,进入临界区后发现没有临界资源就会在临界区中挂起等待造成死锁问题。
主线程中创建生产者消费者线程:
int main() { srand((unsigned int)time(NULL)); pthread_mutex_init(&mutex,NULL); //pthread_cond_init(&cond,NULL); sem_init(&full,0,0); sem_init(&empty,0,20); pthread_t tid[8]; int i = 0; for(;i < 3;++i) { pthread_create(&tid[i],NULL,product,(void*)i); } for(i = 3;i < 8;i++) { pthread_create(&tid[i],NULL,consume,(void*)i); } for(i = 0;i < 8;i++) { pthread_join(tid[i],NULL); } sem_destroy(&full); sem_destroy(&empty); return 0; }
如果去掉上述两个代码段中有关信号量的操作函数:
(1)如果生产者的个数比消费者多或生产者的速度大于消费者的速度,此时生产者的优先级比消费者优先级高,会出现以下结果:
1 product 3 队列已满 3 product 3 队列已满 4 product 3 队列已满 2 product 3 队列已满 0 product 3 ^C
生产者优先级比消费者高,所以生产者一直申请锁,但队列已满,无法向队列中插入结点,而消费者有无法获得锁去消费,所以就造成了生产者的死锁问题。
(2)如果消费者的个数比生产者多或消费者的速度比生产者快。会出现以下结果:
5 consume 0 队列已空 3 consume 0 队列已空 4 consume 0 队列已空 7 consume 0 队列已空 ^C
消费者优先级比生产者高所以,消费者一直申请锁,但队列已空,无法向从队列中删除结点。而生产者又无法获得锁去生产,所以就造成了消费者的死锁问题。
所以,要加上上述有关信号量的操作来保证线程同步:
(1)当队列为满时,不会造成生产者的死锁问题:
(2)当队列为空时,不会造成消费者的死锁问题:
3 product 20 4 product 60 5 product 43 6 product 74 7 consume 20 2 product 27 1 product 5 7 consume 60 0 product 39 7 consume 43 4 product 91 ^C
2. 利用条件变量来实现同步
在该模型中还可以通过条件变量来实现线程间的同步。
当队列为空时,设置一个条件变量,使消费者在该条件下挂起等待,生产者先运行,然后唤醒消费者。当队列为满时,再设置一个条件变量,使生产者在该条件下挂起等待,消费者先运行,然后唤醒生产者。
具体实现的方法参考上个模型。