使用工具类控制并发流程,更容易使线程相互配合,完成业务逻辑。
1 CountDownLatch–门栓
下面的场景是,发起一笔贷款申请,银行会查询客户多个资信,等到多个资信查到详情后再确定是否决策通过。
public class QueryClientCredits {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
ExecutorService pool = new ThreadPoolExecutor(
3, 3, 0, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
for (int i = 0; i < 3; i++) {
final int zxId = i + 1;
pool.submit(() ->{
try {
TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
System.out.println("已经查到" + zxId + "资信");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
System.out.println("等待请求资信详情...");
latch.await();
System.out.println("已经查到所有资信,下一步进行决策。");
}
}
上面的场景是多等一的场景。针对于一等多,通常设置
CountDownLatch latch = new CountDownLatch(1);
在每个线程中调用await方法,等待countdown指令后,同时执行。
小结
CountDownLatch的典型使用是一等多和多等一两种,主要是在于CountDownLatch的countDown()和await()两个方法的运用,也可以扩展到多等多的情况。
CountDownLatch不能回滚重置,如果需要重新计数,可以考虑CyclicBarrier。
1.1 join方法与CountDownLatch区别
区别主要两点:
- 调用子线程的join方法后,该线程会一直阻塞到线程结束;而采用CountDownLatch采用计数器计数,不一定等到线程结束。
- 采用线程池管理线程的话,通常采用添加Runnable任务到线程池,无法调用join方法。
综上,CountDownLatch对线程有着更灵活、更优雅的控制。
1.2 内部原理
CountDownLatch内部类Sync继承AQS,通过AQS的state状态递减计数,递减操作采用CAS实现。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
2 Semaphore–信号量
信号量的作用是维护一个许可证的计数,线程可以获取许可证,许可证总数减1,线程使用完后归还许可证。许可证数量为0时,其他线程BLOCKED,直到有可用的许可证。
// 初始化并指定许可证数量
public Semaphore(int permits) // 默认非公平
public Semaphore(int permits, boolean fair)
// 线程获取许可证
public void acquire() // 可以一次获取多个许可证
// 使用完释放许可证
public void release() // 获取多少个,释放同等数量
许可证发放
public class SemaphoreDemo {
private static Semaphore semaphore = new Semaphore(2);
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(10);
Task task = new Task();
for (int i = 0; i < 10; i++) {
pool.submit(task);
}
pool.shutdown();
}
static class Task implements Runnable {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "获取到许可证");
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放许可证");
semaphore.release();
}
}
}
}
使用注意点:
- 获取与释放数量一致。
- 初始化公平性设置
- 获取和释放许可证对线程并无要求,可能A获取了,B释放。
- 例如实现“条件等待”。线程1在线程2完成准备工作后开始工作,线程1acquire,在线程2完成任务后release。
3 Condition–条件对象
Condition对象,可以用来代替Object.wait/notify,与Lock绑定。
- 与Object.wait相同,await可以自动释放Lock锁,调用wait时必须持有锁。
- signalAll唤醒正在等待的所有线程。signal是公平的,只会唤醒等待时间最长的那个线程。
3.1 使用Condition实现一个生产者消费者队列
通过Object.wait/notify和阻塞队列两种也可以实现生产者消费者队列,分别见Object和阻塞队列章节。
public class ProducerConsumerByCondition {
private int capacity = 10;
private Queue queue = new PriorityQueue(capacity);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
class Producer implements Runnable {
@Override
public void run() {
while (true) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
System.out.println("队列满,等待消费");
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(1);
notEmpty.signalAll();
System.out.println("生产完一个元素,队列容量:" + queue.size());
} finally {
lock.unlock();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println("队列空,等待生产元素");
notEmpty.await();
}
TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
queue.poll();
notFull.signalAll();
System.out.println("消费一个元素,队列容量:" + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
ProducerConsumerByCondition demo = new ProducerConsumerByCondition();
Thread producer = new Thread(demo.new Producer());
Thread consumer = new Thread(demo.new Consumer());
producer.start();
consumer.start();
}
}
场面的实现中,创建了lock锁的两个Condition对象。好处是,每个Condition有着单独的等待队列,调用await方法,放入对应的等待队列中,然后调用signal方法唤醒。我们可以看出,相对于使用一个Condition,唤醒的粒度变小了,更有针对性,避免唤醒不必要的线程。
4 CyclicBarrier–循环栅栏
用来阻塞一组线程,构造一个集合点。所有线程到达后,栅栏撤销,所有线程统一出发,执行剩下的任务。
public class CyclicBarrierDemo {
public static void main(String[] args) {
// 初始化循环栅栏
CyclicBarrier barrier = new CyclicBarrier(5, () -> {
System.out.println("所有线程到齐,开始出发。。。");
});
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
pool.submit(new Task(i, barrier));
}
}
static class Task implements Runnable {
int id;
CyclicBarrier barrier;
public Task(int id, CyclicBarrier barrier) {
this.id = id;
this.barrier = barrier;
}
@Override
public void run() {
try {
System.out.println("线程" + id + "到达出发点。。。");
TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
System.out.println("线程" + id + "已到达,等待出发。。。");
barrier.await();
System.out.println("线程" + id + "开始出发。。。");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
CyclicBarrier与CountDownLatch的区别
- 作用不同:CyclicBarrier是等固定数量线程到达栅栏位置才能继续执行;而CountDownLatch是等待计数器数量为0。 即CyclicBarrier针对线程,CountDownLatch针对事件。
- 可重用性不同:CyclicBarrier可以重复使用;CountDownLatch不能循环重置。
- 特性不同:CyclicBarrier可以在所有线程到达后执行特定任务。
示例代码
https://gitee.com/dtyytop/advanced-java/tree/master/src/main/java/com/lzp/java/concurrent