Java并发编程之CyclicBarrier的使用
先看javadoc对这个类的说明:A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.
什么时候用这个类?当有多个线程完成类似的任务,如果需要每个线程在执行任务中的某个地方停下来,等待其它线程也到达相同的等待点(其实不同的等待点也行),那这个类就完美解决了这个需求,比如(后面有代码):有多个跑步的人,各自跑的速度不一样,但是大家需要在中途某个点聚一聚,然后在一起出发。
以下是CyclicBarrier的public方法:
关于两个参数的构造方法:
也就是说如果用了这个方法,那么当等待在await()方法上的线程数 = getParties() 时,该runnable会被最后一个等待的线程去跑。
关于reset()方法:
调用该方法时,若有线程等待在await方法上,则这些线程都会收到一个BrokenBarrierException异常。
关于isBroken()方法:
只要任何一个等待在await()方法上的线程非正常地从await()上返回,那么isBroken()就返回true。
示例:
说明:两个人AAA、BBB,分别以1m/s、2m/s速度跑,若跑到6m处时还有人未到达6m处,则等待所有人都到达时才出发。跑完10m结束
package cyclicbarrier; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class RunningMan implements Runnable{ private int speed; // 1 m/s private int waitAfterMeters; // m private int totalMeters; // m private String name; private CyclicBarrier barrier; public RunningMan(int speed, int waitAfterMeters, int totalMeters) { this.speed = speed; this.waitAfterMeters = waitAfterMeters; this.totalMeters = totalMeters; } public RunningMan barrier(CyclicBarrier barrier) { this.barrier = barrier; return this; } public RunningMan name(String name) { this.name = name; return this; } private boolean needToWait() { return barrier != null; } private long time() { return System.currentTimeMillis() / 1000; } @Override public void run() { System.out.println(time() + ": runningman " + name + " start running."); int ranMiles = 0;// 跑了多少m while (ranMiles < totalMeters) { // 跑1s try { Thread.sleep(1000 * 1); } catch (InterruptedException e) { // 当前线程被中断了,那就不跑了 Thread.currentThread().interrupt(); break; } //计数 ranMiles += speed; //是否到达指定等待点 if (ranMiles == waitAfterMeters) { if (!needToWait()) { //没有barrier continue; } // 是否是最后一个到达指定等待点的 if (barrier.getNumberWaiting() + 1 == barrier.getParties()) { System.out.println(time() + ": runningman " + name + " is the last one reached at " + waitAfterMeters); } else { System.out.println(time() + ": runningman " + name + " reached at " + waitAfterMeters + ", waiting for others"); } // 等待, 直到barrier.getNumberWaiting() == barrier.getParties()时返回 try { barrier.await(); } catch (InterruptedException e) { // 当前线程被中断了,那就不跑了 Thread.currentThread().interrupt(); break; } catch (BrokenBarrierException e) { System.err.println("someone reset the barrier state."); break; } } else { System.out.println(time() + ": runningman " + name + " reached at " + ranMiles); } } System.out.println(time() + ": runningman " + name + " done running."); } public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(2); RunningMan rm1 = new RunningMan(1, 6, 10).name("AAA").barrier(barrier); RunningMan rm2 = new RunningMan(2, 6, 10).name("BBB").barrier(barrier); Thread t1 = new Thread(rm1); Thread t2 = new Thread(rm2); t1.start(); t2.start(); } }
结果: