基于posix信号量的生产者于消费者模型
(此处只有一个生产者与一个消费者)
使用一个循环队列作为生产者于消费者之间的交易场所,生产者向其中放数据,消费者从中拿取数据。为了达到同步使用posix信号量。
posix 信号量
信号量就相当于一个计数器,记录共享资源的份数。每当有一个线程申请访问该共享资源,就将信号量的数目–,当信号量的数目为零时,表示所有的公共资源都正在被访问。其他想要继续申请信号量线程便进行等待当信号量数目不为0时,其他线程可继续申请。每当一个线程访问完信号量时,便将信号量的数目++,其他线程便可继续申请。通过posix信号量,当所有资源被被使用时,便不可再使用,直到有可用资源出现,实现了同步。
一个定长的数组,作为循环队列的基础
生产者与消费者两者之间存在互斥与同步的关系。即两者不能访问同一块资源,同时要在生产者放完数据后,消费者才能从该块区域中拿取数据。
通过设置信号量的初始值,使生产者与消费者不可能同时存取同一块数据块中的数据,只能是生产者先放好数据,消费者才能从中取出数据。使得消费者只能跟在生产者后面,虽然两者可以同时访问同一队列,但无法同时访问同一个下标的数据块。生产者将所有的空格放满数据后,只有消费者取走后才能继续放,所以生产者的的速度不能超过消费者一圈,两者之间的距离间隔只能是一圈。
- 消费者要跟在生产者的后面,消费者不能超过生产者
- 生产者的速度不能超过消费者一圈,在放完一圈后只能等带消费者拿取数据
- 开始时生产者与消费者同时指向0下标处
- 当将空格放满时,消费者未拿时,生产者与消费者又同时指向0下标处
- 使用两个信号量:空格数目的信号量 有数据的信号量
- 空格初始信号量为队列的容量,数据块的信号量初始值为0,当一个空格被放入数据时,空格信号量减一,数据块信号量加1,当一个数据被拿走时,空格信号量加一,数据块的信号量减一。
#ifndef __RINGQUEUE_HPP__
#define __RINGQUEUE_HPP__
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <vector>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
//使用posix信号量实现生产者消费者模型
//使用循环队列来作为生产者与消费者的交易场所
template<class T>
class RingQueue{
public:
RingQueue(const int& cap)
:rq(cap),
_cap(cap),
p_index(0),
c_index(0)
{
sem_init(&blank,0,cap); //信号量的不同初始值保证了,生产者先开始,保证了同步
sem_init(&data,0,0);
pthread_mutex_init(&lock,nullptr);
}
~RingQueue()
{ //销毁信号量和互斥锁
sem_destroy(&blank);
sem_destroy(&data);
pthread_mutex_destroy(&lock);
}
void Push_data(const T& value)
{
P_blank(); //申请空格
LockQueue();
p_index %= _cap;
rq[p_index++] = value;
UnLockQueue();
V_data(); //数据信号量加1 //保证了同步操作
}
void Pop_data( T& value)
{
P_data(); //申请数据信号量,取走数据
LockQueue();
c_index %= _cap; //循环队列要对下标取值
value = rq[c_index++];
UnLockQueue();
V_blank(); //归还空格信号量
}
private:
void P_blank() //申请空格信号量
{
sem_wait(&blank);
}
void V_blank() //归还空格信号量
{
sem_post(&blank);
}
void P_data() //申请数据信号量
{
sem_wait(&data);
}
void V_data() //归还数据信号量
{
sem_post(&data);
}
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnLockQueue()
{
pthread_mutex_unlock(&lock);
}
private:
sem_t blank; //空格的信号量,来表示
sem_t data; //存入数据的信号量
pthread_mutex_t lock; //在有多个生产者与消费者时用锁保护
//两个信号量分别来保证生产者与消费者的同步
std::vector<T> rq; //将其作为一个循环队列进行存取数据
int _cap; //该队列的容量
int p_index; //生产者走的位置
int c_index; //消费者走的位置
//消费者与生产者不会从同一个位置上操作,从而实现了互斥,保证消费者在生产者的后面
};
#endif
测试
#include "rcp.hpp"
const int num = 5;
void* consume_routine(void* arg)
{
RingQueue<int>* rq = (RingQueue<int>*)arg;
int data;
while(1)
{
rq->Pop_data(data);
std::cout<<"consume get the data: "<<data<<std::endl;
}
}
void* product_routine(void* arg)
{
RingQueue<int>* rq = (RingQueue<int>*)arg;
srand((unsigned long)time(nullptr));
int data = 0;
while(1)
{
data = rand() % 100 + 1;
rq->Push_data(data);
std::cout<<"product push the data: "<<data<<std::endl;
sleep(1); //改变生产者的放数据的速度
}
}
int main()
{
pthread_t consume;
pthread_t product;
RingQueue<int>* rq = new RingQueue<int>(num);
pthread_create(&consume,nullptr,consume_routine,(void*)rq);
pthread_create(&product,nullptr,product_routine,(void*)rq);
pthread_join(consume,nullptr);
pthread_join(product,nullptr);
delete rq;
return 0;
}