写在前面
CountDownLatch源码学习AQS
demo
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(()->{
try {
System.out.println("线程1执行");
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}).start();
new Thread(()->{
try {
System.out.println("线程2执行");
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}).start();
countDownLatch.await();
System.out.println("两个线程都执行完执行此段代码");
}
- 主线程、线程1、线程2开始并行执行
- 主线程遇到countDownLatch.await()等待
- 经过2秒线程1执行完成,执行countDownLatch.countDown()
- 经过5秒线程2执行完成,执行countDownLatch.countDown()
- 经过2次countDown,主线程await阻塞通过
- 次数取决于构造方法的数值
源码分析
构造方法
设置state值为2,没有其余逻辑
countDownLatch.await()
最终调用
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取锁
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
- 获取state值,如果等于0,返回1,不等于0,返回-1。
- 我们知道如果state等于0我们才能继续往下走,按照demo为例,最开始state值为2,不等于0(线程1与线程2都没有执行countDown),此时会返回-1
继续走 doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
是不是有种似曾相识的感觉,这就是aqs中的方法,前面已经讲过这个方法的逻辑。就是将主线程加入到clh队列中,设置为共享状态,然后继续判断是不是head的下一个节点,尝试获取共享锁。获取成功返回。获取失败,阻塞住当前线程
- tryAcquireShared 跟前面不同,是countdownlatch的子类实现,只需要判断state是否为0,只有state为0线程才会继续往下走
- 我们记住,现在主线程队列阻塞在这里
- 队列成员 head - > 主线程 只有一个主线程阻塞在这里
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
countDownLatch.countDown()
最终调用,还是aqs的方法
public final boolean releaseShared(int arg) {
//尝试释放锁
if (tryReleaseShared(arg)) {
//释放成功后唤醒线程
doReleaseShared();
return true;
}
return false;
}
CountDownLatch子类的tryReleaseShared实现
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
- 死循环加cas就是考虑到多线程问题
- 先获取状态,按照demo中举例,state是2
- 如果state等于0,说明锁已经释放了,不需要再次释放
- 利用cas将2修改为1,如果多线程执行情况下,失败的线程执行下一次循环,将1置为0
- 最后将状态置为0的线程返回true
doReleaseShared
还是aqs方法,唤醒共享锁,请参考Semaphore源码分析,唤醒后主线程阻塞后开始放行,执行下一次循环后获取到锁,因为state已经为0,随后执行业务代码
以上就是CountDownLatch的大致源码分析,如有纰漏,欢迎批评指正。