前言
本章用到了之前谈到的AQS,就是在该FIFO阻塞框架的基础上改造的,不理解的,可以去看JAVA进阶之路-AbstractQueuedSynchronizer(AQS)源码走读
用途
CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。说得通俗易懂一点,也可以理解成是一个带有计数器功能的锁,当计数器的数值为 0 时就会释放这个锁。
CountDownLatch 和 ReentrantLock 一样,都是基于 AQS 实现的锁。(关于 AQS 和 ReentrantLock,可以参考 ReentrantLock 源码解析)但它们的不同之处在于,ReentrantLock 是独占锁,CountDownLatch 是共享锁。
所谓独占锁就是在同一时间内只有一条线程能够持有这个锁,而共享锁则是在同一时间内可以有一个或多个线程持有这个锁。
主要的就是两个方法,一个是await方法,一个是countDown方法
代码解读
//初始化state变量值
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
//阻塞方法,获取锁的过程
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
////阻塞方法,获取锁的过程
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果该阻塞的线程,某个地方被中断过的话,直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取锁代码
if (tryAcquireShared(arg) < 0)
//小于0的时候,代表还有线程处于阻塞状态
doAcquireSharedInterruptibly(arg);
}
//如果当前的count值不等于0,直接返回-1,否则返回1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//该部分代码与AQS的代码原理一样,增加一个node节点,来阻塞当前的线程,如果调用wait多次的话,就会形成一个CLH的阻塞队列节点链表,内部通过自旋的方式来进行判断获取锁的逻辑
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//如果当前节点是头节点,就再次获取锁
int r = tryAcquireShared(arg);
//如果count被减少为0了,就进行释放操作
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
//通过自旋的方式,来释放阻塞节点上的线程
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
//对count减1操作
public void countDown() {
sync.releaseShared(1);
}
//释放锁操作
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//如果当前count只为0.则进行释放锁的逻辑
doReleaseShared();
return true;
}
return false;
}
//cas的方式来更新count值
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
总结
1、内部通过一个count值来控制线程之间的状态交互
2、当await的时候,会在CLH中添加一个node节点,并阻塞当前的线程,内部通过自旋的方式来一直判断当前count值,从而来释放当前线程
3、countdown()的时候,会对count值减1,并判断如果为0的话,就会进行释放锁的逻辑,也就是环形CLH队列中各个节点上的线程