示例代码
先贴一段示例代码
public class TestCountDownLatch {
public static CountDownLatch countDownLatch = new CountDownLatch(3);
public static void main(String[] args) throws InterruptedException {
for (int i=0;i<3;i++){
new Thread(new Task()).start();
}
System.out.println("线程启动结束,主线程进入等待状态");
countDownLatch.await();
System.out.println("主线程结束");
}
}
class Task implements Runnable{
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+" task ");
TestCountDownLatch.countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
主线程会在countDownLatch.await()
这里阻塞,三个Task线程执行完各自的TestCountDownLatch.countDownLatch.countDown()
后,主线程继续向下执行。
这个是主线程阻塞等待子线程的例子,当前这个阻塞等待并不拘泥于主线程,可以让任意一个线程进行await
,当CountDownLatch
计数器归零时线程才会继续
原理
new CountDownLatch(3)
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
可以看到CountDownLatch是通过一个继承AQS的内部锁实现的,构造器设定了锁的状态值。
await
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 主线程调用await时会尝试去获取share锁,如果此时
state!=0
,就会进入doAcquireSharedInterruptibly
中,这个方法如果看过AQS的实现不会陌生,就是AQS中构造同步队列节点的方法。 - AQS将当前线程(主线程)构造成一个节点(如果是第一个节点则会构建一个空的头节点),然后主线程会自旋再去尝试获得一次share锁
- 还是没有获取到,这时候将节点阻塞
countDown
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
- 工作线程每次调用
countDown
,都会调用releaseShared(1);
,即释放了一把共享锁。该方法会CAS更改state
值,只有当state
减到0时才会返回true
,即示例代码中的前两个线程调用该方法只会让state
自减1,当第三个线程将state
减为0了才会发生下面的动作 - 调用
doReleaseShared
,将头节点的WaitStatus
更改后进入unparkSuccessor(h)
,这个函数会删掉所有取消获取锁的线程节点,同时调用LockSupport.unpark
方法将头节点的下个节点(主线程)唤醒 - 主线程唤醒后又回到了
doAcquireSharedInterruptibly
中的for(;;)
自旋中,这时候由于state=0
因此是可以获取到共享锁的,然后会进入到if
条件中将自己设置为头节点,并尝试唤醒后面的节点 - 这时候主线程会返回,也就是从
countDownLatch.await();
返回可以继续往下执行了
总结
总结下来就是,创建CountDownLatch
时会让其自己持有数量n
的共享锁,每次countDown
就是在释放这个共享锁,await
的线程要等到这个共享锁完全被释放了才会返回