Semaphore
又称“信号量”,控制多个线程争抢许可。
acquire: 获取一个许可,如果没有就等待。
release: 释放一个许可。
availablePermits: 方法得到可用的许可数目。
典型场景: 代码并发处理限流。hystrix
// 信号量机制
public class SemaphoreDemo {
public static void main(String[] args) {
SemaphoreDemo semaphoreTest = new SemaphoreDemo();
int N = 9; // 客人数量
Semaphore semaphore = new Semaphore(5); // 手牌数量,限制请求数量
for (int i = 0; i < N; i++) {
String vipNo = "vip-00" + i;
new Thread(() -> {
try {
semaphore.acquire(); // 获取令牌
semaphoreTest.service(vipNo);
semaphore.release(); // 释放令牌
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
// 限流 控制5个线程 同时访问
public void service(String vipNo) throws InterruptedException {
System.out.println("楼上出来迎接贵宾一位,贵宾编号" + vipNo + ",...");
Thread.sleep(new Random().nextInt(3000));
System.out.println("欢送贵宾出门,贵宾编号" + vipNo);
}
}
CountDownLatch
Java1.5被引入的一个工具类,常被称为:倒计数器。
创建对象时,传入指定数值作为线程参与的数量。
await: 方法等待计数器值变成0,在这之前,线程进入等待状态。
countdown: 计数器数值减一,直到为0。
经常用于等待其他线程执行到某一节点,再继续执行当前线程代码。
使用常景示例:
扫描二维码关注公众号,回复:
9162079 查看本文章
1、统计线程执行的情况;
2、压力测试中,使用countDownLatch实现最大程度的并发处理;
3、多个线程之间,互相通信,比如线程异步调用完接口,结果通知;
CyclicBarrier
也是1.5加入的,又称为“线程栅栏”。
创建对象时,指定栅栏线程数量。
await: 等指定数量的线程都处于等待状态时,继续执行后续代码。
barrierAction: 线程数量到了指定量之后,自动触发执行指定任务。
和CountDownLatch重要区别在于,CyclicBarrier对象可多次触发执行。
典型场景:
- 数据量比较大时,实现批量插入数据到数据库;
- 数据统计,30个线程统计30天数据,全部统计完毕后,执行汇总。
/ 循环屏障(栅栏),示例:数据库批量插入
// 游戏大厅... 5人组队打副本
public class CyclicBarrierTest {
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<String> sqls = new LinkedBlockingQueue<>();
// 任务1+2+3...1000 拆分为100个任务(1+..10, 11+20) -> 100线程去处理。
// 每当有4个线程处于await状态的时候,则会触发barrierAction执行
CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() {
@Override
public void run() {
// 这是每满足4次数据库操作,就触发一次批量执行
System.out.println("有4个线程执行了,开始批量插入: " + Thread.currentThread());
for (int i = 0; i < 4; i++) {
System.out.println(sqls.poll());
}
}
});
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
sqls.add("data - " + Thread.currentThread()); // 缓存起来
Thread.sleep(1000L); // 模拟数据库操作耗时
barrier.await(); // 等待栅栏打开,有4个线程都执行到这段代码的时候,才会继续往下执行
System.out.println(Thread.currentThread() + "插入完毕");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(2000);
}
}