版权声明:转载请注明出处: https://blog.csdn.net/qq_21687635/article/details/84671868
CountDownLatch、CyclicBarrier、Semaphore、Exchanger
CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作。
一个生动例子
public class CountDownLatchExample {
private static CountDownLatch countDownLatch = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("1");
countDownLatch.countDown();
System.out.println("2");
countDownLatch.countDown();
}
}).start();
countDownLatch.await();
System.out.println("3");
}
}
CountDownLatch的构造函数接收一个int类型的参数作为计数器,当调用countDown()方法时,计数器减1,await()方法会阻塞当前线程,直到计数器编程0。
CyclicBarrier
可循环使用的屏障,让一组线程到达一个屏障时阻塞,直到最后一个线程到达屏障时,屏障才会打开门,所有被屏障拦截的线程才会继续运行。
一个生动的例子
public class CyclicBarrierExample {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
new Thread(new Runnable() {
@Override
public void run() {
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("1");
}
}).start();
cyclicBarrier.await();
System.out.println("2");
}
}
因为主线程和子线程的调度都是由CPU决定的,两个线程都有可能先执行,所以会产生两种结果。
如果把new CyclicBarrier(2) 改成 new CyclicBarrier(3),则主线程和子线程会永远等待,因为没有第三个线程执行await()方法。
CyclicBarrier还提供了一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction。
一个生动的例子
public class CyclicBarrierExample2 {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
@Override
public void run() {
System.out.println("3");
}
});
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
new Thread(new Runnable() {
@Override
public void run() {
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("1");
}
}).start();
cyclicBarrier.await();
System.out.println("2");
}
}
会先输出3,1和2的输出顺序不确定。
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以循环使用。
Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
一个生动的例子
public class SemaphoreExample {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore semaphore = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " : save data");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
});
}
threadPool.shutdown();
}
}
虽然有30个线程,但是只允许10个并发执行。
Exchanger
用于进行线程间的数据交换,它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。
通过exchange()方法交换数据,如果第一个线程执行exchange()方法,它会一直等待第二个线程执行exchange()方法。
一个生动例子
public class ExchangerExample {
private static Exchanger<String> exchanger = new Exchanger<>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
String A = "银行流水A";
try {
System.out.println(Thread.currentThread().getName() + " ---> A = " + A);
String B = exchanger.exchange(A);
System.out.println(Thread.currentThread().getName() + " ---> B = " + B);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
String B = "银行流水B";
try {
System.out.println(Thread.currentThread().getName() + " ---> B = " + B);
String A = exchanger.exchange(B);
System.out.println(Thread.currentThread().getName() + " ---> A = " + A);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadPool.shutdown();
}
}
参考
- Java并发编程的艺术[书籍]