并发工具类:
LK最近学习了并发工具类的知识,总结一下它们的用法:
1.CountDownLatch
2.CyclicBarrier
3.Semaphore
4.Exchanger
1.CountDownLatch
使用完CountDownLatch之后,觉得它适合应用于让其它线程去各自执行完,然后main线程开始执行的场合 。自己理解的原理图:
具体应用场景:
-
统计excle每个sheet数据(可以是银行流水),然后让主线程去合计每一个sheet和。
-
统计一个txt文件上每行数据的和,最后将每行和在相加。
/**
* 使用场景 主线程等待其它线程执行完毕在执行
*
* @author yrz
*
*/
public class MyCountDownLatch {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("第一个线程执行完毕!!!");
countDownLatch.countDown();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("第二个线程执行完毕!!!");
countDownLatch.countDown();
}
}).start();
try {
countDownLatch.await();
System.out.println("主线程开始运行。。。");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
当我们调用一次CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await会阻塞当前线程,直到N变成零。
N为阻塞的线程数(本例中指 2)
2.CyclicBarrier
简介:
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活==。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
总结:所有线程到达屏障后,才会继续干活
自己理解的原理图:
应用场景:等待参会成员到会后,开始开会
/**
* 同步屏障CyclicBarrier
* @author yrz
*
*/
public class MyCyclicBarrier {
public static void meeting(CyclicBarrier barrier) {
try {
Thread.sleep(4000);
System.out.println(Thread.currentThread().getName() + "我到了,正在等待开会");
barrier.await();// 所有开会者到齐之前,一直等待
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("所有人到齐,开始开会");
}
});
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
MyCyclicBarrier.meeting(barrier);
}
}).start();
;
}
}
}
CyclicBarrier的await方法会阻塞到达屏障的线程,当最后一个线程到达后,执行barrierAction。
3.Semaphore
信号量
作用:控制并发线程,自己理解的原理图
/**
* 控制并发线程数Semaphore
* @author yrz
*
*/
public class MySemaphore {
private final static int MAX_COUNT=30;
public static void main(String[] args) {
Semaphore semaphore=new Semaphore(10);
ExecutorService executorService=Executors.newFixedThreadPool(MAX_COUNT);
for (int i = 0; i < MAX_COUNT; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();//信号量获取一个许可,在提供一个许可前一直将线程阻塞
System.out.println("保存数据");
System.out.println("可用的许可数量:"+semaphore.availablePermits());
semaphore.release();//释放一个许可,将其返回给信号量。
System.out.println("可用的许可数量:"+semaphore.availablePermits());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
executorService.shutdown();
}
}
信号量像一个水闸,Semaphor的构造函数默认限制值,这个值就像水闸的宽度一样,限制允许通过的水流量(也就是线程数量)。semaphore.acquire()方法获取许可,semaphore.release()方法释放许可。
4.Exchanger
Exchanger 两个线程进行数据交换,主要是使用exchanger.exchange()方法,比较两个线程的值是否相等。自己理解的原理图:
/**
* 两个线程进行数据交换的Exchanger
*
* @author yrz
*
*/
public class MyExchanger {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new Runnable() {
@Override
public void run() {
String A = "123";
try {
exchanger.exchange(A);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
String B = "123";
try {
String A = exchanger.exchange(B);
if (A.equals(B)) {
System.out.println("A和B数据一致:" + ",A录入的是:" + A + ",B录入是:" + B);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
}