简介
- CyclicBarrier(可重用屏障/栅栏) 类似于 CountDownLatch(倒计数闭锁),它能阻塞一组线程直到某个事件的发生。
- 与闭锁的关键区别在于,所有的线程必须同时到达屏障位置,才能继续执行。
- 闭锁用于等待事件,而屏障用于等待其他线程。
- CyclicBarrier 可以使一定数量的线程反复地在屏障位置处汇集。当线程到达屏障位置时将调用
await()
方法,这个方法将阻塞直到所有线程都到达屏障位置。如果所有线程都到达屏障位置,那么屏障将打开,此时所有的线程都将被释放,而屏障将被重置以便下次使用。
- CyclicBarrier 是 JDK 1.5 的 java.util.concurrent 并发包中提供的一个并发工具类。
- 所谓 Cyclic 即循环的意思,所谓 Barrier 即屏障的意思。
- CyclicBarrier 是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点。
- 在程序中有固定数量的线程,这些线程有时候必须等待彼此,这种情况下,使用 CyclicBarrier 很有帮助。
- 这个屏障之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以 重新使用 的。
- CyclicBarrier允许一组线程相互等待,直到到达某个公共的屏障点,通过CyclicBarrier,可以实现多个线程间相互等待,直到所有的线程都准备好,等待条件可以重用,又称为循环屏障,可以用于多线程计算数据,最终汇总计算结果的场景。
应用场景
- CyclicBarrier 常用于多线程分组计算。
- 比如一个大型的任务,常常需要分配好多子任务去执行,只有当所有子任务都执行完成时候,才能执行主任务,这时候,就可以选择 CyclicBarrier。
CountDownLatch和CyclicBarrier区别
- CountDownLatch 是一个线程(或者多个),等待另外 N 个线程完成某个事情之后才能执行;CyclicBarrier 是 N 个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
- CountDownLatch 的计数器只能使用一次。而 CyclicBarrier 的计数器可以使用
reset()
方法重置;CyclicBarrier 能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。 - CountDownLatch 采用减计数方式;CyclicBarrier 采用加计数方式。
CyclicBarrier 原理
- CyclicBarrier 内部使用了 ReentrantLock 和 Condition 两个类。
案例一
package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CyclicBarrierExample1 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
// 输出
21:36:05.124 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 0 is ready
21:36:06.123 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 1 is ready
21:36:07.123 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 2 is ready
21:36:08.123 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 3 is ready
21:36:09.124 [pool-1-thread-5] INFO c.m.concurrency.example.aqs.test9 - 4 is ready
21:36:09.124 [pool-1-thread-5] INFO c.m.concurrency.example.aqs.test9 - 4 continue
21:36:09.124 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 0 continue
21:36:09.124 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 1 continue
21:36:09.124 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 2 continue
21:36:09.124 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 3 continue
21:36:10.124 [pool-1-thread-6] INFO c.m.concurrency.example.aqs.test9 - 5 is ready
21:36:11.125 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 6 is ready
21:36:12.125 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 7 is ready
21:36:13.126 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 8 is ready
21:36:14.127 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 9 is ready
21:36:14.127 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 9 continue
21:36:14.127 [pool-1-thread-6] INFO c.m.concurrency.example.aqs.test9 - 5 continue
21:36:14.127 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 6 continue
21:36:14.127 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 8 continue
21:36:14.127 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 7 continue
Process finished with exit code 0
分析 await 方法
- 在 CyclicBarrier 上进行阻塞等待,直到发生以下情形之一。
- 在 CyclicBarrier 上等待的线程数量达到 parties,则所有线程被释放,继续执行。
- 当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。
- 其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
- 其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
- 其他线程调用
CyclicBarrier.reset()
方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
- 线程调用
await()
表示自己已经到达栅栏。 - BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程
await()
时被中断或者超时。
案例二
package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CyclicBarrierExample2 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("Exception", e);
}
log.info("{} continue", threadNum);
}
}
// 输出
21:43:03.378 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 0 is ready
21:43:04.376 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 1 is ready
21:43:05.376 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 2 is ready
21:43:05.382 [pool-1-thread-1] WARN c.m.concurrency.example.aqs.test9 - Exception
21:43:05.382 [pool-1-thread-2] WARN c.m.concurrency.example.aqs.test9 - Exception
21:43:05.382 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 0 continue
21:43:05.382 [pool-1-thread-3] WARN c.m.concurrency.example.aqs.test9 - Exception
21:43:05.382 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 1 continue
21:43:05.382 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 2 continue
21:43:06.377 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 3 is ready
21:43:06.377 [pool-1-thread-4] WARN c.m.concurrency.example.aqs.test9 - Exception
21:43:06.377 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 3 continue
21:43:07.377 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 4 is ready
21:43:07.377 [pool-1-thread-3] WARN c.m.concurrency.example.aqs.test9 - Exception
21:43:07.377 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 4 continue
21:43:08.378 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 5 is ready
21:43:08.378 [pool-1-thread-4] WARN c.m.concurrency.example.aqs.test9 - Exception
21:43:08.378 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 5 continue
21:43:09.378 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 6 is ready
21:43:09.378 [pool-1-thread-3] WARN c.m.concurrency.example.aqs.test9 - Exception
21:43:09.378 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 6 continue
21:43:10.379 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 7 is ready
21:43:10.379 [pool-1-thread-4] WARN c.m.concurrency.example.aqs.test9 - Exception
21:43:10.379 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 7 continue
21:43:11.380 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 8 is ready
21:43:11.380 [pool-1-thread-3] WARN c.m.concurrency.example.aqs.test9 - Exception
21:43:11.380 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 8 continue
21:43:12.380 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 9 is ready
21:43:12.380 [pool-1-thread-4] WARN c.m.concurrency.example.aqs.test9 - Exception
21:43:12.380 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 9 continue
Process finished with exit code 0
分析 await(timeout,TimeUnit) 方法
- 在 CyclicBarrier 上进行限时的阻塞等待,直到发生以下情形之一。
- 在 CyclicBarrier 上等待的线程数量达到 parties,则所有线程被释放,继续执行。
- 当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。
- 当前线程等待超时,则抛出 TimeoutException 异常,并停止等待,继续执行。
- 其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
- 其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
- 其他线程调用
CyclicBarrier.reset()
方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
案例三
package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CyclicBarrierExample3 {
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
// 输出
21:47:26.683 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 0 is ready
21:47:27.682 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 1 is ready
21:47:28.684 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 2 is ready
21:47:29.683 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 3 is ready
21:47:30.683 [pool-1-thread-5] INFO c.m.concurrency.example.aqs.test9 - 4 is ready
21:47:30.683 [pool-1-thread-5] INFO c.m.concurrency.example.aqs.test9 - callback is running
21:47:30.683 [pool-1-thread-5] INFO c.m.concurrency.example.aqs.test9 - 4 continue
21:47:30.683 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 0 continue
21:47:30.683 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 1 continue
21:47:30.683 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 2 continue
21:47:30.683 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 3 continue
21:47:31.684 [pool-1-thread-6] INFO c.m.concurrency.example.aqs.test9 - 5 is ready
21:47:32.684 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 6 is ready
21:47:33.685 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 7 is ready
21:47:34.685 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 8 is ready
21:47:35.685 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 9 is ready
21:47:35.685 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - callback is running
21:47:35.685 [pool-1-thread-1] INFO c.m.concurrency.example.aqs.test9 - 9 continue
21:47:35.686 [pool-1-thread-6] INFO c.m.concurrency.example.aqs.test9 - 5 continue
21:47:35.686 [pool-1-thread-4] INFO c.m.concurrency.example.aqs.test9 - 6 continue
21:47:35.686 [pool-1-thread-3] INFO c.m.concurrency.example.aqs.test9 - 7 continue
21:47:35.686 [pool-1-thread-2] INFO c.m.concurrency.example.aqs.test9 - 8 continue
Process finished with exit code 0
- 在案例一的基础上,无非新增了一个回调函数功能:在线程达到屏障的时候,优先执行该回调函数先。