目录页:https://blog.csdn.net/u011294519/article/details/88367808
1.CountDownLatch
作用:一个线程等待其他线程执行完成后再执行,算是一个加版的join,
运用场景:说个我理解的运用场景,用户购买商品,修改订单状态为已支付作为一个等待线程,需要等待用户支付完成,商品库存扣除无异常这两个线程完成才会去修改订单状态(当然实际情况会在用户下单时先把库存锁死等待用户支付)
上代码:
package com.concurrent.coline.part5.tools;
import java.util.concurrent.CountDownLatch;
/** * 类说明:演示CountDownLatch,有5个初始化的线程,6个扣除点, * 扣除完毕以后,主线程和业务线程才能继续自己的工作 */ public class UseCountDownLatch {
static CountDownLatch latch = new CountDownLatch(6);
//初始化线程(只有一步,有4个) private static class InitThread implements Runnable {
@Override public void run() { System.out.println(Thread.currentThread().getName() + " do business"); latch.countDown();//初始化线程完成工作了,countDown方法只扣减一次; System.out.println(Thread.currentThread().getName() + " countDown finish");
} }
//业务线程 private static class BusiThread implements Runnable {
@Override public void run() { try { System.out.println(Thread.currentThread().getName() + " begin await"); latch.await(); System.out.println(Thread.currentThread().getName() + " do business"); } catch (InterruptedException e) { e.printStackTrace(); } } }
public static void main(String[] args) throws InterruptedException { //单独的初始化线程,初始化分为2步,需要扣减两次 new Thread("countDown_twice_thread") { @Override public void run() { System.out.println(Thread.currentThread().getName() + " first countDown"); latch.countDown();//每完成一步初始化工作,扣减一次 System.out.println(Thread.currentThread().getName() + " second countDown"); latch.countDown();//每完成一步初始化工作,扣减一次 } }.start();
new Thread(new BusiThread(), "await_business_thread").start();
for (int i = 0; i <= 3; i++) { Thread thread = new Thread(new InitThread()); thread.start(); } } } |
运行结果:
代码位置:concurrent-toolbar的part5
查看CountDownLatch的源代码,在构造方法中会初始化AbstractQueuedSynchronizer(后面简称AQS)类中的计数值state,这个参数作为计数器存在,控制阻塞。需要注意的是该值由volatile关键字修饰,具有可见性但是不保证其原子性,这是我在刚开始看到这个代码时存在疑惑的地方,但是在看到countDown方法实现state减一的操作时,发现其使用了AQS的compareAndSetState方法来保证数值减一的原子性。
CountDownLatch的await方法就是在自旋中调用AOS的tryAcquireShared方法一直判断state是否等于0,若state等于0则结束自旋。当然其还涉及了链表的数据结构。
2.CyclicBarrier
让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程继续运行。
上代码:
package com.concurrent.coline.part5.tools;
import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier;
/** * 类说明:CyclicBarrier的使用 */ public class UseCyclicBarrier {
private static CyclicBarrier barrier = new CyclicBarrier(5, new CollectThread());
//存放子线程工作结果的容器 private static ConcurrentHashMap<String, String> resultMap = new ConcurrentHashMap<>();
public static void main(String[] args) { for (int i = 0; i <= 4; i++) { Thread thread = new Thread(new SubThread()); thread.start(); }
}
//负责屏障开放以后的工作 private static class CollectThread implements Runnable {
@Override public void run() { StringBuilder result = new StringBuilder(); for (Map.Entry<String, String> workResult : resultMap.entrySet()) { result.append("[" + workResult.getValue() + "]"); } System.out.println("the result = " + result); System.out.println("do other business........"); } }
//工作线程 private static class SubThread implements Runnable {
@Override public void run() { //线程本身的处理结果 String threadName = Thread.currentThread().getName(); resultMap.put(Thread.currentThread().getId() + "", threadName); Random r = new Random();//随机决定工作线程的是否睡眠 try { if (r.nextBoolean()) { Thread.sleep(2000); System.out.println(threadName + " spend time on other thing "); } System.out.println(threadName + "begin await"); barrier.await(); Thread.sleep(1000); System.out.println("await finsih "+threadName + " finish business "); } catch (Exception e) { e.printStackTrace(); }
} } } |
运行结果:
代码位置:concurrent-toolbar中的part5
CyclicBarrier有两个构造方法
- public CyclicBarrier(int parties, Runnable barrierAction)
翻看源码注释,该方法用于创建一个CyclicBarrier,parties是初始化在执行barrierAction之前需要达到的线程数量,barrierAction是阻塞线程数量达到要求时执行的内容。
- public CyclicBarrier(int parties)
翻看源码注释,该方法用于创建一个CyclicBarrier,parties是初始化阻塞的线程数量,在阻塞线程数到达parties时并不会做任何另外的操作。
不同于CountDownLatch,CyclicBarrier的计数由他自己控制,在构造方法初始化后,每调用一次await()方法就减一,那么CyclicBarrier又是如何控制并发的呢,查看源码发现,在对count减一时,通过ReentrantLock锁保证并发安全。同时线程等待是使用链表的数据结构来实现(写的有点绕)。
那么问题来了,CountDownLatch和CyclicBarrier的区别在哪里,貌似都是阻塞并等待,我是这么理解的,CountDownLatch是一个线程是否执行依赖于其他线程的countDown是否达到数量,是一种依赖关系。CyclicBarrier是一群线程作为一个线程组,类似于多个线程作为一个线程组,共同完成一个业务逻辑,就像小时候和妈妈一起拼拼图,是一种合作关系,但是CyclicBarrier提供了public CyclicBarrier(int parties, Runnable barrierAction)构造函数,barrierAction线程是依赖于线程组执行完成,类似于CountDownLatch的逻辑。(个人见解,说的不对的请指正)