版权声明:本文为博主原创文章,转载请注明出处 https://blog.csdn.net/fei20121106/article/details/83268650
文章目录
CountDownLatch是一种java.util.concurrent包下一个同步工具类,它允许一个或多个线程等待直到在其他线程中一组操作执行完成。
相对于前文的锁,它主要实现了: 调用指定次release后,才会释放锁
一、使用
public static void testCountDownLatch(){
int threadCount = 10;
final CountDownLatch latch = new CountDownLatch(threadCount);
for(int i=0; i< threadCount; i++){
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getId() + "开始出发");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getId() + "已到达终点");
latch.countDown();
}
}).start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("10个线程已经执行完毕!开始计算排名");
}
二、总体结构
public class CountDownLatch {
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* 该方法会使线程进入等待状态,直到计数器减至0,或者线程被中断。当计数器为0时,调用
* 此方法将会立即返回,不会被阻塞住。
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/** 带有超时功能的 await */
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
}
值得注意的是:
通常而言”AQS里的state在重入锁里代表线程重入的次数,state=1代表重入锁当前已被某个线程独占,这个线程每重入一次,state++“,但是在CountDownLatch中:
- state在AQS被实例化时构建为 必须释放的次数
- state>0 表示 还需要被释放
2.1 AQS的实现
我们回忆一下AQS需要重写的钩子方法:
方法名称 | 描述 |
---|---|
boolean tryAcquire(int arg) | 独占式尝试获取同步状态(通过CAS操作设置同步状态),如果成功返回true,反之返回false |
boolean tryRelease(int arg) | 独占式释放同步状态,成功返回true,失败返回false。 |
int tryAcquireShared(int arg) | 共享式的获取同步状态,返回大于等于0的值,表示获取成功,反之失败。 |
boolean tryReleaseShared(int arg) | 共享式释放同步状态,成功返回true,失败返回false。 |
boolean isHeldExclusively() | 判断同步器是否在独占模式下被占用,一般用来表示同步器是否被当前线程占用 |
代码很简单了
tryAcquireShared
必须满足 state==0,才获取到锁tryReleaseShared
实现state自减
public class CountDownLatch {
private final Sync sync;
/** CountDownLatch 的构造方法,该方法要求传入大于0的整型数值作为计数器 */
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 初始化 Sync
this.sync = new Sync(count);
}
/** CountDownLatch 的同步控制器,继承自 AQS */
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
// 设置 AQS state
setState(count);
}
int getCount() {
return getState();
}
/** 尝试在共享状态下获取同步状态,该方法在 AQS 中是抽象方法,这里进行了覆写 */
protected int tryAcquireShared(int acquires) {
/*
* 如果 state = 0,则返回1,表明可获取同步状态,
* 此时线程调用 await 方法时就不会被阻塞。
*/
return (getState() == 0) ? 1 : -1;
}
/** 尝试在共享状态下释放同步状态,该方法在 AQS 中也是抽象方法 */
protected boolean tryReleaseShared(int releases) {
/*
* 下面的逻辑是将 state--,state 减至0时,调用 await 等待的线程会被唤醒。
* 这里使用循环 + CAS,表明会存在竞争的情况,也就是多个线程可能会同时调用
* countDown 方法。在 state 不为0的情况下,线程调用 countDown 是必须要完
* 成 state-- 这个操作。所以这里使用了循环 + CAS,确保 countDown 方法可正
* 常运行。
*/
for (;;) {
// 获取 state
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
// 使用 CAS 设置新的 state 值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
}