通过互斥锁和条件变量实现线程间的同步与互斥
https://blog.csdn.net/qq_43763344/article/details/91580958
但是一个程序一旦上锁效率就特别低,而且有可能出现死锁的情况,所以一般情况下都是使用信号量来实现同步与互斥
使用信号量来实现同步互斥
信号量有 system V 信号量和 POSIX 信号量
这里我们使用 POSIX 信号量,因为它具有可移植性,可用于线程间同步, 而 system V 一般用于进程间同步
POSIX 信号量
功能:可以实现进程或线程之间的同步与互斥
本质:资源计数器 + 等待队列 + 对调用者提供唤醒和阻塞接口
自带一个判断临界资源的的计数器,如果不可以获取资源则阻塞当前的执行流
使用:根据资源数量初始化资源计数器,当执行流需要获取资源时,判断信号量当中计数器值是否大于 0,如果大于 0 则资源可以访问,并且对计数器 -1 操作,然后返回;如果资源计数器值小于等于 0 表示没有可用的资源,sem_wait 函数会陷入阻塞状态,阻塞该执行流,等待有资源可用时再使用
注意:当一个执行流使用完一个资源或者一个执行流产生一个资源后,需要先判断计数器值是否小于0,如果小于0则唤醒PCB等待对列当中的执行流,然后对计数器进行+1操作.
POSIX 信号量的接口
定义
sem_t sem;
初始化
sem_init (&sem,int pshared,int value);
// pahared == 1 代表进程
// pahared == 0 代表线程
// value 初始化资源计数的值
信号量是如何实现互斥?
让信号量当中的资源计数器值等于1
等待
如果有资源则进行访问临界资源,然后正常返回,如果没有资源则阻塞等待
不管有没有资源,都会将信号量当中的计数器值 -1
sem_wait(sem_t* sem)
非阻塞等待:没有就报错返回
sem_trywait(sem_t* sem)
超时时间等待:超过时间就报错返回
sem_timedwait(sem_t* sem,struct timespec)
发布信号量,表示资源使用完(对资源计数器 +1),并唤醒等待队列中的执行流
sem_post (sem_t* sme)
销毁信号量
sem_destroy(sem_t* sme)
利用 POSIX 信号量实现一个生产者和消费者模型的案列
生产者和消费者模型
- 一个场景:队列,相当于一个缓冲区
- 两个角色:生产者和消费者,相当于两个不同的执行流
- 三种关系:生产者和生产者之间是互斥关系,消费者和消费者之间是互斥关系,消费者和生产者之间是同步与互斥关系
通过一个阻塞队列来解决生产者和消费者之间的强耦合性,生产者只管往队列里面生产资源,生产好之后通知消费者(唤醒等待队列中的消费者)来取数据,当队列满时生产者进入 PCB 等待队列变为阻塞状态;对于消费者来说,只管从里面取数据,如果没有数据则通知生产者(唤醒等待对列中的生成者)进行生产资源
这种模型具有解耦合,支持并发和忙闲不均等优点
参考代码
#include <iostream>
#include <stdio.h>
#include <semaphore.h>
#include <vector>
using namespace std;
#define SIZE 4
#define PTHREADCOUNT 4
class RingQueue{
public:
RingQueue()
:vec_(SIZE)
{
Capacity_ = SIZE;
PosWrite_ = 0;
PosRead_ = 0;
// 同步
// 第三个参数,特别注意
//生产者生产的时候就看我们有多少空间可以供我们去生产
sem_init(&PosSem_,0,SIZE);
//消费者初始化的时候是看当前有多少资源可以消费
sem_init(&ConSem_,0,0);
// 互斥
sem_init(&LockSem_,0,1);
}
~RingQueue(){
sem_destroy(&PosSem_);
sem_destroy(&ConSem_);
sem_destroy(&LockSem_);
}
void Push(int& data){
// 先等待然后上锁
// 如果大于 0,表示有位置生产资源,生产完后计数器进行-1 操作
// 表示我们可生产的位置会少1个
// 如果等于 0,则表示没有位置可以生产资源了,陷入阻塞
sem_wait(&PosSem_);
sem_wait(&LockSem_);
vec_[PosWrite_] = data;
PosWrite_ = (PosWrite_+1)%Capacity_;
// 先解锁然后通知
sem_post(&LockSem_);
// 对消费者资源计数器 +1
// 如果 ConSem 小于等于 0,则唤醒,如果大于 0,则不唤醒
sem_post(&ConSem_);
}
void Pop(int* data){
// 判断资源计数器 Consem 是否大于0
// 如果大于0,则说明有资源可用,并占用资源,对计数器-1
// 如果小于0,则说明没有资源可用,也会对资源计数器 -1,
// 与此同时,会阻塞在sem_wait函数这里,等待信号的唤醒
// 当为负数的时候,就表示由线程阻塞了,需要获取资源,
// 那么当生产线程生产完后,会发现值小于0,所以唤醒
sem_wait(&ConSem_);
sem_wait(&LockSem_);
*data = vec_[PosRead_];
PosRead_ = (PosRead_+1)%Capacity_;
sem_post(&LockSem_);
// 这里表示资源使用完毕,将资源计数器 +1
sem_post(&PosSem_);
}
private:
//利用数组和两个指针实现环形队列
vector<int>vec_;
size_t Capacity_;
int PosRead_;
int PosWrite_;
// 生成者和消费者信号量实现同步
sem_t PosSem_;
sem_t ConSem_;
// 二值信号量实现互斥
sem_t LockSem_;
};
void* ProStart(void* arg){
RingQueue* rq = (RingQueue*)arg;
int i = 0;
while(1){
rq->Push(i);
printf("Pro[%p] make data [%d]\n",pthread_self(),i);
i++;
}
return NULL;
}
void* ConStart(void* arg){
RingQueue* rq = (RingQueue*)arg;
int data;
while(1){
rq->Pop(&data);
printf("Con[%p] cons data [%d]\n",pthread_self(),data);
}
return NULL;
}
int main(){
pthread_t Protid[PTHREADCOUNT],Contid[PTHREADCOUNT];
RingQueue* rq = new RingQueue();
for(int i = 0;i < PTHREADCOUNT;++i){
pthread_create(&Protid[i],NULL,ProStart,(void*)rq);
pthread_create(&Contid[i],NULL,ConStart,(void*)rq);
}
for(int i = 0;i < PTHREADCOUNT;++i){
pthread_join(Protid[i],NULL);
pthread_join(Contid[i],NULL);
}
delete rq;
return 0;
}