文章目录
C5 条件变量
通过硬件和操作系统的正确组合来实现锁,但是锁并不是并发程序设计所需的唯一原语
在很多情况下,线程需要检查某一条件满足之后,才会继续运行,如父线程需要检查子线程是否执行完毕,才能继续运行父线程
volatile int done = 0;
void *child(void *arg) {
printf("child\n");
done = 1;
return NULL;
}
int main() {
printf("parent:begin\n");
pthread_t c;
pthread_create(&c, NULL, child, NULL); //创建子线程
while(done == 0) {
;
}
printf("parent:end\n");
return 0;
}
//输出结果
//parent:begin
//child
//parent:end
可以使用一个共享变量来解决,但是效率很低,因为主线程会自旋检查,浪费CPU时间,希望有某种方式让父线程休眠,直到子线程完成执行
在多线程程序中,一个线程等待某些条件是很常见的,简单的方案是自旋到条件满足,这是及其低效的,某些情况下甚至是错误的,那么线程应该如何等待一个条件?
5.1 定义和程序
线程可以使用条件变量来等待一个条件变为真,条件变量是一个显示队列,当某些执行状态不满足时,线程可以把自己加入队列,等待该条件,另外某个线程,当它改变了条件时,就可以唤醒一个或多个等待的线程,让它们继续执行
条件变量有两种操作:wait()和signal(),线程要休眠时,调用wait(),当线程想唤醒等待在某个条件变量上的休眠线程时,调用signal()
wait()调用只有一个参数,它是互斥的,它假定在wait()调用时,这个互斥量是已上锁状态,wait()的职责是释放锁,并让调用线程休眠,当线程被signal()唤醒时,它必须重新获得锁
5.2 生产者/消费者问题
假设有一个或多个生产者线程和一个或多个消费者线程,生产者把生成地数据项放入缓冲区,消费者从缓冲区内取走数据项,以某种方式消费
很多实际的系统中都会有这种场景,如在多线程的网络服务中,一个生产者将HTTP请求放入工作队列(有界缓冲区),消费者线程从队列中取走请求并处理
采用Java处理生产者,消费者的代码如下:
对于生产者,需要设置一个表示“之前生产的数据是否已经被消耗”的标志
对于消费者,需要设置一个表示“是否有数据可供消费”的标志
对于生产者和消费者,需要设置一个共同的对象锁
需要设置一个共享的数据,即生产者生产的数据消费者消耗的数据,这两个是同一个
通过创建一个父类,由两个子类继承来完成:
package com.mec.thread;
public class ProducerCustomer {
//是否有数据以供消费
public static boolean hasValue = false;
//是否以消费
public static boolean isConsume = true;
//对象锁
public static Object lock = new Object();
//共享数据
public volatile static int data;
}
生产者:
package com.mec.thread;
import java.util.Random;
public class Producer extends ProducerCustomer implements Runnable {
private Random random;
private String threadName;
private Thread thisThread;
public Producer() {
random = new Random();
threadName = "生产者";
thisThread = new Thread(this,threadName);
}
public void startProducer(){
thisThread.start();
System.out.println("线程" + threadName + "建立");
}
@Override
public void run() {
while(true) {
synchronized (lock) {
//如果已消耗,生产一个数据,将已消耗标志改为false
//将是否有数据标志改为true,唤醒阻塞的消费者
if(isConsume) {
data = random.nextInt(1000);
System.out.println(threadName + "生产了一个数据" + data);
isConsume = false;
hasValue = true;
lock.notify();
} else {
//如果未消耗,阻塞自己,等待被唤醒
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
消费者:
package com.mec.thread;
public class Customer extends ProducerCustomer implements Runnable {
private String threadName;
private Thread thisThread;
public Customer() {
threadName = "消费者";
thisThread = new Thread(this,threadName);
}
public void startCustomer(){
thisThread.start();
System.out.println("线程" + threadName + "建立");
}
@Override
public void run() {
while(true) {
synchronized (lock) {
//如果有数据未消耗,消耗该数据,将已消耗标志改为true
//将是否有数据改为false,唤醒阻塞的生产者
if(hasValue) {
System.out.println(threadName + "消耗了一个数据" + data);
isConsume = true;
hasValue = false;
lock.notify();
} else {
//如果已经消耗,则阻塞自己,等待被唤醒
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
5.3 覆盖条件
当线程调用进入内存分配代码时,它可能会因为内存不足而等待,相应的,线程释放内存时,会发出信号说有更多的内存空闲,但是代码中有一个问题:应该唤醒哪个等待线程?
最为简单且有效的方法就是唤醒所有等待的线程,这样做,确保了所有应该唤醒的线程都被唤醒,单可能会影响性能,因为不必要地唤醒了其它许多等待的线程,它们本来不应该被唤醒,这些线程被唤醒后,重新检查条件,马上再次休眠
这种条件变量称为覆盖条件,因为它能覆盖所有需要唤醒线程的场景,成本就是太多的不必要的线程被唤醒
5.4 小结
引入了锁之外的一个重要同步原语:条件变量,当某些程序状态不符合要求时,通过允许线程进入休眠状态,条件变量可以帮助解决许多重要的同步问题
C6 信号量
信号量是编写并发程序的强大且灵活的原语
6.1 信号量的定义
信号量是一有个整数值的对象,可以用两个函数来操作它,在POSIX中,是
sem_wait() 即P()
sem_post() 即V()
因为信号量的初始值能决定其行为,所以首先要初始化信号量,才能调用其它函数与之交互:
#include <semaphore.h>
sem_t s; //声明一个信号量
sem_init(&s, 0, 1); //初始化信号量s,
//第二个参数0代表信号量是在同一进程的多个线程共享的
//第三个参数1是将信号量s的初始值置为1
信号量初始化之后,可以调用P()或V()与之交互
P()/sem_wait():
int sem_wait(sem_t *s) {
//信号量的值减1
//信号量的值小于0时调用P()的线程休眠等待
}
V()/sem_post(sem_t *s):
int sem_post(sem_t *s) {
//信号量的值加1
//如果有休眠的线程,唤醒其中的一个
}
P()要么立刻返回什么都不做(信号量的值大于等于0时),要么让调用它的线程休眠等待(信号量的值小于0时),当信号量的值为负数时,这个值就是等待线程的个数
V()直接给信号量的值加1,如果有等待线程,唤醒其中的一个
6.2 用信号量实现锁
用信号量作为锁,把临界区用P() ,V()环绕
sem_t m;
sem_init(&m, 0, 1);
P(&m);
//临界区
V(&m);
假设有两个线程,线程0和线程1,第一个线程先启动调用了P(),它把信号量的值减1,此时信号量为0,但信号量不是负数,所以直接返回,线程0继续运行,当线程0调用V(),此时信号量加1,但此时没有等待线程,所以不会唤醒任何线程
单线程使用一个信号量:
试想线程0调用P()后持有了锁,在它调用V()之前,线程1调用P()尝试进入临界区,这种情况下,线程1把信号量减1,然后线程1休眠等待,线程0继续运行,当线程0调用V()时,信号量变为0,唤醒了等待的线程1,线程1就有机会获取锁,开始运行
两个线程使用一个信号量:
6.3 信号量用作条件变量
信号量也可以用在一个线程暂停执行,等待某一条件成立的场景
假设一个线程创建另一个线程,并等待子线程执行结束
sem_t s;
void *child(void *arg) {
printf("child\n");
V(&s);
return NULL;
}
int main(int argc, char *argv[]) {
sem_init(&s, 0, 0);
printf("parent:begin\n");
pthread_t c;
pthread_create(c, NULL, child, NULL);
P(&s);
printf("parent:end\n");
return 0;
}
//预期结果
//parent:begin
//child
//parent:child
这段程序父线程调用P(),子线程调用V(),父线程等待子线程执行完毕,与先前不同的是这里信号量的初始值为0
程序开始后会有两种情况:
1,父线程创建了子线程,但子线程并没有运行,此时父线程调用P(),信号量初始值为0,父线程调用P()后,信号量为-1,父线程休眠等待,此时子线程被调度运行,子线程输出完后,调用V(),信号量加1,此时信号量为0,唤醒了休眠的父线程
2,子线程被创建后,在父线程调用P()之前就运行结束,这种情况下,子线程先调用V(),此时信号量为1,父线程被唤醒调用P()时,此时信号量减1,为0,P()不做操作直接返回,父线程运行结束
6.4 生产者/消费者问题
(1) 使用一对信号量
用两个信号量empty和full分别表示缓冲区空或者满:
void put(); //生产者将数据放入缓冲区
void get(); //消费者从缓冲区中取出数据
sem_t empty;
sem_t full;
void *producer(void *arg) {
int i;
for(i = 0; i < loops; i++) {
P(&empty); //对empty信号量减1,empty<0停止生产
put(i); //将生产出的值放入缓冲区
V(&full); //对full信号量加1,full>1唤醒消费者
}
}
void *consumer(void *arg) {
int i, tmp = 0;
while(tmp != -1) {
P(&full); //对full信号量减1,full<0停止消费
tmp = get(); //将缓冲区内的值取出
V(&empty); //对empty信号量加1,empty>1唤醒生产者
}
}
int main() {
...
sem_init(&empty, 0, MAX); //MAX为预期生产的数据个数
sem_init(&full, 0, 0);
...
}
假设有两个线程为消费者和生产者,MAX=1,在一个CPU上时:
1,假设消费者先运行,执行到P(&full),调用P(),因为full初始值为0,P()会将full减为-1,导致消费者休眠等待,消费者等待生产者调用V(&full)唤醒它
2,假设生产者先运行,当运行到P(&empty)时,由于empty初始为1,P()导致empty为0,生产者继续运行,当调用V(&full)时,把full变为1,唤醒消费者
但当MAX大于1时,且有多个消费者,多个生产者时,就出现了新的问题:竞态条件,假设两个生产者几乎同时调用put(),那么实际上只生产了一个数据,较早生产的数据被覆盖,其中一个生产者的数据丢失,这是绝对要避免的问题
(2) 增加互斥
对于上述的问题,本质是忘了互斥,向缓冲区加入元素和从缓冲区取出元素的代码段是临界区,所以使用互斥信号量来增加锁控制同步
void put(); //生产者将数据放入缓冲区
void get(); //消费者从缓冲区中取出数据
sem_t empty;
sem_t full;
sem_t mutex; //mutex互斥信号量作为锁
void *producer(void *arg) {
int i;
for(i = 0; i < loops; i++) {
P(&mutex);
P(&empty); //对empty信号量减1,empty<0停止生产
put(i); //将生产出的值放入缓冲区
V(&full); //对full信号量加1,full>1唤醒消费者
V(&mutex);
}
}
void *consumer(void *arg) {
int i, tmp = 0;
while(tmp != -1) {
P(&mutex);
P(&full); //对full信号量减1,full<0停止消费
tmp = get(); //将缓冲区内的值取出
V(&empty); //对empty信号量加1,empty>1唤醒生产者
V(&mutex);
}
}
int main() {
...
sem_init(&empty, 0, MAX); //MAX为预期生产的数据个数
sem_init(&full, 0, 0);
sem_init(&mutex, 0, 1); //新增的互斥信号量
...
}
现在给put()和get()部分增加了锁,虽然理想上已经完成了互斥,但是这可能会引起死锁
(3) 死锁
假设有两个线程,消费者和生产者,消费者先运行,获得锁,mutex为0,然后调用P(&full),因为还没有数据,此时消费者阻塞,但是此时消费者仍持有锁,然后生产者运行,mutex为-1,生产者休眠等待,生产者无法获取锁,生产数据
这里就出现了所谓的死锁,消费者持有锁,但没有数据供他消费,它也无法唤醒生产者,生产者被调度时,根本拿不到锁,因此生产者和消费者互相等待对方 – 典型的死锁
(4) 可行方案
要避免死锁,只需要减少锁的作用域,把获取和释放互斥信号量的操作调整为紧挨着临界区,把full,empty的唤醒和等待操作调整到锁外面,结果得到了简单而有效的缓冲区
void put(); //生产者将数据放入缓冲区
void get(); //消费者从缓冲区中取出数据
sem_t empty;
sem_t full;
sem_t mutex; //mutex互斥信号量作为锁
void *producer(void *arg) {
int i;
for(i = 0; i < loops; i++) {
P(&empty); //对empty信号量减1,empty<0停止生产
P(&mutex);
put(i); //将生产出的值放入缓冲区
V(&mutex);
V(&full); //对full信号量加1,full>1唤醒消费者
}
}
void *consumer(void *arg) {
int i, tmp = 0;
while(tmp != -1) {
P(&full); //对full信号量减1,full<0停止消费
P(&mutex);
tmp = get(); //将缓冲区内的值取出
V(&mutex);
V(&empty); //对empty信号量加1,empty>1唤醒生产者
}
}
int main() {
...
sem_init(&empty, 0, MAX); //MAX为预期生产的数据个数
sem_init(&full, 0, 0);
sem_init(&mutex, 0, 1); //新增的互斥信号量
...
}
6.5 读者-写者锁
读者-写者锁来源于对更灵活的原语的追求,它承认不同的数据结构访问可能需要不同类型的锁。如一个并发链表有很多插入和查找工作,插入操作会修改链表的状态,而查找操作只是读取数据结构,只要没有进行插入操作,就可以并发地执行多个查找操作
一旦一个读者线程获取了读者锁,其它的读者线程也可以获取这个锁,但是一个线程想要获取写者锁,就必须等到所有的读者释放读者锁,最后一个退出的读者在writelock信号量上调用V(),从而让等待的写者能获取写者锁
typedef struct_rwlock_t {
sem_t lock; //读者锁
sem_t writelock; //写者锁
int readers;
} rwlock_t;
void rwlock_init(rwlock_t *rw) {
rw->readers = 0;
sem_init(&rw->lock, 0, 1); //初始化读者锁
sem_init(&rw->writelock, 0, 1); //初始化写者锁
}
//获取读者锁
//且第一个读者自动获取写者锁
void rwlock_acquire_readlock(rwlock_t *rw) {
P(&rw->lock);
rw->readers++;
if(rw->readers == 1) {
P(&rw->writelock);
}
V(rw->lock);
}
//获取写者锁
void rwlock_acquire_writelock(rwlock_t *rw) {
P(&rw->writelock);
}
//释放写者锁
void rwlock_release_writelock(rwlock_t *rw) {
V(rw->writelock);
}
获取读者锁时,读者首先要获取lock,然后增加readers变量,readers用来追踪当前有多少个读者在访问数据结构,重要的步骤在rwlock_acquire_readlock()内发生,当第一个读者获取该锁时,该读者也会获取写者锁,且释放读者锁
这一方案虽然可行,但仍有缺陷,尤其是公平性,读者很容易饿死写者,读者-写者锁在性能方面没有多少优势,它展示了如何以多样,有用的方式使用信号量
6.6 哲学家进餐问题
假定5个哲学家围着一个圆桌,每两位哲学家之间有一把餐叉,哲学家有时需要思考一会,不需要餐叉,但有时需要就餐,而一位哲学家只有同时拿到左手边和右手边的两个餐叉才能吃到东西
关于餐叉的竞争以及随之而来的同步问题,就是我们在并发编程中研究它的原因,下面是每个哲学家的基本循环:
while(1) {
think();
getforks();
eat();
putforks();
}
关键问题在于如何实现getforks()和putforks()函数,这个过程中不会出现死锁,且尽可能让更多的哲学家吃到东西
判断餐叉位置的辅助函数:
//p是哲学家编号
//左手边的餐叉标号
int left(int p) {
return p;
}
//右手边的餐叉标号
int right(int p) {
return (p+1)%5;
}
且需要一些信号量来解决问题,假设需要5个,每个餐叉一个
sem_t_forks[5]
假设每个餐叉的信号量都用1来初始化,且每个哲学家知道自己的编号p,可以写出getforks()和putforks():
//每个哲学家取餐叉,试图获取其左右的餐叉
void getforks() {
P(forks[left(p)]);
P(forks[right(p)]);
}
//每个哲学家放餐叉,同时释放左右的餐叉
void putforks() {
V(forks[left(p)]);
V(forks[right(p)]);
}
思路如下:为了拿到餐叉,每个哲学家试图获取左边餐叉的锁,再获取右边餐叉的锁,结束就餐时,先释放左边餐叉的锁,再释放右边餐叉的锁
问题来了,这样处理可能会出现死锁,假设每个哲学家都拿到了左手边的餐叉,并且一直在等待获取右手边的餐叉,所有的哲学家永远都无法获取右手边餐叉的锁
为了解决上述问题,最简单的方法就是修改某个或某些哲学家的取餐叉顺序,假设4号哲学家先取右手边餐叉,再取左手边餐叉:
void getforks() {
if(p == 4) {
P(forks[right(p)]);
P(forks[left(p)]);
} else {
P(forks[left(p)]);
P(forks[right(p)]);
}
}
这样处理时,因为最后一个哲学家尝试先拿右手边的餐叉,然后拿左手边的餐叉,所以不会出现每个哲学家都拿着一个餐叉,都等待另一个餐叉的情况