结合CountDownLatch和CyclicBarrier了解一下AQS的共享锁部分
1、CountDownLatch的使用
先看下CountDownLatch是怎么使用的
public class CountDownLatchTest {
public void CountDownLatchTest() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(5);
ExecutorService e = Executors.newFixedThreadPool(8);
// 创建 N 个任务,提交给线程池来执行
for (int i = 1; i <= 5; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
// 等待所有的任务完成,这个方法才会返回,还没完成就阻塞在这里
doneSignal.await(); // wait for all to finish
System.out.println("所有线程的任务都做完了");
e.shutdown();
System.out.println("线程池已关闭");
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
doWork(i);
// 这个线程的任务完成了,调用 countDown 方法
doneSignal.countDown();
}
void doWork(int i) {
System.out.println("线程" + i + "的任务做完了");
}
}
public static void main(String[] args) throws InterruptedException {
new CountDownLatchTest().CountDownLatchTest();
}
}
输出结果
线程1的任务做完了
线程2的任务做完了
线程4的任务做完了
线程3的任务做完了
线程5的任务做完了
所有线程的任务都做完了
线程池已关闭
从输出结果可以看过,5个线程只有完成了任务await()方法才会返回然后执行后面的,还没完成就会被阻塞,直到5个线程完全任务才不阻塞,实际应用就是把一个大的任务分成几个小任务然后让不同的线程并发执行,例如把一段数据分成多段,每个线程处理一部分数据,直到整段数据处理后才返回
2、CountDownLatch原理(共享锁)
说完CountDownLatch的使用就可以开始了解CountDownLatch的原理了,看下CountDownLatch是怎样的使用AQS的共享锁的
public void Test2() {
CountDownLatch latch = new CountDownLatch(2);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
// 阻塞,等待 state 减为 0
latch.await();
System.out.println("线程 t1 从 await 中返回了");
} catch (InterruptedException e) {
System.out.println("线程 t1 await 被中断");
Thread.currentThread().interrupt();
}
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
// 阻塞,等待 state 减为 0
latch.await();
System.out.println("线程 t2 从 await 中返回了");
} catch (InterruptedException e) {
System.out.println("线程 t2 await 被中断");
Thread.currentThread().interrupt();
}
}
}, "t2");
t1.start();
t2.start();
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException ignore) {
}
// 休息 5 秒后(模拟线程工作了 5 秒),调用 countDown()
latch.countDown();
System.out.println("t3完成了任务");
}
}, "t3");
Thread t4 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException ignore) {
}
// 休息 10 秒后(模拟线程工作了 10 秒),调用 countDown()
latch.countDown();
System.out.println("t4完成了任务");
}
}, "t4");
t3.start();
t4.start();
}
输出结果
t3完成了任务
t4完成了任务
线程 t2 从 await 中返回了
线程 t1 从 await 中返回了
2.1、构造方法
构造方法的作用是初始化state的值,相当于初始化了多少把锁,不允许传入负数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
2.2、await()方法
假设现在线程1,线程2调用了await()方法
首先调用的是CountDownLatch的await()方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
然后调用AQS的acquireSharedInterruptibly()方法
//因为这个方法是会处理中断的,所以先判断中断状态
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//state不为0条件成立然后调用doAcquireSharedInterruptibly(arg)
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
然后调用CountDownLatch的静态内部类Sync的 tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
//state为0时才返回1
return (getState() == 0) ? 1 : -1;
}
tryAcquireShared返回-1,所以调用AQS的doAcquireSharedInterruptibly()方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//把线程1入队 由于线程1入队之前队列是空的,所以线程1会创建一个head节点,然后再把线程1入队
//因为是共享锁没有线程单独持有锁所以exclusiveOwnerThread这个变量用不上
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取前驱节点
final Node p = node.predecessor();
//前驱结点是头结点
if (p == head) {
//就可以尝试去获取锁
int r = tryAcquireShared(arg);
//大于等于0表示获取成功
if (r >= 0) {
//把当前节点设为新的头结点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//获取锁失败就挂起,把前驱节点的ws设为了-1
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//被中断就抛异常,不会继续尝试抢锁
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
线程1、线程2调用await()方法获取锁失败的话会先被挂起等待被唤醒
此时的队列是
await()方法的整体流程:
1、判断state是否为0,不为0就调用AQS的doAcquireSharedInterruptibly()方法
2、把当前线程加入同步队列
3、如果在队列的前驱结点是是头结点可以尝试获取锁
4、获取失败就把前驱节点的ws设为-1然后挂起当前线程
2.3、countDown()方法
每调用一次countDown()方法,state就会减1,相当于解了一把锁,直到state减为0,即表示锁解完了
线程3和线程4分别调用CountDownLatch的countDown()方法
public void countDown() {
sync.releaseShared(1);
}
然后调用AQS的releaseShared()方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
然后调用CountDownLatch静态内部类的tryReleaseShared()方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
//获取当前状态值
int c = getState();
//为0表示不用解锁
if (c == 0)
return false;
int nextc = c-1;
//cas把state减1
if (compareAndSetState(c, nextc))
//减完之后state为0就返回true
return nextc == 0;
}
}
如果tryReleaseShared()方法返回true表示锁已经全部解完了,可以调用AQS的doReleaseShared()方法唤醒所有阻塞的线程了
//只有当state的值为0或被中断了才会调用这个方法,线程被唤醒后还继续调用这个方法
private void doReleaseShared() {
for (;;) {
Node h = head;//头结点
//头结点不为空,而且队列不止一个节点
//如果头结点为空表明同步队列没节点
//如果头结点等于尾结点表明队列只有一个节点,队列为空或只有一个节点表明没有节点需要被唤醒了
if (h != null && h != tail) {
int ws = h.waitStatus; //头结点ws
//如果头结点ws的值是SIGNAL-1表示头结点的后继节点要被唤醒
if (ws == Node.SIGNAL) {
//把头结点的ws通过cas设为0,要用cas是因为可能有多个线程修改ws
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒头结点的后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//头结点还没改变表明后继节点还没占有头结点
if (h == head) // loop if head changed
break;
}
}
此时头结点的后继节点被唤醒了,即线程1被唤醒了
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
//被唤醒后r为1即大于等于所以会调用setHeadAndPropagate(node, r)
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//被唤醒后会先检查中断,然后再继续循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//把线程1设为新的头结点
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//会继续唤醒新的头结点的后继节点,即线程2,直到所有的线程被唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}
countDown()方法整体流程:
1、调用CountDownLatch的静态内部类Sync的tryReleaseShared()方法把state减1表示释放了一把锁,然后判断释放完之后的state是否为0
2、如果为0就调用AQS的doReleaseShared()唤醒head后面的所有节点
3、head后面的节点被唤醒后会调用setHeadAndPropagate()把当前节点改为新的head节点并继续调用doReleaseShared()方法唤醒新的head节点后面的节点
4、直到队列为空或只有一个节点表示没有节点需要被唤醒,然后向上一步步返回到await()方法,await()方法返回后就可以继续执行后面的逻辑了
总结
1、CountDownLatch使用了AQS的共享锁,共享锁的意思是可以多个线程去进行获取锁和释放锁,而独占锁只能由一个线程进行获取锁和释放锁
2、一个线程调用了await()方法,如果state不为0就会进入同步队列等待唤醒获取锁
3、其他线程调用countDown()方法后,如果state为0就会把同步队列里head后面的节点都唤醒
4、同一个线程不能同时调用await()方法和countDown()方法,因为不能自己唤醒自己