写在前面
CyclicBarrier 源码学习AQS
demo
就是让多个线程一起执行,多线程同时到达栅栏点,等到达栅栏点的线程等于count时,所有线程往下执行
public static void main(String[] args) throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() {
public void run() {
System.out.println("所有任务执行完成执行的逻辑");
}
});
AtomicInteger num = new AtomicInteger(10);
for (int i = 0; i < 10; i++) {
new Thread(()->{
num.decrementAndGet();
System.out.println("index: " + num);
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
cyclicBarrier.await();
System.out.println("全部到达屏障....");
}
index: 9
index: 7
index: 8
index: 6
index: 5
index: 4
index: 3
index: 2
index: 1
index: 0
所有任务执行完成执行的逻辑
全部到达屏障....
构造方法,回调函数可以不用传,调用一个参数的构造方法。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
- 创建CyclicBarrier对象,可以不传回调函数
- 主要是count值,如果此值为0,线程不再阻塞,开始执行
- patties作用是记录,用于可以下一次使用,将此值赋值给count,相当于复位
- 调用cyclicBarrier.await();
cyclicBarrier.await
最终调用到下面的方法,就这一个简单方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//创建cyclicBarrier对象默认创建ReentrantLock
final ReentrantLock lock = this.lock;
//锁住 以下只有一个线程执行 其余线程在clh队列
lock.lock();
try {
//此对象只有一个属性用于判断栅栏是否损坏
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//将count--操作。就是每次await都减一
int index = --count;
//如果等于0,说明都减完了,可以往下执行逻辑了
if (index == 0) {
// tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//执行回调函数
if (command != null)
command.run();
ranAction = true;
//唤醒所有等待线程 重置属性,为下一次栅栏使用
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
for (;;) {
try {
//如果没有等待时间
if (!timed)
//等待被唤醒
trip.await();
//等待被唤醒有时间限制
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
//是否损坏
if (g.broken)
throw new BrokenBarrierException();
//如果generation已经重置 说明已经全都都走完 此时直接返回
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
- 逻辑说明已经在代码中注释
- 先获取到锁,此时只有一个线程能够进来
- 按照demo为例,此时的count为11,每一个线程进来都需要执行 – (减减)操作
- 如果减完后count == 0 说明 规定的线程数都已经到齐,我可以执行后面的业务逻辑代码了
- 如果count == 0 ,执行回调函数,然后唤醒其余10等待的线程,返回0
- 如果count != 0,说明当前线程不是最后一个,线程还没有到齐,当前线程也需要进行阻塞
- 利用ReentrantLock与condition进行等待与唤醒,底层具体实现请参考前面的内容
本类源码较为简单,就是利用ReentrantLock与condition的变相应用
以上就是CyclicBarrier的大致源码分析,如有纰漏,欢迎批评指正。