原创转载请注明出处:http://agilestyle.iteye.com/blog/2343979
CyclicBarrier
CyclicBarrier允许一组线程互相等待,直到到达某个公共屏障点(common barrier point),这些线程必须实时地互相等待,这种情况下就可以使用CyclicBarrier来方便地实现这样的功能。另外,CyclicBarrier的公共屏障点可以重用。
CyclicBarrierTest1.java
package org.fool.java.concurrent.cyclicbarrier; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest1 { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("All invoked!!!")); for (int i = 0; i < 5; i++) { Thread thread = new Thread(new MyThread(cyclicBarrier)); thread.start(); } } public static class MyThread implements Runnable { private CyclicBarrier cyclicBarrier; public MyThread(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { Thread.sleep((int) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + " invoked..." + System.currentTimeMillis()); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }
Note:
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("All invoked!!!"));
设置最大为5个parties同行者,也就是5个线程都执行了await()方法后程序才可以继续向下运行,否则这些线程彼此互相等待,一直呈阻塞状态。
线程个数大于parties数量进行分批处理
CyclicBarrierTest2.java
package org.fool.java.concurrent.cyclicbarrier; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest2 { public static void main(String[] args) { try { CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("All invoked!!!")); for (int i = 0; i < 4; i++) { Thread thread = new Thread(new MyThread(cyclicBarrier)); thread.start(); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } public static class MyThread implements Runnable { private CyclicBarrier cyclicBarrier; public MyThread(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " begin=" + System.currentTimeMillis() + " wait 2 threads to continue"); cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + " end=" + System.currentTimeMillis() + " have 2 threads to continue"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }Run
Note:
从运行结果来看,每出现2个线程就开始RUN
getNumberWaiting()
getNumberWaiting()作用是获得有几个线程已经到达屏障点。
CyclicBarrierTest3.java
package org.fool.java.concurrent.cyclicbarrier; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest3 { public static void main(String[] args) { try { CyclicBarrier cyclicBarrier = new CyclicBarrier(3); for (int i = 0; i < 10; i++) { Thread thread = new Thread(new MyThread(cyclicBarrier)); thread.start(); Thread.sleep(1000); System.out.println(cyclicBarrier.getNumberWaiting()); } } catch (InterruptedException e) { e.printStackTrace(); } } public static class MyThread implements Runnable { private CyclicBarrier cyclicBarrier; public MyThread(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }
Run
Note:
从运行结果来看,CyclicBarrier具有屏障重置性,也就是parties的值可以重置为0
使用CyclicBarrier屏障重置性实现多阶段的比赛实验
CyclicBarrierTest4.java
package org.fool.java.concurrent.cyclicbarrier; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest4 { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(2); Service service = new Service(cyclicBarrier); for (int i = 0; i < 4; i++) { Thread thread = new Thread(new MyThread(service)); thread.setName("Thread " + (i + 1)); thread.start(); } } public static class Service { private CyclicBarrier cyclicBarrier; public Service(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } public void beginRun() { try { Thread.sleep((int) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis() + " begin No.1 phase " + (cyclicBarrier.getNumberWaiting() + 1)); cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis() + " end No.1 phase " + (cyclicBarrier.getNumberWaiting())); Thread.sleep((int) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis() + " begin No.2 phase " + (cyclicBarrier.getNumberWaiting() + 1)); cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis() + " end No.2 phase " + (cyclicBarrier.getNumberWaiting())); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } public static class MyThread implements Runnable { private Service service; public MyThread(Service service) { this.service = service; } @Override public void run() { service.beginRun(); } } }
Run
Note:
CyclicBarrier类的parties值从1到2,然后再恢复到0,证明CyclicBarrier类的屏障点是可以复用的,另外线程1234每到达一个屏障点时的组合是随机的,sleep勇士最少的互相组合,继续向下一个屏障行进。
isBroken()
Reference
Java并发编程核心方法与框架