看看常用同步组件如何借助aqs来实现吧!
利用aqs实现的同步组件
- ReentrantLock
- CountdownLatch
- Semaphore
- ReentrantReadWriteLock
- StampedLock
- CyclicBarrier
- Condition
观察这些类的源码,其实可以总结出一个应用aqs
的模式。 下面用自定义实现一个MutexLock
来演示这个模式。
public class MutexLock{
/* 1.
定义一个继承自aqs的静态内部类Sync
该类去覆盖实现tryAcquire/tryRelease
或者tryAcquireShared/tryReleasedShared
这里是独占式的互斥锁,因此只用实现前者
若是Semaphore或者CountdownLatch则只用实现后者
*/
private static class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
if(arg != 1)
throw new IllegalArgumentException;
if(compareAndSetState(0, 1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if(arg != 1)
throw new IllegalArgumentException;
if(getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
final ConditionObject newCondition(){
return new ConditionObject();
}
}
/*2. 类内部持有一个Sync对象*/
private final Sync sync;
public MutexLock() { this.sync = new Sync(); }
/*
3.
根据类的每个方法的具体功能分别去调用Sync的acquire/release/try
*/
@Override
public void lock() { sync.acquire(1); }
@Override
public void unlock() { sync.release(1); }
}
复制代码
各个同步组件的使用与实现
CountdownLatch
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
使用场景
多线程计算,最后主线程将所有计算结果汇总,再计算。也就是主线程一定要保证上一步的计算中,每个线程都计算完毕,都执行完毕。有点类似join,不过是等待所有线程都结束,而不是某个特定的线程。
核心方法
- countDown(); -> 一个线程执行完毕后调用,效果是,
count--
- await(); -> 阻塞当前线程直到 count = 0
- await(long, TimeUnit); -> 阻塞当前线程直到 count = 0或者超出限定时间
实现原理
CountdownLatch维护的Sync对像有了新的构造函数。state可以为int范围内的任意数。
Sync(int count) {
setState(count);
}
复制代码
CountdownLatch维护的Sync对象重写了aqs的共享式尝试获取同步状态的两个方法:tryAcquireShared
tryReleaseShared
//只有当state也就是countDownLatch中的count减少到0时
//才会返回正数
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
复制代码
具体到await方法
和countDown
方法:
//选择使用可中断的acquire方法
//与aqs的acquire类似
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//每次只释放一个permit,也就是state只会减少1
public void countDown() {
sync.releaseShared(1);
}
复制代码
Semaphore
A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each {acquire} blocks if necessary until a permit is available, and then takes it. Each {release} adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the {Semaphore} just keeps a count of the number available and acts accordingly.
控制并发访问的线程个数,相当于操作系统里的信号量.
核心方法
- acquire() 在临界区开始处调用,如果此时permits已经申请完,那么会阻塞在这里,直到新的release操作可能被唤醒
- release() 在临界区结束处调用,归还对临界区的访问权限permits,可能会唤醒哪些申请失败,正在同步队列排队的线程
- acquire(int arg) 一次可以申请多个权限
- release(int arg) 一次可以归还多个权限
实现原理
维护了两个aqs
子类对象,FairSync
, NonFairSyn
, 它们都继承自Sync
。他们共享父类的tryReleaseShared
方法:
protected final boolean tryReleaseShared(int releases) {
//经典的CAS死循环,类比原子类的getAndIncrement()
//保证多个线程同时释放对象的线程安全
for (;;) {
int current = getState();
//归还releases数量的permits到state中
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
复制代码
公平与非公平主要是判断对锁新的申请,是不是要排队。所以他们都各自重写了aqs
的tryAcquireShared
方法。
Fair版本:
protected int tryAcquireShared(int acquires) {
for (;;) {
/*
如果当前aqs维护的CLH队列中有在等待的节点
并且这个节点的下一个节点的线程不是当前线程
那么会直接失败,失败后会怎样呢?
参考acquireShared的源码
尝试失败后会直接加入队列的尾部(addWaiter())
*/
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
复制代码
NonFair版本:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
//父类Sync中的方法
final int nonfairTryAcquireShared(int acquires) {
//还是经典的CAS循环
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
复制代码
NonFair版本中调用了父类中的nonfairTryAcquireShared
,这个方法和Fair版本的唯一区别就是少了对hasQueuedPredecessors()
的判断。 也就是说它会不断的通过CAS操作去设置state
,一旦设置成功,remaining >= 0
时,它会直接忽略CLH队列中等待的节点,优先的进入临界区。 设置失败后(remaining < 0
),那么还是和Fair版本的一样,也得乖乖的去排队。
那么具体到Semaphore
的核心方法上:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void release() {
sync.releaseShared(1);
}
复制代码
ReentrantLock
该锁对比文章开头自定义的MutexLok, 增加了了公平与非公平锁的功能,还实现了可重入性。 关于公平与非公平,上面的Semaphore
已经提到过,只是tryAcquire
方法中有没有对 hasQueuedPredecessors
加以判断。 那么我们来重点关注一下它的可重入性。
可重入性,也就是同一个线程在已经获取锁的情况下,可以多次的再次获取锁。 不可重入的锁,在递归调用的方法上加锁,就会出现死锁。 因此出现了可重入锁。
可重入锁的实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果当前没有任何线程获取该锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//该锁已经被某个线程获得
//判断这个线程是不是当前线程,如果是,那么可以再次的获取锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
复制代码
ReentrantReadWriteLock
读写分离的、可重入的锁。 使用它可以提高并发的吞吐量,提高了读取操作的效率。
核心方法
- readlock.lock() 读取操作之前获取锁
- readlock.unlock() 读取结束之后释放锁
- writelock.lock() 写操作之前获取锁
- writelock.unlock() 写操作之后释放锁
锁的优先级与规则
- 多个读取线程可以同时获取到锁
- 当有读锁未释放的时候,获取写锁会失败。也就是写锁的获取,一定要等待所有正在进行的读取操作结束。
- 写锁只能由一个线程独占式获取,获取后其他线程的读/写锁获取,均失败。
- 写锁获取后,这个线程还能成功的获取到读锁(称为锁降级)
- 新来的读锁在,队列中存在写者等待的情况,有可能失败
实现原理
将aqs中的int
型变量 state
划分成了两个十六位的unsigned short
。
- 低16位表示 exclusive (writer) lock hold count
- 高16位表示 shared (reader) hold count
同时写了两个快捷获取读锁/写锁状态的函数:sharedCount(int c)
和 exclusiveCount(int c)
。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
复制代码
先来看看读锁的获取。
//传入的int型参数,没有被这个函数用过
//虽然unused, 但为了维持这个接口还是不改变函数签名
protected final int tryAcquireShared(int unused) {
/*
* 1. 如果其他线程持有写锁,失败(返回小于0的值)
* 2. 否则,这个线程是一个可以去锁定写锁状态的线程
* 那么去看看它由于排队的策略
* 以及避免饥饿的启发式算法,它是否应该阻塞。
* 重入性请求被延迟到full version处理。
* 3. 如果第二步失败,执行full version
*/
Thread current = Thread.currentThread();
int c = getState();
//其他线程持有写锁,直接返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
/*
去往full version 的三个条件:
1. 根据启发式的避免饥饿算法,当前读者应当阻塞
2. 当前读者数量超过上限
3. CAS设置失败
*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
//ThreadLocal线程封闭,让每个线程可以获取当前获得锁的数量
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
复制代码
读锁的释放相对简单一些。
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//HoldCount相关操作
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//释放锁的核心操作CAS循环
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
/* 读锁的释放不会对读者产生影响
但它也许会唤醒写者 */
return nextc == 0;
}
}
复制代码
写锁的获取。
protected final boolean tryAcquire(int acquires) {
/*
1. 读锁不为空,或者写锁不为空并且持有线程不是当前线程,fail
2. 写锁饱和,fail.(发生原因:重入次数过多,超过1 << 16 - 1.)
3. 否则,它有机会去获取写锁
*/
Thread current = Thread.currentThread();
int c = getState();
//w -> 写锁数量
int w = exclusiveCount(c);
//state != 0 -> 必定存在写锁/或者读锁被某个线程持有
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
//state == 0 -> 没有任何线程持有读锁/写锁
//如果非公平锁,writerShouldBlock()始终返回false
//公平锁,writerShouldBlock()会调用hasQueuedPredecessors()作为返回值
//如果当前写者不用阻塞那么就尝试去CAS设置state
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
复制代码
写锁的释放。
protected final boolean tryRelease(int releases) {
//如果当前线程不是独占的获取了锁
//就没有资格释放锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
复制代码
关于公平性,还有写者饥饿的问题要具体看NonFairSync和FairSync对读者/写者应否阻塞的控制。 NonFair版本:写者优先。
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
return false; // 写者总是插队
}
final boolean readerShouldBlock() {
/*
启发式的避免饥饿的方法
*/
return apparentlyFirstQueuedIsExclusive();
}
}
//父类aqs中的方法
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
/*
返回为假的四种情况
1. 队列中首节点为空 -> 就是没有等待的节点,那么不需要阻塞读者线程
2. 首节点的下一个节点为空 -> 只有一个等待的节点,那么也不需要阻塞
3. 首节点的下一个节点是共享模式 -> 队列中还有未出队的读者线程
4. 下一个节点中没有线程或者被中断
*/
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
复制代码
Fair版本:读者和写者公平排队。
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
复制代码
锁降级
在获取写锁的情况下,申请写锁,最后释放写锁。写锁降级为读锁。
在《Java 并发编程的艺术》一书中说道:
如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。
并且书中是用数据可见性来解释锁降级的必要性的。 而得到这个结论的依据是 “如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程 T)获取了写锁并修改了数据,那么当前线程无法感知线程 T 的数据更新”。 那么就有一个问题: 如果另一个线程获取了写锁(并修改了数据),那么这个锁就被独占了,没有任何其他线程可以读到数据,谈什么 “感知数据更新”?
我的理解是,锁降级只是一种特殊的重入机制。相比释放写锁后再获取读锁这样的手动降级,有两个最明显的优点。
- 写锁持有过程中,再次获取读锁不会产生死锁
- 写锁持有过程中,获取读锁会缩短流程,提高效率,能保证当前线程最先拿到读锁。而不是可能和其他读者竞争。 也不会因为写者优先的策略迟迟拿不到锁。保证了当前线程工作的连贯性,减少了线程切换的开销。
Condition
Condition其实是一个接口。
核心方法
- await() -> Causes the current thread to wait until it is signalled or interrupted.
- signal() -> Wakes up one waiting thread.
- signalAll() -> Wakes up all waiting threads.
aqs中的内部类ConditionObject实现了这个接口。
实现原理
ConditionObject中的await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
//fullyRelease(node)将所有的state释放掉
//相当于隐式的释放掉所有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//当线程已经转移到Sync队列上以后正常的去尝试获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
复制代码
唤醒线程:其实把线程从 condition queue 转移到了 sync queue 。
- signal() -> dosSignal() -> transferForSignal()
/**
* 将节点从condition queue 转移到 sync queue中。
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node);
复制代码
并且Lock
接口定义了一个newCondition()
方法,规定所有的锁要提供一个获取与这个锁对应的Condition对象。 ReentrantLock的实现中,在同步器Sync中增加了一个newCondition
方法, 该方法新建一个ConditionObject对象返回。 注意,这个ConditionObject是aqs的一个内部类,通过这个ConditionObject可以访问aqs的任何成员以及方法。 这也是使用内部类一个最大的优点。
CyclicBarrier
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.
和CountdownLatch比较类似,但CyclicBarrier更加注重的是集合内的线程同步,线程组内的所有线程都必须等待组内其他线程运行到一个barrier point
,才能继续执行。能够处理更加复杂的场景。并且CyclicBarrier内有一个Generation对象,可以一代一代的重用下去。
核心方法
- public CyclicBarrier(int parties); 构造方法,parties指定线程必须执行的await()方法次数
- public CyclicBarrier(int parties, Runnable barrierAction); Runnable指定所有线程到达barrier后优先执行的线程。
- public int await(); 标志该线程到达barrier, 将count--, 当其他线程都到达barrier后返回当前线程到达的index
实现原理
CyclicBarrier维护了一个ReentrantLock对象lock
,以及一个由该对象生成的Condition对象trip
。 通过这两个对象可以保证对count
操作的线程安全,可以控制线程之间的阻塞与唤醒。
await()方法源码:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
复制代码
该方法直接调用了核心代码dowait()方法,并且指定false, 也就是不支持超时返回,时间为0L(unused)。
dowait(boolean, nanos)核心源码:
/**
* 主要的barrier代码,包含了各种策略
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//lock保证对CyclicBarrier对象的线程安全
lock.lock();
//这个try块内容非常多
//包括count--操作以及循环阻塞的过程
try {
//中断或者barrier损坏,抛出对应异常
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//没有损坏则将count--
//因为加了锁,所以不用担心count数据线程不安全
//index -> 可以推断到达barrier的先后次序
int index = --count;
//index == 0 -> 这是最后到达barrier的线程
// -> tripped
if (index == 0) { // tripped
//根据初始化的参数决定是否执行barrierCommand
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//command != null
//初始化时已经设置了barrierCommand,需要在此处执行
if (command != null)
command.run();
ranAction = true;
/*
//这一个generation结束,开始新的一轮计数同步
//并且生产一个新的Generation对象
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
*/
nextGeneration();
//dowait()返回
//意味着所有线程可以接着执行下面的代码了
return 0;
} finally {
//try块中的ranAction没有得到执行
//就自动将这个barrier中断,可能是因为
//command.run() 被中断或者异常
if (!ranAction)
breakBarrier();
}
}
//循环直到所有线程到达barrier
//或者barrier被中断,或者超时
for (;;) {
try {
//如果没有时间限制
//无限等待
//trip是一个Condition接口的对象
if (!timed)
trip.await();
//有时间限制,超时等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
}
//相应中断相关操作
catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
//在执行过nextGeneration后
//这个全局的generation对象被赋予了新的值
//当前线程接收到nextGeneration中的signalAll()方法后
//返回当初记录它来到barrier的次序
if (g != generation)
return index;
//超时,中断barrier,抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
复制代码
看完这段代码,似乎还是有一个不解之问。就是dowait()
方法的代码都是在临界区里执行的,包括for
循环。 而我们的功能要求我们应该有多个线程并发的执行dowait
方法,但是,当第一个线程dowait
后,就上锁了,并且直到中断或者tripped才释放锁。 那么其他的线程根本就访问不了CyclicBarrier这个对象了,更别提一起到达barrier
,将count
减少到0了。
其实在for循环开始时,执行trip.await()
时,会去执行ConditionObject里的await()
方法,这个方法会首先执行一个fullyRelease(Node node)
函数, 将state
尝试置为0。就相当于是隐式的释放掉了当前所有的锁了。所以后续的线程调用dowait()
方法,不会被锁在临界区外。因为一旦有线程trip.wait()
,锁的状态就会清零。