从CyclicBarrier到AQS

写在前面

CyclicBarrier 源码学习AQS

demo

就是让多个线程一起执行,多线程同时到达栅栏点,等到达栅栏点的线程等于count时,所有线程往下执行

public static void main(String[] args) throws Exception {
    
    
   CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() {
    
    
       public void run() {
    
    
           System.out.println("所有任务执行完成执行的逻辑");
       }
   });
   AtomicInteger num = new AtomicInteger(10);
   for (int i = 0; i < 10; i++) {
    
    
       new Thread(()->{
    
    
           num.decrementAndGet();
           System.out.println("index: " + num);
           try {
    
    
               cyclicBarrier.await();
           } catch (InterruptedException e) {
    
    
               e.printStackTrace();
           } catch (BrokenBarrierException e) {
    
    
               e.printStackTrace();
           }

       }).start();
   }
   cyclicBarrier.await();
   System.out.println("全部到达屏障....");
}
index: 9
index: 7
index: 8
index: 6
index: 5
index: 4
index: 3
index: 2
index: 1
index: 0
所有任务执行完成执行的逻辑
全部到达屏障....

构造方法,回调函数可以不用传,调用一个参数的构造方法。

public CyclicBarrier(int parties, Runnable barrierAction) {
    
    
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
  • 创建CyclicBarrier对象,可以不传回调函数
  • 主要是count值,如果此值为0,线程不再阻塞,开始执行
  • patties作用是记录,用于可以下一次使用,将此值赋值给count,相当于复位
  • 调用cyclicBarrier.await();

cyclicBarrier.await

最终调用到下面的方法,就这一个简单方法

private int dowait(boolean timed, long nanos)
   throws InterruptedException, BrokenBarrierException,
          TimeoutException {
    
    
   //创建cyclicBarrier对象默认创建ReentrantLock
   final ReentrantLock lock = this.lock;
   //锁住 以下只有一个线程执行 其余线程在clh队列
   lock.lock();
   try {
    
    
       //此对象只有一个属性用于判断栅栏是否损坏
       final Generation g = generation;
       if (g.broken)
           throw new BrokenBarrierException();
       if (Thread.interrupted()) {
    
    
           breakBarrier();
           throw new InterruptedException();
       }
       //将count--操作。就是每次await都减一
       int index = --count;
       //如果等于0,说明都减完了,可以往下执行逻辑了
       if (index == 0) {
    
      // tripped
           boolean ranAction = false;
           try {
    
    
               final Runnable command = barrierCommand;
               //执行回调函数
               if (command != null)
                   command.run();
               ranAction = true;
               //唤醒所有等待线程 重置属性,为下一次栅栏使用
               nextGeneration();
               return 0;
           } finally {
    
    
               if (!ranAction)
                   breakBarrier();
           }
       }
       for (;;) {
    
    
           try {
    
    
               //如果没有等待时间
               if (!timed)
                   //等待被唤醒
                   trip.await();
                   //等待被唤醒有时间限制
               else if (nanos > 0L)
                   nanos = trip.awaitNanos(nanos);
           } catch (InterruptedException ie) {
    
    
               if (g == generation && ! g.broken) {
    
    
                   breakBarrier();
                   throw ie;
               } else {
    
    
                   Thread.currentThread().interrupt();
               }
           }
           //是否损坏
           if (g.broken)
               throw new BrokenBarrierException();
           //如果generation已经重置 说明已经全都都走完 此时直接返回
           if (g != generation)
               return index;
           if (timed && nanos <= 0L) {
    
    
               breakBarrier();
               throw new TimeoutException();
           }
       }
   } finally {
    
    
       lock.unlock();
   }
}
  • 逻辑说明已经在代码中注释
  • 先获取到锁,此时只有一个线程能够进来
  • 按照demo为例,此时的count为11,每一个线程进来都需要执行 – (减减)操作
  • 如果减完后count == 0 说明 规定的线程数都已经到齐,我可以执行后面的业务逻辑代码了
  • 如果count == 0 ,执行回调函数,然后唤醒其余10等待的线程,返回0
  • 如果count != 0,说明当前线程不是最后一个,线程还没有到齐,当前线程也需要进行阻塞
  • 利用ReentrantLock与condition进行等待与唤醒,底层具体实现请参考前面的内容

本类源码较为简单,就是利用ReentrantLock与condition的变相应用

以上就是CyclicBarrier的大致源码分析,如有纰漏,欢迎批评指正。

猜你喜欢

转载自blog.csdn.net/qq_37904966/article/details/113481996