源码分析
1 package java.util.concurrent; 2 3 import java.util.concurrent.locks.Condition; 4 import java.util.concurrent.locks.ReentrantLock; 5 6 /* 7 一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点,之所以称为cyclic的barrier,是因为等待线程释放后栅栏可以重用 8 9 对于失败的同步尝试,CyclicBarrier用了一种all-or-none破坏模式: 10 如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在屏障点等待的其他所有线程也会通过BrokenBarrierException异常离开屏障点, 11 如果几乎同时被中断,那么会通过InterruptedException以反常的方式离开 12 13 实现思路:就是设置一个计数,每当有线程达到时,计数count-1,Condition.await进入阻塞,当count=0,那么可以signalAll,让所有线程得以唤醒,唤醒后立即重置 14 */ 15 public class CyclicBarrier { 16 /* 17 屏障的每次use都意味着一次generation instance。当屏障重置时,generation会发生改变,在用屏障的过程中,可能有许多代与线程相关联, 18 这是因为锁分配给等待线程的方式的不确定性,但是在同一个时间只能有一个generation处于活动状态,而其他所有的都会被破坏 19 */ 20 private static class Generation { 21 boolean broken = false; 22 } 23 24 // 守护屏障入口的锁 25 private final ReentrantLock lock = new ReentrantLock(); 26 // Condition即条件谓词 27 private final Condition trip = lock.newCondition(); 28 // 屏障释放前需要等待的线程数量 29 private final int parties; 30 // 屏障突破后,要执行的命令 31 private final Runnable barrierCommand; 32 // 轮,相当于一次集合到释放为一轮,一轮一轮的进行 33 private Generation generation = new Generation(); 34 35 // 重置后count=parties,count表示还需要等待的线程数量才能结束当前轮进入下一轮 36 private int count; 37 38 // 更新屏障的状态并唤醒所有等待的线程,只有持有锁时才会被调用 39 private void nextGeneration() { 40 // signal completion of last generation 41 trip.signalAll(); 42 // set up next generation 43 count = parties; 44 generation = new Generation(); 45 } 46 47 // 设置当前屏障的generation并唤醒所有等待的线程,只有持有锁时才会被调用 48 private void breakBarrier() { 49 generation.broken = true; 50 count = parties; 51 trip.signalAll(); 52 } 53 54 // timed:是否有时间限制 nanos:wait的纳秒数 55 private int dowait(boolean timed, long nanos) 56 throws InterruptedException, BrokenBarrierException, 57 TimeoutException { 58 final ReentrantLock lock = this.lock; 59 lock.lock(); 60 try { 61 final Generation g = generation; 62 63 if (g.broken) 64 throw new BrokenBarrierException(); 65 66 if (Thread.interrupted()) { 67 breakBarrier(); 68 throw new InterruptedException(); 69 } 70 71 int index = --count; 72 if (index == 0) { // tripped 73 boolean ranAction = false; 74 try { 75 final Runnable command = barrierCommand; 76 if (command != null) 77 command.run(); 78 ranAction = true; 79 nextGeneration(); 80 return 0; 81 } finally { 82 if (!ranAction) 83 breakBarrier(); 84 } 85 } 86 87 // loop until tripped, broken, interrupted, or timed out 88 for (; ; ) { 89 try { 90 if (!timed) 91 trip.await(); 92 else if (nanos > 0L) 93 nanos = trip.awaitNanos(nanos); 94 } catch (InterruptedException ie) { 95 if (g == generation && !g.broken) { 96 breakBarrier(); 97 throw ie; 98 } else { 99 // We're about to finish waiting even if we had not 100 // been interrupted, so this interrupt is deemed to 101 // "belong" to subsequent execution. 102 Thread.currentThread().interrupt(); 103 } 104 } 105 106 if (g.broken) 107 throw new BrokenBarrierException(); 108 109 if (g != generation) 110 return index; 111 112 if (timed && nanos <= 0L) { 113 breakBarrier(); 114 throw new TimeoutException(); 115 } 116 } 117 } finally { 118 lock.unlock(); 119 } 120 } 121 122 // 创建一个参数为parties带有barrierAction的CyclicBarrier 123 public CyclicBarrier(int parties, Runnable barrierAction) { 124 if (parties <= 0) throw new IllegalArgumentException(); 125 this.parties = parties; 126 this.count = parties; 127 this.barrierCommand = barrierAction; 128 } 129 130 // 创建一个参数为parties的CyclicBarrier 131 public CyclicBarrier(int parties) { 132 this(parties, null); 133 } 134 135 // 返回barrier拦截的线程数量 136 public int getParties() { 137 return parties; 138 } 139 140 /* 141 在所有的参与者线程调用await方法之前,屏障一直阻塞已经await的线程,如果当前线程不是最后一个线程,当前线程会处于休眠状态直到: 142 1.最后一个线程到达 143 2.其他某个线程中断当前线程 144 3.其他某个线程中断另一个等待线程 145 4.其他某个线程在等待barrier时超时 146 5.其他某个线程在barrier上调用reset 147 如果当前线程在进入await方法时已经设置了中断状态,或者在等待时被中断,则抛出InterruptedException异常,并清除当前线程的已中断状态 148 149 如果在线程处于等待状态时barrier被reset(),或者在调用await时barrier被损坏,或任意一个线程正处于等待状态,则抛出BrokenBarrierException异常 150 151 如果任何线程在等待时被中断,则其他所有等待线程都会抛出BrokenBarrierException异常,且barrier会被置为损坏状态 152 如果当前线程是最后一个到达的线程,并且构造方法中提供了一个非空的屏障操作,则在允许其他线程继续运行之前,当前线程 153 会运行barrier action 154 如果在执行屏障操作过程中发生异常,则异常会传播到当前线程中,且barrier会被置为损坏状态 155 156 await方法返回的是到达的当前线程的索引,其中getParties()-1表示到达的第一个线程,0表示到达的最后一个线程 157 如果当前线程在等待时被中断则抛出InterruptedException 158 如果另一个线程在当前线程等待时被中断或超时,或者重置了barrier,或者在调用await时barrier被损坏,或由于异常而导致屏障操作(如果存在)失败则抛出BrokenBarrierException 159 */ 160 public int await() throws InterruptedException, BrokenBarrierException { 161 try { 162 return dowait(false, 0L); 163 } catch (TimeoutException toe) { 164 throw new Error(toe); // cannot happen 165 } 166 } 167 168 // 带有限制时间的await方法 169 public int await(long timeout, TimeUnit unit) 170 throws InterruptedException, 171 BrokenBarrierException, 172 TimeoutException { 173 return dowait(true, unit.toNanos(timeout)); 174 } 175 176 // 查询屏障是否处于损坏状态 177 public boolean isBroken() { 178 final ReentrantLock lock = this.lock; 179 lock.lock(); 180 try { 181 return generation.broken; 182 } finally { 183 lock.unlock(); 184 } 185 } 186 187 // 重置屏障为初始状态 188 public void reset() { 189 final ReentrantLock lock = this.lock; 190 lock.lock(); 191 try { 192 breakBarrier(); // break the current generation 193 nextGeneration(); // start a new generation 194 } finally { 195 lock.unlock(); 196 } 197 } 198 199 // 返回当前在屏障处等待的线程数量 200 public int getNumberWaiting() { 201 final ReentrantLock lock = this.lock; 202 lock.lock(); 203 try { 204 return parties - count; 205 } finally { 206 lock.unlock(); 207 } 208 } 209 }
典型用法
1 package jcip; 2 3 import java.util.concurrent.BrokenBarrierException; 4 import java.util.concurrent.CyclicBarrier; 5 6 /* 7 循环栅栏:让一组线程到达一个屏障(也叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续执行 8 CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await()方法告诉CyclicBarrier自己已经到达了屏障,然后当前线程被阻塞 9 CyclicBarrier还提供了一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction所以输出为3 1 2 true或者3 2 1 true 10 */ 11 public class CyclicBarrierTest { 12 // 如果改为new CyclicBarrier(3);则主线程和子线程会永远等待,因为没有第3个线程执行await()方法,即没有第3个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行 13 static CyclicBarrier c = new CyclicBarrier(2); 14 static CyclicBarrier c2 = new CyclicBarrier(2, new Runnable() { 15 @Override 16 public void run() { 17 System.out.print(3 + " "); 18 } 19 }); 20 static CyclicBarrier c3 = new CyclicBarrier(2); 21 22 public static void main(String[] args) throws Exception { 23 new Thread(new Runnable() { 24 @Override 25 public void run() { 26 try { 27 c.await();// c2.await(); 28 } catch (InterruptedException e) { 29 e.printStackTrace(); 30 } catch (BrokenBarrierException e) { 31 e.printStackTrace(); 32 } 33 System.out.print(1 + " "); 34 } 35 }).start(); 36 c.await();// c2.await(); 37 System.out.print(2 + " "); 38 39 Thread thread = new Thread(new Runnable() { 40 @Override 41 public void run() { 42 try { 43 c3.await(); 44 } catch (Exception e) { 45 } 46 } 47 }); 48 thread.start(); 49 thread.interrupt(); 50 try { 51 c3.await(); 52 } catch (Exception e) { 53 System.out.println(c3.isBroken()); 54 } 55 } 56 57 } 58 /* 59 1 2 true或者2 1 true 60 */
CyclicBarrier和CountDownLatch的区别
CountDownLatch | CyclicBarrier |
减计数方式 | 加计数方式 |
计数为0时释放所有等待的线程 | 计数达到指定值时释放所有等待的线程 |
计数为0时,无法重置(一次性的) | 计数达到指定值时,计数置为0重新开始(可重复) |
调用countDown()方法计数减1, 调用await()方法只进行阻塞,对计数没任何影响 |
调用await()方法计数加1, 若加1后的值不等于构造方法的值,则线程阻塞 |