前言
我们经常会听到
JUC
,它其实是并发大师Doug Lea
在jdk写的java.util.concurrent.*
。其实我们在前面已经讲了Atomic
、Lock
,友链:并发艺术(二)一玩到底,玩透JAVA各种锁机制。这些算是开胃菜吧,在探讨并发核心AQS
前,这篇我们先来热身一下,玩玩JUC下的并发工具。
Doug Lea大师为我们提供了一些在不使用(无感知)synchronized、lock的特殊场景下的一些并发工具,希望看完这些,能为小伙伴们在以后的工作中有点点帮助。
CountDownLatch
CountDown,倒数,Latch,锁。如果觉得不好理解,我们来看下构造函数:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
复制代码
这里有个sync
,我们看下:
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
复制代码
可以看到,sync
继承AbstractQueuedSynchronizer
,好吧,这个有点东西,我们后面章节探讨。
CountDownLatch
构造参数count
,
count the number of times {@link #countDown} must be invoked before threads can pass through {@link #await}
Doug Lea大师给出了说明,线程通过await()前,必须调用countDown()的次数。
由此,我们可以大致理解下CountDownLatch的作用,来张图吧
我们初始化一个CountDownLatch
,count为4,左边的引用CountDownLatch
实例的四个countDown()
的地方,右边是一个await()
,这些可以在一个线程也可以在多个线程,因为我们操作的是CountDownLatch实例对象。
当4次countDown()
之后,await()
的地方就会放行,执行后续代码,未达到的话,就阻塞
。
CountDownLatch的一些主要方法:
await()
// 阻塞,直到countDown次数与初始化count一直
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
复制代码
await(long timeout, TimeUnit unit)
// 阻塞一段时间,时间到继续执行后续代码
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
复制代码
countDown()
public void countDown() {
// release 1
sync.releaseShared(1);
}
复制代码
下面来一段demo
public class CountDownLatchDemo {
static CountDownLatch countDownLatch = new CountDownLatch(4);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 4; i++) {
new Thread(() -> {
countDownLatch.countDown();
System.out.println("线程ID:" + Thread.currentThread().getId() + ", countDown");
}).start();
}
countDownLatch.await();
System.out.println("pass...");
}
}
===============结果====================
线程ID:11, countDown
线程ID:12, countDown
线程ID:13, countDown
线程ID:14, countDown
pass...
复制代码
只有countDown次数==count
,才会执行await()后续代码
,否则阻塞
,下面我们来个超时的:
public class CountDownLatchDemo {
static CountDownLatch countDownLatch = new CountDownLatch(4);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 4; i++) {
new Thread(() -> {
try {
// 睡眠3s
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
System.out.println("线程ID:" + Thread.currentThread().getId() + ", countDown");
}).start();
}
// 等待2s
countDownLatch.await(2000, TimeUnit.MILLISECONDS);
System.out.println("pass...");
}
}
============结果================
pass...
线程ID:11, countDown
线程ID:12, countDown
线程ID:13, countDown
线程ID:14, countDown
复制代码
由于等待2s,而线程countDown前睡眠了3s,所有超时后,await()不再阻塞,继续执行。
CyclicBarrier
CyclicBarrier与CountDownLatch有点类似
我们看下构造函数
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
复制代码
看下Doug Lea的说明,parties,指障碍消除之前await()调用的次数。
同样四个线程,都可以访问CyclicBrrier的同一个实例对象
,当每个线程都执行到了await()
时,才会消除屏障
,放行,除了可以继续执行await()后续的代码
,还可以调用一个Runnable
,构造参数中我们可以定义。
执行await()
的时候,其实也维护了一个count
,跟CountDownLatch
原理差不多,这里就不看源码了,大家可以自己看下。
这里我们来模拟四个线程
及一个barrierAction
,其中两个线程
率先到达await()
public class CyclicBarrierDemo {
static CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new BarrierAction());
// 屏障消除会调用此线程
static class BarrierAction implements Runnable {
@Override
public void run() {
System.out.println("barrierAction......");
}
}
public static void main(String[] args) {
for (int i = 0; i < 2; i++) {
new Thread(() -> {
try {
System.out.println("Thread id :" + Thread.currentThread().getId() + "await start...");
cyclicBarrier.await();
System.out.println("Thread id :" + Thread.currentThread().getId() + "await end...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
for (int i = 0; i < 2; i++) {
new Thread(() -> {
try {
System.out.println("Thread id :" + Thread.currentThread().getId() + "await start...");
// 睡眠1s
Thread.sleep(1000);
cyclicBarrier.await();
System.out.println("Thread id :" + Thread.currentThread().getId() + "await end...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
=================结果=======================
Thread id :11await start...
Thread id :12await start...
Thread id :13await start...
Thread id :14await start...
barrierAction......
Thread id :14await end...
Thread id :13await end...
Thread id :12await end...
Thread id :11await end...
复制代码
可以看到,虽然其中两个线程睡眠了1s
,但是结果还是等所有线程都执行完,才做后续执行,并且调用了我们定义的BarrierAction
。
CyclicBarrier
通过对await()
方法提供了超时机制,与CountDownLatch类似,可自行尝试。
Exchanger
Exchanger稍稍有些不同,它主要用于两个线程的数据交换,使用场景比较局限。
我们来操作一波
public class ExchangerDemo {
static Exchanger exchanger = new Exchanger();
static class ExchangeThread extends Thread {
private int n;
ExchangeThread(int n) {
this.n = n;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread() + " 原值 " + n);
this.n = (int) exchanger.exchange(n);
System.out.println(Thread.currentThread() + " 交换后值 " + n);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExchangeThread thread1 = new ExchangeThread(5);
ExchangeThread thread2 = new ExchangeThread(10);
thread1.start();
thread2.start();
}
}
================结果=====================
Thread[Thread-0,5,main] 原值 5
Thread[Thread-1,5,main] 原值 10
Thread[Thread-1,5,main] 交换后值 5
Thread[Thread-0,5,main] 交换后值 10
复制代码
可以看到Thread-0原值5,交换后变成10。Thread-1原值10,交换后变成5。
同样Exchanger也提供超时处理机制,大家可以自己尝试一下。
Semaphore
Semaphore
意为信号量
,所谓的量是一个数目
,而信号则代表个体的状态
,个人理解。有点类似连接池,假设总共10个连接,不可扩,用的时候申请一个,如果有,有获取连接,没有就等待,用完之后将连接池放回去。
构造函数
/**
* Creates a {@code Semaphore} with the given number of
* permits and the given fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
* @param fair {@code true} if this semaphore will guarantee
* first-in first-out granting of permits under contention,
* else {@code false}
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
复制代码
我们看到,里面也用到了sync,这个后面介绍。permits为许可数量,fair代表是公平还是非公平,这个前面章节有介绍公平锁和非公平锁。
Semaphore主要有方法
// 获取许可
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
复制代码
// 释放许可
public void release() {
sync.releaseShared(1);
}
复制代码
我们模拟一个简单的限流,有n个人,只有拿到令牌的人才能doSomething,其他没有令牌的得等着。
public static void main(String[] args) {
Token token = new Token();
// 初始化1个令牌
token.init(1);
// 初始化3个线程,假定代表三个人
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
token.acquire();
System.out.println("线程ID: " + Thread.currentThread().getId() + "获取令牌");
// doSomething
Thread.sleep(1000);
System.out.println("线程ID: " + Thread.currentThread().getId() + "放回令牌");
token.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
static class Token {
private Semaphore semaphore;
public void init(int n) {
semaphore = new Semaphore(n);
}
public void acquire() throws InterruptedException {
semaphore.acquire();
}
public void release() throws InterruptedException {
semaphore.release();
}
}
====================结果===========================
线程ID: 11获取令牌
线程ID: 11放回令牌
线程ID: 12获取令牌
线程ID: 12放回令牌
线程ID: 13获取令牌
线程ID: 13放回令牌
复制代码
我们可以看到同时只有一个人在doSomething。
Semaphore也提供一些超时机制,可以自行尝试。
方法 | 备注 |
---|---|
tryAcquire() | 尝试获取,成功返回true,失败返回false |
tryAcquire(long timeout, TimeUnit unit) | 尝试一定时间 |
总结
今天主要介绍了JUC下得一些并发工具,每个工具都有不同的使用场景:
工具 | 线程安全 | 场景 |
---|---|---|
CountDownLatch | AQS | 适用于n个线程操作是某个线程操作的前置 |
CyclicBarrier | Lock | 适用于一组线程达到某个屏障后,继续向后执行或调用其他线程 |
Exchanger | ThreadLocal、CAS | 适用于两个线程达到某个点后进行数据交换,场景比较局限 |
Semaphore | AQS | 适用于控制多个线程能同时访问一个资源的个数 |
可以根据平时的业务场景不同,来选择合适的并发工具。
下章将介绍并发核心AQS和Condition