在对linux 下的进程和线程的学习后,总结一下生产者消费者模型
场景描述:
- 生产者负责在生产池中进行生产
- 消费者负责消费生产池中的生产物
- 同一时刻生产池中只能有一个生产者/消费者
- 生产池中没有生产物时,消费者阻塞,直到生产者进行生产之后(生产池不未空),系统再唤醒消费者进行消费
- 生产池为满时,生产者阻塞,直到消费者进行消费之后(生产池未满),系统再唤醒生产者进行生产
建立模型:
- 一个交易场所(生产池),这里用一个带头结点的单向链表描述
- 两种角色,这里用若干个线程描述生产者,用若干个线程描述消费者
- 三种关系
: 生产者与生产者之间为 互斥关系
:消费者与消费者之间为 互斥关系
:生产者与消费者之间为 同步与互斥关系
代码实现
<实现方法1>
注意这里编译时要加上线程库,-pthread 选项
采用互斥量和条件变量
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
//生产者消费者模型
//生产者和生产者之间为互斥关系
//消费者和消费者之间为互斥关系
//生产者和消费者之间为同步互斥关系
pthread_mutex_t mutex;//互斥量
pthread_mutex_t mutex_con;
pthread_cond_t p_cond;
int count=0;
//1.实现交易场所(带头结点,不带环的单向链表(栈结构))
typedef struct Node
{
struct Node *_next;
int data;
}Node;
void ListInit(Node *Head)
{
Head=(Node *) malloc(sizeof(Node));
Head->_next=NULL;
}
Node Head;
void ListPushFront(Node * Head,int val)
{
if(Head==NULL)
{
return;
}
Node * new_node=(Node *)malloc(sizeof(Node));
new_node->data=val;
new_node->_next=Head->_next;
Head->_next=new_node;
}
void ListPopFront(Node * Head,int * val)
{
if(Head==NULL)
{
return;
}
if(Head->_next==NULL)
{
return;
}
Node * to_delete=Head->_next;
*val=to_delete->data;
Head->_next=to_delete->_next;
free(to_delete);
}
void ListDestroy(Node *Head)
{
if(Head==NULL||Head->_next==NULL)
{
return;
}
Node * cur=Head->_next;
while(cur!=NULL)
{
Node * to_delete=cur;
cur=cur->_next;
free(to_delete);
}
}
//2.**********************实现三种关系*****************************
//*******生产者****************
void * Producer(void * arg){
(void) arg;
while(1)
{
while(1)
{
pthread_mutex_lock(&mutex);//在进行生产之前先获取锁(控制互斥关系)
ListPushFront(&Head,++count);
printf("生产:%d\n",count);
pthread_cond_signal(&p_cond);//在生产之后进行信号量加一操作,通知消费者可以进行消费了
pthread_mutex_unlock(&mutex);//在生产之后释放锁
usleep(187761);
}
}
return NULL;
}
//*******消费者****************
void * Consumer(void * arg){
(void) arg;
while(1)
{
pthread_mutex_lock(&mutex);//在进行消费之前先获取锁
int tmp=-1;
while(Head._next==NULL)//当没有节点时,进行循环等待
{
pthread_cond_wait(&p_cond,&mutex);//循环等待生产者进行生产,条件变量
}
ListPopFront(&Head,&tmp);
if(tmp==-1){
//说明没有进行消费
break;
}
printf("消费:%d\n",tmp);
pthread_mutex_unlock(&mutex);//消费完后释放锁
usleep(4551);
}
return NULL;
}
//2.*********************实现两种角色*****************************
void test()
{
const int N=6;
pthread_t thread[N];
int i=0;
ListInit(&Head);
pthread_mutex_init(&mutex,NULL);
pthread_cond_init(&p_cond,NULL);
//创建几个生产者
for(i=0;i<N/2;i++)
{
pthread_create(&thread[i],NULL,Producer,NULL);
}
//创建几个消费者
for(i=0;i<N-N/2;i++)
{
pthread_create(&thread[i],NULL,Consumer,NULL);
}
for(i=0;i<N;++i)
{
pthread_join(thread[i],NULL);
}
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&p_cond);
}
int main()
{
test();
return 0;
}
<实现方法2>
采用POXI信号量实现
互斥关系用一个二元信号量实现
同步关系采用两种临界资源的操作来控制
一种为生产物资源,sem_s
一种为空为资源, sem_b
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#define MAX_SIZE 1024
//采用POSIX信号量实现生产者消费者模型
//生产者和生产者之间为互斥关系
//消费者和消费者之间为互斥关系
//生产者和消费这之间为同步互斥关系
//******************实现一个交易场所******************
//循环队列
typedef struct Queue
{
int data[MAX_SIZE];
size_t size;
int start;
int end;
}Queue;
Queue que;//一个全局的队列,作为交易场所
int count;//一个全局的变量,作为生产的物品
sem_t sem_b,sem_s,sem_c;
void QueueInit(Queue * que)
{
if(que!=NULL)
{
que->size=que->start=que->end=0;
}
}
void QueuePush(Queue * que,int value)
{
if(que!=NULL)
{
if(que->size>=MAX_SIZE)
{
return;
}
else
{
que->data[que->end]=value;
que->end++;
if(que->end>=MAX_SIZE)
{
que->end-=MAX_SIZE;
}
que->size++;
}
}
}
void QueuePop(Queue * que,int * tmp)
{
if(que!=NULL)
{
if(que->size==0)
{
return;
}
else
{
*tmp=que->data[que->start];
que->start++;
if(que->start>=MAX_SIZE)
{
que->start-=MAX_SIZE;
}
que->size--;
}
}
}
//实现三种关系
void * Product(void * arg)
{
(void)arg;
while(1)
{
sem_wait(&sem_b);//空位的个数减一,如果sem_b为0,则进行等待消费者消费(队列满)
sem_wait(&sem_c);//等待信号量,将信号量值减一(实现互斥锁)
QueuePush(&que,++count);
printf("生产:%d\n",count);
sem_post(&sem_c);//发布信号量,将信号量值加一(实现互斥锁)
sem_post(&sem_s);//元素的个数加一,通知消费者来消费,实现生产者和消费者之间的同步关系
sleep(1);
}
return NULL;
}
void * Consume(void * arg)
{
(void)arg;
while(1)
{
int tmp=-1;
sem_wait(&sem_s);//将元素个数减一,如果sem_s为0,则进行等待生产者生产(队列空)
sem_wait(&sem_c);//等待信号量,将信号量值减一(实现互斥锁)
QueuePop(&que,&tmp);
printf("消费:%d \n",tmp);
sem_post(&sem_c);//发布信号量,将信号量值加一(实现互斥锁)
sem_post(&sem_b);//将空白个数加一,实现生产者和消费者之间的同步关系
sleep(1);
}
return NULL;
}
//实现两种角色
void test()
{
const int N=5;
pthread_t thread[N];
//初始化信号量
sem_init(&sem_s,0,0);//元素的个数
sem_init(&sem_b,0,MAX_SIZE);//空位的个数
sem_init(&sem_c,0,1);//二元信号量,互斥锁
//初始化队列
QueueInit(&que);
int i=0;
//创建线程
for(i=0;i<N/2;i++)
{
pthread_create(&thread[i],NULL,Product,NULL);
}
for(i=0;i<N-N/2;++i)
{
pthread_create(&thread[i],NULL,Consume,NULL);
}
//等待线程
for(i=0;i<N;++i)
{
pthread_join(thread[i],NULL);
}
//销毁信号量
sem_destroy(&sem_b);
sem_destroy(&sem_s);
sem_destroy(&sem_c);
}
int main()
{
test();
return 0;
}