1.线程安全问题
多个线程同时访问临界资源,产生二义性。
- 临界资源:也称共享资源,比如一个对象,对象 中的属性,一个文件,一个数据库等
2.解决线程安全问题
- 同步:保证执行流对临界资源的合理访问。(轮询访问)
- 互斥:保证不同执行流对临界资源的原子访问。即每次只能有一个执行流访问临界资源。
3.互斥如何实现
1.互斥锁
- 互斥锁底层是一个互斥量计数器,只有两个取值。0/1
0表示资源不可以被访问
1表示资源可以被访问
如何保证互斥锁的原子访问
- 直接交换寄存器与内存中的值!
互斥锁使用过程
- 当计数器为1,访问临界资源。计数器减1
- 访问完成,释放互斥锁。计数器+1
2.互斥锁操作过程
1.定义互斥锁变量
pthread_mutex_t mutex1;
2.初始化互斥锁:
int pthread_mutex_init(pthread_mutex_t * restrict mutex,const pthread_mutexattr_t*restrict attr)
第一个参数:mutex 互斥锁变量地址
第二个参数:互斥锁属性,一般设置为NULL
3.加锁
- 如果一个线程要占用共享资源,必须先申请对应互斥锁,使用函数:
以阻塞方式申请互斥锁:(阻塞等待,不会向下执行)
extern int pthread_mutex_lock(pthread_mutex* _mutex)
以非阻塞方式申请互斥锁:(直接返回,向下执行)
extern int pthread_mutex_trylock(pthread_mutex* _mutex)
4.释放互斥锁
释放互斥锁函数:
extern int pthread_mutex_unlock(pthread_mutex_t* _mutex)
5.销毁互斥锁
销毁互斥锁函数:
extern int pthread_mutex_destroy(pthread_mutex_t* _mutex)
注意:
- 在创建线程之前初始化
- 在访问临界资源前加锁
- 在所有可能退出的地方解锁
3.死锁
- 程序中每一个执行流均占有锁资源,并且不释放的情况下要申请对方执行流的锁资源。
死锁的四个必要条件
- 互斥条件:一把锁只能被一个线程所拥有。
- 请求与保持条件:不释放自己的锁资源还想申请其他执行流的锁资源
- 循环等待:若干执行流请求的锁资源的情况形成闭环。
- 不可剥夺:别的线程不可释放自己的锁资源
避免死锁
- 破坏必要条件
- 加锁顺序一致
线程先把所有需要的锁依次拥有,其他线程在开始占有锁 - 避免锁没有被释放
- 一次性分配所有资源
一个执行流要使用两个临界资源
一个空闲(可用),一个不可用 不加锁
两个资源都可用 加锁
4.同步如何实现
- 使用条件变量完成同步
1.条件变量
- 构成:
1个使线程等待接口
1个唤醒线程接口
1个pcb等待队列
条件变量使用过程
- 线程判断当前资源是否可用
可用,则访问临界资源
不可用,则进入pcb等待队列 - wait–>sleep
- 当其他线程产生临界资源,唤醒唤醒等待的线程获取资源。
2.条件变量的接口
1.定义条件变量
pthread_cond_t cond1;
2.初始化条件变量
int pthread_cond_init(pthread_cond_t *, const pthread_condattr_t *);
第一个参数:传入条件变量地址
第二个参数:条件变量的属性,一般为NULL
3.等待接口
- 当前调用的线程陷入阻塞
- 将该pcb放到等待队列
- 对互斥锁进行解锁(可以理解为该进程已经竞争到锁资源,但是没有可用临界资源。所以需要释放互斥锁)
- 当被唤醒时,需进行锁资源竞争(加锁)
竞争到锁资源 两种情况:临界资源有用则使用后解锁,临界资源没用则放入等待队列并解锁
没有竞争到锁资源 继续竞争
//参数cond指定条件变量,参数mutex指定互斥量
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t * mutex);
//条件变量只保证同步【消费者和生产者】,互斥锁保证(线程间)互斥【消费者和消费者,生产者和生产者】。
4.唤醒接口
//唤醒至少一个阻塞在条件变量上的线程,也有可能会唤醒多个线程
int pthread_cond_signal(pthread_cond_t *cond);
//唤醒全部阻塞在条件变量上的线程
int pthread_cond_broadcast(pthread_cond_t *cond);
5.销毁条件变量
//销毁一个条件变量
int pthread_cond_destroy(pthread_cond_t *cond);
举例:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#define THREADCOUNT 4
//对于全局变量g_noodle的取值,我们为了验证线程之间进行同步,
//我们规定 0 是没有面,1 是有面的情况
int g_noodle = 0;
//定义互斥锁资源
pthread_mutex_t g_mutex;
//吃饭的条件变量
pthread_cond_t g_cond;
//做饭的条件变量
pthread_cond_t g_makecond;
void* EatStart(void* arg)
{
(void)arg;
while(1)
{
pthread_mutex_lock(&g_mutex);
while(g_noodle == 0)
{
pthread_cond_wait(&g_cond, &g_mutex);
}
g_noodle--;
printf("i am [%p], eat one noodle:[%d]\n", pthread_self(), g_noodle);
pthread_mutex_unlock(&g_mutex);
pthread_cond_signal(&g_makecond);
}
return NULL;
}
void* MakeStart(void* arg)
{
(void)arg;
while(1)
{
//互斥锁只是保护临界资源的
pthread_mutex_lock(&g_mutex);
while(g_noodle == 1)
{
pthread_cond_wait(&g_makecond, &g_mutex);
}
g_noodle++;
printf("i am [%p], i make one noodle:[%d]\n", pthread_self(), g_noodle);
pthread_mutex_unlock(&g_mutex);
pthread_cond_signal(&g_cond);
}
return NULL;
}
int main()
{
pthread_mutex_init(&g_mutex, NULL);
pthread_cond_init(&g_cond, NULL);
pthread_cond_init(&g_makecond, NULL);
pthread_t eat_tid[THREADCOUNT], make_tid[THREADCOUNT];
int i = 0;
int ret = -1;
for(; i < THREADCOUNT; i++)
{
ret = pthread_create(&eat_tid[i], NULL, EatStart, NULL);
if(ret != 0)
{
printf("create thread failed\n");
return 0;
}
ret = pthread_create(&make_tid[i], NULL, MakeStart, NULL);
if(ret != 0)
{
printf("create thread failed\n");
return 0;
}
}
for(i = 0; i < THREADCOUNT; i++)
{
pthread_join(eat_tid[i], NULL);
pthread_join(make_tid[i], NULL);
}
pthread_mutex_destroy(&g_mutex);
pthread_cond_destroy(&g_cond);
pthread_cond_destroy(&g_makecond);
return 0;
}
5.生产者与消费者模型
注意:
- 1.如果生产者/消费者有多个,需要循环判断临界资源数量
- 2.生产者和消费者需要放到不同pcb等待队列中,保证可以及时被唤醒执行操作。
1.优点:
可以解耦合,消费者和生产者只通过队列进行。
- 支持忙闲不均。队列多个节点起到缓冲作用
- 支持并发执行
2.如何实现
一个线程安全队列+两种角色(消费者+生产者)+三种关系(消费者与消费者互斥,生产者与生产者互斥,消费者与生产者同步)
- STL中的queue,(线程不安全)。
3.代码实现
#include <stdio.h>
#include <pthread.h>
#include <iostream>
#include <queue>
#define THREADCOUNT 4
class BlockQueue
{
public:
BlockQueue(int Capacity)
{
Capacity_ = Capacity;
pthread_mutex_init(&QueueMutex_, NULL);
pthread_cond_init(&ConsumeCond_, NULL);
pthread_cond_init(&ProduceCond_, NULL);
}
~BlockQueue()
{
pthread_mutex_destroy(&QueueMutex_);
pthread_cond_destroy(&ConsumeCond_);
pthread_cond_destroy(&ProduceCond_);
}
bool IsFull()
{
if(Queue_.size() == Capacity_)
{
return true;
}
return false;
}
int Pop(int* Data)
{
pthread_mutex_lock(&QueueMutex_);
while(Queue_.empty())
{
pthread_cond_wait(&ConsumeCond_, &QueueMutex_);
}
*Data = Queue_.front();
Queue_.pop();
pthread_mutex_unlock(&QueueMutex_);
pthread_cond_signal(&ProduceCond_);
return 0;
}
int Push(int& Data)
{
pthread_mutex_lock(&QueueMutex_);
while(IsFull())
{
pthread_cond_wait(&ProduceCond_, &QueueMutex_);
}
Queue_.push(Data);
pthread_mutex_unlock(&QueueMutex_);
pthread_cond_signal(&ConsumeCond_);
return 0;
}
private:
std::queue<int> Queue_;
//定义的Queue_的容量
size_t Capacity_;
//互斥
pthread_mutex_t QueueMutex_;
//同步
pthread_cond_t ConsumeCond_;
pthread_cond_t ProduceCond_;
};
void* ConsumeStart(void* arg)
{
BlockQueue* bq = (BlockQueue*)arg;
while(1)
{
int Data;
bq->Pop(&Data);
printf("ConsumeStart [%p][%d]\n", pthread_self(), Data);
}
return NULL;
}
void* ProduceStart(void* arg)
{
BlockQueue* bq = (BlockQueue*)arg;
int i = 0;
while(1)
{
bq->Push(i);
//该执行流被打断了
printf("ProduceStart [%p][%d]\n", pthread_self(), i);
i++;
}
return NULL;
}
int main()
{
pthread_t Con_tid[THREADCOUNT], Pro_tid[THREADCOUNT];
int i = 0;
int ret = -1;
BlockQueue* bq = new BlockQueue(5);
for(; i < THREADCOUNT; i++)
{
ret = pthread_create(&Con_tid[i], NULL, ConsumeStart, (void*)bq);
if(ret != 0)
{
printf("create thread failed\n");
return 0;
}
ret = pthread_create(&Pro_tid[i], NULL, ProduceStart, (void*)bq);
if(ret != 0)
{
printf("create thread failed\n");
return 0;
}
}
for(i = 0; i < THREADCOUNT; i++)
{
pthread_join(Con_tid[i], NULL);
pthread_join(Pro_tid[i], NULL);
}
delete bq;
return 0;
}