什么是生产者和消费者模型
什么是生产者消费者模型?生产者和消费是操作系统中一种重要的模型,它描述的是一种等待和通知的机制,如下图。
生产者和消费者模型必须具有的条件
用一句话概括,生产者消费者模型必须具有的条件是三种关系,两类角色,一类交易场所。
一类交易场所:交易场所指的是生产者和消费者之间进行数据交换的仓库,这块仓库相当于一个缓冲区,生产者负责把数据放入到缓冲区中,消费者负责把缓冲区中的数据取出来;
两类角色:指的是生产者和消费者;
三种关系:三种关系分别指的是:消费者和消费者,生产者和生产者,生产者和消费者;其中消费者和消费者,生产者和生产者之间都属于竞争关系,生产者和消费者之间的关系相当于是一种食物链之间的依赖关系。
生产者和消费者模型的特点
- 首先,生产者只需要关心“仓库”,并不需要关心具体的消费者。
- 对于消费者而言,它不需要关心具体的生产者,它只需要关心这个“仓库”中还有没有东西存在。
- 生产者生产的时候消费者不能进行“消费”,消费者消费的时候生产者不能生产,相当于一种互斥关系,即生产者和消费者一次只能有一人能访问到“仓库”。
- “仓库”为空时不能进行消费。
- “仓库”满时不能进行生产。
什么是条件变量
条件变量是线程可用的一种同步机制,它给多个线程提供了一个会合的场所,与互斥量一起使用时,允许线程以无竞争的方式等待特定条件发生。
条件本身是由互斥量保护的,线程在该变条件状态之前必须首先锁住互斥量,使临界区域只能被当前访问资源的线程所独有,其他线程在访问临界区域获得互斥量之前不会察觉到这种改变,因为互斥量必须在锁定以后才能计算条件。
条件变量的类型:pthread_cond_t
条件变量的初始化:条件变量的初始化有两种方法,1.可以使用PTHREAD_COND_INITIALIZER宏来进行初始化。2.可以使用pthread_cond_init函数来进行初始化。
条件变量的初始化函数和摧毁函数
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
//两个函数都是成功返回0,失败则返回错误码
条件变量的操作函数
int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
//两个函数都是成功返回0,失败则返回错误码
pthread_cond_wait
该函数用来使用传递给它的互斥量来对条件进行保护,它主要做下面的事情:
1. 把调用线程放到等待条件的线程列表上;
2. 对互斥量进行解锁;
3. 函数返回时,互斥量再次被锁住。
pthread_cond_timewait
相比于上面的wait函数,timewait函数只是做了超时检查,超时值abstime指定了我们愿意等待多长时间.这个时间是一个绝对数.
如果超时了,等待的条件还是没有出现,那么pthread_cond_timedwait将重新获得互斥量,并返回错误码ETIMEDOUT.
当着两个函数调用成功并返回时,线程需要重新计算条件,因为另一个线程有可能会改变这个条件变量。
唤醒函数
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
//return val:成功返回0,失败则返回错误编号
其中signal函数至少能唤醒一个在条件变量上等待的线程。而broadcast函数则能唤醒在条件变量上等待的所有线程.
模型具体实例
案例:我们做这样一件事,用线程来模拟生产者和消费者,链表来模拟他们之间进行数据交换的场所,我们让生产者缓慢的往“仓库”中存储数据,而消费者很快的读取仓库中的数据,在我们不使用条件变量的情况下我们来观察下结果:
当我们使用条件变量后结果如下:
很明显,使用了条件变量的程序能够做到生产者写一次数据,消费者读一次数据,不管消费者或生产者执行的有多慢或者多快(大家可以做下验证),而不使用条件变量的结果中,如果消费者执行过快,那么消费者会不断的访问这块空的资源,如果生产者执行过快,那么生产者会频繁的访问“满的仓库”,所以说本例子中,条件变量解决了消费者和生产者模型中的特点问题。而消费者和生产者之间的互斥是由互斥量所保证的。
实例代码
//单链表的函数文件
#include "myList.h"
Node_p AllocNode(int data)
{
Node_p NewNode=(Node_p)malloc(sizeof(Node));
if(NewNode==NULL)
{
perror("malloc..\n");
return ;
}
NewNode->data=data;
NewNode->next=NULL;
return NewNode;
}
int IsEmpty(Node_p list)
{
assert(list);
if(list->next!=NULL)
{
return 0;
}
else
{
return 1;
}
}
void ListInit(Node_pp head)
{
*head=AllocNode(0);
}
void PushHead(Node_p list,int data)
{
assert(list);
Node_p NewNode=AllocNode(data);
NewNode->next=list->next;
list->next=NewNode;
}
void DelNode(Node_p node)
{
assert(node);
free(node);
node=NULL;
}
void PopHead(Node_p list,int *data)
{
assert(data);
if(IsEmpty(list))
{
printf("the list empty..\n");
return;
}
Node_p dNode=list->next;
list->next=dNode->next;
*data=dNode->data;
DelNode(dNode);
}
void ShowList(Node_p list)
{
assert(list);
Node_p cur=list->next;
while(cur)
{
printf("%d ",cur->data);
cur=cur->next;
}
printf("\n");
}
void DestroyList(Node_p list)
{
assert(list);
int data=0;
while(list->next)
{
PopHead(list,&data);
}
free(list);
list=NULL;
printf("list is destroy...\n");
}
//单链表的头文件
#ifndef __LIST_H__
#define __LIST_H__
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
typedef struct Node{
int data;
struct Node* next;
}Node,*Node_p,**Node_pp;
Node_p AllocNode(int data);
void ListInit(Node_pp head);
int IsEmpty(Node_p list);
void PushHead(Node_p list,int data);
void DelNode(Node_p node);
void PopHead(Node_p list,int *data);
void ShowList(Node_p list);
void DestroyList(Node_p list);
#endif //__LIST_H__
//生产者和消费者问题源文件
/*************************************************************************
> File Name: product_consumer.c
> Author: LZH
> Mail: [email protected]
> Created Time: Sun 19 Feb 2017 12:46:44 AM PST
************************************************************************/
#include "myList.h"
#include <pthread.h>
pthread_mutex_t mylock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t mycond=PTHREAD_COND_INITIALIZER;
//pthread_mutex_t mylock=PTHREAD_MUTEX_INITIALIZER;
void* pthread_Product(void* arg)
{
Node_p head=(Node_p)arg;
while(1)
{
usleep(123456);
pthread_mutex_lock(&mylock);
int data=rand()%1000;
PushHead(head,data);
printf("I am producter,%d\n",data);
pthread_cond_signal(&mycond);
//ShowList(arg);
pthread_mutex_unlock(&mylock);
}
}
void* pthread_Consumer(void* arg)
{
Node_p head=(Node_p)arg;
int data=0;
while(1)
{
pthread_mutex_lock(&mylock);
//sleep(1);
if(IsEmpty(head))
{
pthread_cond_wait(&mycond,&mylock);
}
PopHead(head,&data);
//ShowList(head);
//sleep(1);
printf("I am consumer,%d\n",data);
pthread_mutex_unlock(&mylock);
}
}
void test()
{
printf("product_consumer...\n");
Node_p head;
ListInit(&head);
printf("head->data:%d\n",head->data);
int i=0;
while(i<10)
{
PushHead(head,i);
i++;
ShowList(head);
}
int data;
while(i>0)
{
PopHead(head,&data);
i--;
ShowList(head);
printf("IsEmpty?%d\n",IsEmpty(head));
}
DestroyList(head);
ShowList(head);
//return 0;
}
int main()
{
Node_p head=NULL;
ListInit(&head);
pthread_t tid1,tid2;
int ret1=pthread_create(&tid1,NULL,pthread_Product,(void*)head);
int ret2=pthread_create(&tid2,NULL,pthread_Consumer,(void*)head);
printf("ret1:%d,ret2:%d\n",ret1,ret2);
pthread_join(tid1,NULL);
pthread_join(tid2,NULL);
pthread_mutex_destroy(&mylock);
pthread_cond_destroy(&mycond);
return 0;
}