并发编程的解决方案
我们在开发的过程中经常会遇到并发的问题,当遇到并发问题的时候,我们通常通过锁来解决。其实锁的实现方案有如下两种
- 信号量
- 管程
管程和信号量是等价的,用管程可以实现信号量,也可以用信号量实现管程,管程对开发者更友好
信号量
信号量是操作系统的一种同步机制,由一个整型变量sem和两个原子操作组成
P(Prolaag,荷兰语尝试减少):sem-1,如果sem<0进入等待队列,否则继续
V(Verhoog,荷兰语增加):sem+1,如果sem<=0,说明等待队列中有线程,唤醒一个线程
sem变量在初始化完成时,只能通过PV操作来修改,由操作来保证PV操作的原子性,P可能阻塞,V不会阻塞
代码实现
public class Semaphore {
private int sem;
private WaitQueue q;
public void P() {
sem--;
if (sem < 0) {
add this thread t to q;
// 阻塞线程
block(t);
}
}
public void V() {
sem++;
if (sem <= 0) {
remove a thread t from q;
// 唤醒线程
wakeup(t)
}
}
}
信号量可以分为2类
- 二进制信号量:资源数目为0或者1
- 资源信号量:资源数目为任何非负值
信号量有有如下作用
- 实现对临界区(每次只能被一个线程访问的区域)的互斥访问
- 实现条件同步
实现对临界区的互斥访问
Semaphore mutex = new Semaphore(1);
mutex.P();
// do something
mutex.V();
信号量的初始值必须为1,PV操作配对使用
实现条件同步
Semaphore condition = new Semaphore(0);
// ThreadA,进入等待队列中
condition.P();
// ThreadB,唤醒等待线程 ThreadA
condition.V();
信号量的初始值必须为0,ThreadA执行P操作时会被阻塞,ThreadB执行V操作时会唤醒等待的线程ThreadA
用信号量实现阻塞队列
用一个二进制信号量mutex实现互斥访问
用两个资源信号量notFul,notEmptyl实现条件同步
public class BlockingQueueUseSemaphore<T> {
private final Object[] items;
private Semaphore notFull;
private Semaphore notEmpty;
private Semaphore mutex;
private int putIndex;
private int takeIndex;
public BlockingQueueUseSemaphore(int capacity) {
this.items = new Object[capacity];
notFull = new Semaphore(capacity);
notEmpty = new Semaphore(0);
mutex = new Semaphore(1);
}
public void enq(T x) throws InterruptedException {
notFull.acquire();
mutex.acquire();
items[putIndex] = x;
if (++putIndex == items.length) {
putIndex = 0;
}
mutex.release();
notEmpty.release();
}
public T deq() throws InterruptedException {
notEmpty.acquire();
mutex.acquire();
T x = (T) items[takeIndex];
if (++takeIndex == items.length) {
takeIndex = 0;
}
mutex.release();
notFull.release();
return x;
}
}
管程
管程为了解决信号量在临界区PV操作上配对的麻烦,把配对的PV操作集中在一起,生成一种新的并发编程的方法,和面向对象的思想比较吻合
管程中引入了条件变量的概念,每一个共享变量对应一个等待队列
synchronized就是基于管程实现的,只包含一个同步队列,一个等待队列
AQS也是基于管程实现的,只包含一个同步队列,但是可以包含多个等待队列
用管程实现阻塞队列
因为Java中的AQS的设计思想就是管程,我就用AQS中的相关api来实现这个功能
用管程的一个共享变量和2个条件变量即可实现
public class BlockingQueueUseMonitor<T> {
private final Object[] items;
private final Lock lock;
private Condition notFull;
private Condition notEmpty;
private int count;
private int putIndex;
private int takeIndex;
public BlockingQueueUseMonitor(int capacity) {
this.items = new Object[capacity];
lock = new ReentrantLock();
notFull = lock.newCondition();
notEmpty = lock.newCondition();
}
public void enq(T x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
// 等待队列不满
notFull.await();
}
items[putIndex] = x;
if (++putIndex == items.length) {
putIndex = 0;
}
count++;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T deq() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
// 等待队列不空
notEmpty.await();
}
T x = (T) items[takeIndex];
if (++takeIndex == items.length) {
takeIndex = 0;
}
count--;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
参考博客
好文
[1]https://www.cnblogs.com/binarylei/p/12544002.html#26-aqs-%E5%92%8C-synchronized-%E5%8E%9F%E7%90%86
[2]https://www.codenong.com/cs109504287/
管程
[3]https://time.geekbang.org/column/article/86089
信号量
[4]https://time.geekbang.org/column/article/88499