一、CyclicBarrier
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。
CyclicBarrier
循环栅栏,Cyclic
意味循环,也就是这个计数器可以反复使用。
CyclicBarrier
支持一个可选的 Runnable
命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。
CyclicBarrier
适用于这样的情况:创建一组任务,并行执行工作,然后在进行下一个步骤之前等待,直至所有任务都完成(看起来有些像join()
)。它使得所有的并行任务都将在栅栏处列队,因此可以一致地向前移动,这非常像CountDownLatch
,只是CountDownLatch
只触发一次的时间,而 CyclicBarrier
可以多次重用
1.1 API
API | 描述 |
---|---|
CyclicBarrier(int parties) |
创建一个新的 CyclicBarrier ,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。 |
CyclicBarrier(int parties, Runnable barrierAction) |
创建一个新的 CyclicBarrier ,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。 |
int await() |
在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。 |
int getNumberWaiting() |
返回当前在屏障处等待的参与者数目。 |
void reset() |
将屏障重置为其初始状态 |
示例用法:下面是一个在并行分解设计中使用 barrier 的例子(伪代码):
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
barrier = new CyclicBarrier(N,
new Runnable() {
public void run() {
mergeRows(...);
}
});
for (int i = 0; i < N; ++i)
new Thread(new Worker(i)).start();
waitUntilDone();
}
}
在这个例子中,每个
worker
线程处理矩阵的一行,在处理完所有的行之前,该线程将一直在屏障处等待。处理完所有的行之后,将执行所提供的 Runnable 屏障操作,并合并这些行。如果合并者确定已经找到了一个解决方案,那么done()
将返回 true,所有的worker
线程都将终止。
1.2 案例分析
案例一:
CyclicBarrier
使得每匹马都执行为了向前移动所必需执行的所有工作,然后必须在栅栏处等待其他所有的马都准备完毕。 当所有的马都向前移动时,CyclicBarrier
将自动调用Runnable
栅栏动作任务,按顺序显示马和终点线的位置。
一旦所有的任务都越过栅栏,它就会自动地为下一回合比赛做好准备。(可以拷贝运行查看运行结果)
package test;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++; //定义ID值
private int strides = 0;
private static Random random = new Random(47);
private static CyclicBarrier cyclicBarrier;
public Horse(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
public synchronized int getStrides() {
return strides;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) { //使用this,表示当前对象
strides += random.nextInt(3);//随机获取一个值。表示长度值
}
cyclicBarrier.await(); //一直等待,当计数值达到0值时执行一次Runnable中的 run()
}
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
throw new RuntimeException();
}
}
public String toString() {
return "Horse " + id + " ";
}
public String tracks() {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < getStrides(); i++) {
stringBuilder.append("*");
}
stringBuilder.append(id);
return stringBuilder.toString();
}
}
public class HorseRace {
static final int FINISH_LINE = 75; //完成的步长
private List<Horse> horses = new ArrayList<>();
private ExecutorService executorService = Executors.newCachedThreadPool();
private CyclicBarrier cyclicBarrier;
//在构造函数
public HorseRace(int nHorses, final int pause) {
// 在构造函数中给CyclicBarrier赋值,
//在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次
cyclicBarrier = new CyclicBarrier(nHorses, new Runnable() {
@Override
public void run() {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < FINISH_LINE; i++) {
stringBuilder.append("=");
}
System.out.println(stringBuilder);
for (Horse horse : horses) {
System.out.println(horse.tracks());
}
for (Horse horse : horses) {
if (horse.getStrides() >= FINISH_LINE) {
System.out.println(horse + " won");
executorService.shutdownNow();
return;
}
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch (InterruptedException e) {
System.out.println("barrier-action sleep interruptedException");
}
}
});
// 启动比赛
for (int i = 0; i < nHorses; i++) {
Horse horse = new Horse(cyclicBarrier);
horses.add(horse);
executorService.execute(horse); //启动线程
}
}
public static void main(String[] args) {
int nHorse = 7;
int pause = 200;
new HorseRace(nHorse, pause);//第一个参数设置马的数量,第二个参数设置暂停时间
}
}
可以向CyclicBarrier
提供一个动作,它是一个Runnable
,当计数值达到0时,自动执行,这是CyclicBarrier
和CountDownLatch
之间的另外一个区别。;
案例2
其他案例情况。
public class CyclicBarrierTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CyclicBarrier cb = new CyclicBarrier(3);
for(int i=0;i<3;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" + Thread.currentThread().getName() +
"即将到达集合地点1,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
cb.await();
Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" + Thread.currentThread().getName() +
"即将到达集合地点2,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
cb.await();
Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" + Thread.currentThread().getName() +
"即将到达集合地点3,当前已有" + (cb.getNumberWaiting() + 1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
cb.await();
} catch (Exception e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
service.shutdown();
}
}
运行结果
线程pool-1-thread-2即将到达集合地点1,当前已有1个已经到达,正在等候
线程pool-1-thread-1即将到达集合地点1,当前已有2个已经到达,正在等候
线程pool-1-thread-3即将到达集合地点1,当前已有3个已经到达,都到齐了,继续走啊
线程pool-1-thread-3即将到达集合地点2,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合地点2,当前已有2个已经到达,正在等候
线程pool-1-thread-1即将到达集合地点2,当前已有3个已经到达,都到齐了,继续走啊
线程pool-1-thread-3即将到达集合地点3,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合地点3,当前已有2个已经到达,正在等候
线程pool-1-thread-1即将到达集合地点3,当前已有3个已经到达,都到齐了,继续走啊
1.2.1 CyclicBarrier 异常问题
Cyclicbarrier.await()
方法可能会抛出两个异常,一个是InterruptedException
,也就是等待过程中,线程被中断,这是很常见的异常。 大部分迫使线程等待的方法都可能会抛出这个异常,使得线程在等待时依然可以响应外部紧急事件。
另外一个异常则是CyclicBarrier
特有的BrokenbarrierException
。一旦遇到这个异常,则表示当前的CyclicBarrier已经破损,可能系统已经没有办法等待所有线程都到齐了,所以就撤销所有线程。
参考
- 张孝祥-Java多线程与并发库高级应用
- 《java编程思想》
- 《实战Java高并发程序设计》