基于 AQS 分析 CountDownLatch
「这是我参与2022首次更文挑战的第6天,活动详情查看:2022首次更文挑战」
代码案例
public class CountDownLatchDemo {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(() -> {
try {
Thread.currentThread().setName("线程一");
// 等待一会
System.out.println(Thread.currentThread().getName() + "等待线程二以及线程三执行完成之后再执行......");
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "执行完了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
Thread.currentThread().setName("线程二");
System.out.println(Thread.currentThread().getName() + "执行完成,执行countDown");
countDownLatch.countDown();
}).start();
new Thread(() -> {
Thread.currentThread().setName("线程三");
System.out.println(Thread.currentThread().getName() + "执行完成,执行countDown");
countDownLatch.countDown();
}).start();
}
}
复制代码
代码运行结果
代码解释
线程一需要等待线程二以及线程三执行完成之后执行countDown之后线程一才能执行,就像出行活动,等所有人都到了才能出发
源码分析
构造函数分析
先分析一下这个重要代码
CountDownLatch countDownLatch = new CountDownLatch(2);
复制代码
先分析一下这个构造函数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
复制代码
这里面还有sync这个属性变量,又是那个关键的AQS中的同步工具类,我们查看一下到底是啥
Sync(int count) {
setState(count);
}
复制代码
这里面设置了一下state的值,这个值我们都知道这个是锁的关键标记信息,如果这个state不为0的话那么就证明有其他线程加锁了已经,我们这里是传入的2,所以就是将state设置成了2
protected final void setState(int newState) {
state = newState;
}
复制代码
await方法分析
其实就是将当前的线程进行挂起,或者就是等待刚才设置的state的值为0之后再尝试出队获取锁再执行
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
复制代码
调用await方法里面进行了该方法的调用 sync.acquireSharedInterruptibly(1),进入里面看看
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
复制代码
看主要的方法,tryAcquireShared(arg) < 0 这块逻辑是否成立,中断的地方先不管
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
复制代码
这个方法就是判断了一下当前的state值是否等于0,如果是0那么返回1,则if条件不成了,此时的state是等于2的所以返回了-1,所以走doAcquireSharedInterruptibly(arg)方法
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 传进来的参数是1
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
方法有点长我们慢慢分析一下,先看这个addWaiter方法,里面传了一个Node.SHARED,其实就是添加了一个Node进入AQS的等待队列
private Node addWaiter(Node mode) {
// 创建一个Node 节点
Node node = new Node(Thread.currentThread(), mode);
// 此时的pred 和 tail指针目前都是空值
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 所以走到了这个入队的方法
enq(node);
return node;
}
复制代码
第一步先创建一个Node 节点,第二步进行了入队的方法,因为此时的pred 和 tail指针目前都是空值,此时的if条件不成立,这个enq的方法真的是看了好多遍了,再巩固一下
private Node enq(final Node node) {
for (;;) {
// 此时t指向的是空的,因为tail也是空的,所以需要进行初始化队列
Node t = tail;
if (t == null) { // Must initialize
// 初来乍到,创建一个空的Node节点,之后将空head指针指向Node节点,之后tail尾部指针指向了head指向的头节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
初来乍到,此时t指向的是空的,因为tail也是空的,所以需要进行初始化队列,因为t==null,创建一个空的Node节点,之后将空head指针指向Node节点,之后tail尾部指针指向了head指向的头节点
此时进行下一次循环,t指向了tail指针,所以此时t不等于null了所以走到了else逻辑,新Node节点将他的前驱指针指向了当前队列的head节点,并且tail指针指向了当前新入队的node节点,之后t【此时的t就相当于head节点】的next指向了当前新入队的node节点,并且返回了头节点,此时队列中的节点样式
此时该走到下面的for循环里面的逻辑了
for (;;) {
// 获取当前节点的前驱节点,此时前驱节点就是头节点
final Node p = node.predecessor();
// 条件成立
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
复制代码
走到下面的逻辑尝试获取共享模式锁tryAcquireShared此时这个值传进来的是1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
复制代码
此时还是-1,所以获取再一次失败了,走到下面挂起的逻辑了,这个逻辑我们也看了好多次了
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 第一次进来的时候此时的头节点的waitStatus还是0或者null呢
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 所以第一次就走到这里了将头节点的waitStatus设置成了-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
复制代码
第一次进来的时候此时的头节点的waitStatus还是0或者null呢,所以第一次就走到这里了将头节点的waitStatus设置成了-1,此时队列中的样子变成了下面的样子
第二次又一次获取锁又失败了之后再次进到shouldParkAfterFailedAcquire这个方法的时候头节点的waitsStatus就是-1了此时第一个if逻辑就成功了,并且返回了true,之后走到下一个方法里面parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
复制代码
此时在这个方法中将线程挂起了,其实这个线程调用了await方法的时候就是将自己加入到AQS的等待队列中去了
countDown方法分析
其他线程调用了两次countDown方法之后,之前await方法挂起的线程又能执行了,看看这个方法做了什么
public void countDown() {
sync.releaseShared(1);
}
复制代码
里面调用的是Sync这个同步工具类的releaseShared(1)的方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
再看看这个尝试释放共享模式锁的方法tryReleaseShared(arg)
protected boolean tryReleaseShared(int releases) {
for (;;) {
// 获取state,此时state是2
int c = getState();
if (c == 0)
return false;
// 进行减一
int nextc = c-1;
// cas操作设置state的值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
复制代码
经过这一个countDown方法的调用,将state值进行了减一,所以此时state从2变成了1,但是还是返回了false,此时不走doReleaseShared()方法,等到第二次调用countDown方法的时候,此时 return nextc == 0 就是true了,该走 doReleaseShared() 方法了,我们看看这个方法,做了什么
private void doReleaseShared() {
for (;;) {
// 定义了一个head指针指向了AQS队列的head节点
Node h = head;
// 此时h不是空并且也不是最后一个节点,所以条件成立
if (h != null && h != tail) {
// 此时头节点的 waitStatus 是 -1
int ws = h.waitStatus;
// 此 if 逻辑成立
if (ws == Node.SIGNAL) {
// 将头节点的 waitStatus 从-1 变成了 0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后续的节点来获取锁
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
复制代码
此方法我们也分析好多次了,看上面代码的注释,以及下面的图
看看如何唤醒的AQS中等待队列的线程
private void unparkSuccessor(Node node) {// 传进来了一个头节点
// 此时头节点的waitStatus已经被改成过0了
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 定义一个s指针指向了当前头节点的下一个节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 此时s不是空值所以,进行了线程唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
看看目前AQS队列中的样子
分析唤醒之后的逻辑
线程在哪里进行挂起来的,我记得是获取锁的时候被挂起来的我们往前看看
没错了就是这个地方进行的挂起,那么此是唤醒之后我们看看要做什么,进入下一次循环再次获取锁,定义一个p指向了头节点,此时获取成功了因为此时state等于0了,之后返回的1,所以r等于了1,此时进入了 setHeadAndPropagate(node, r) 代码块
private void setHeadAndPropagate(Node node, int propagate) {
// 定义了一个h指针指向了head节点
Node h = head;
// 设置头节点
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
复制代码
设置头节点
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
复制代码
继续往下看 r = 1所以propagate > 0 成立,后面的if中的逻辑不用看了有一个成立就行了,定义了一个s指针指向了当前node的下一个节点,不过node的下一个节点是个null,所以此时s==null成立了走到了 doReleaseShared() 这个方法中,看看这个方法做了什么
private void doReleaseShared() {
for (;;) {
// 定义了一个h指向了head
Node h = head;
// h不是null,并且h也不是tail所以条件成立
if (h != null && h != tail) {
// 此时h的waitStatus已经被改成了0了
int ws = h.waitStatus;
// 不成立
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 将h的waitStatus 改成-3
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
复制代码
继续看逻辑往下走修改其他的指针指向
p.next = null; // help GC
failed = false;
return;
复制代码
此时这个方法就走完了,也获取到锁了,也继续往下走其自己的逻辑了