目录
Condition
接口简介
任意一个 Java
对象都拥有一组监视器方法(定义在 java.lang.Object
上),主要包括 wait()、wait(long timeout)、notify()
以及 notifyAll()
方法,这些方法与 synchronized
同步关键字配合,可以实现线程之间的等待,唤醒
在 jdk 1.5
之后提供了 Lock
接口,Lock
又提供了条件 Condition
接口,使得对线程的等待、唤醒操作更加详细和灵活。Condition
接口与 Lock
配合也可以实现线程之间的等待,唤醒;但是这两者在使用方式以及功能特性上还是有差别的
对比项 | Object 监视器方法 | Condition |
---|---|---|
前置条件 | 获取对象的锁 | 1.调用Lock.lock()获取 2.调用Lock.newCondition()获取Condition对象 |
调用方式 | 直接调用,如object.wait() |
直接调用,如condition.await() |
等待队列个数 | 一个 | 多个 |
当前线程释放锁并进入等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态,在等待状态中不响应终端 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来的某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
- 一个
Lock
对象里可以创建多个condition
实例,线程可以注册在指定的condition
中从而选择性的进行线程通知,在调度线程上更加灵活 - 在使用
notify(),notifuAll()
方法进行唤醒时,被调度的线程是由cpu
随机选择的。但使用ReentrantLock
结合condition
是可以实现选择性通知
,这个功能是非常重要的,而且在condition
类中默认提供的 - 而
synchronized
就相当于整个Lock
对象中只有一个单一的condition
对象,所有的线程都注册在它一个对象上。线程开始notifyAll()
时,需要通知所有的WAITING
线程,没有选择权 Conditon
中的await()
对应Object
的wait()
Condition
中的signal()
对应Object
的notify()
Condition
中的signalAll()
对应Object
的notifyAll()
Condition
的应用
@Slf4j
public class ProducerConsumerTest {
private final Lock lock = new ReentrantLock();
// 通过Lock获取两个Condition实例
private final Condition addCondition = lock.newCondition();
private final Condition removeCondition = lock.newCondition();
private final LinkedList<Integer> resources = new LinkedList<>();
private final int maxSize;
public ProducerConsumerTest(int maxSize) {
this.maxSize = maxSize;
}
public class Producer implements Runnable {
private final int proSize;
private Producer(int proSize) {
this.proSize = proSize;
}
@Override
public void run() {
lock.lock();
try {
for (int i = 1; i < proSize; i++) {
log.info("已经生产产品数: " + i + "\t现仓储量总量:" + resources.size());
resources.add(i);
while (resources.size() >= maxSize) {
log.info("当前仓库已满,等待消费...");
try {
// 进入阻塞状态
addCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 唤醒消费者
removeCondition.signal();
}
} finally {
lock.unlock();
}
}
}
public class Consumer implements Runnable {
@Override
public void run() {
String threadName = Thread.currentThread().getName();
while (true) {
lock.lock();
try {
while (resources.size() <= 0) {
log.info(threadName + " 当前仓库没有产品,请稍等...");
try {
// 进入阻塞状态
removeCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消费数据
int size = resources.size();
for (int i = 0; i < size; i++) {
Integer remove = resources.remove();
log.info(threadName + " 当前消费产品编号为:" + remove);
}
// 唤醒生产者
addCondition.signal();
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
ProducerConsumerTest producerConsumerTest = new ProducerConsumerTest(10);
Producer producer = producerConsumerTest.new Producer(20);
Consumer consumer = producerConsumerTest.new Consumer();
final Thread producerThread = new Thread(producer, "producer");
final Thread consumerThread = new Thread(consumer, "consumer");
producerThread.start();
TimeUnit.SECONDS.sleep(2);
consumerThread.start();
}
}
运行结果
17:53:55.123 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 1 现仓储量总量:0
17:53:55.129 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 2 现仓储量总量:1
17:53:55.129 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 3 现仓储量总量:2
17:53:55.129 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 4 现仓储量总量:3
17:53:55.129 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 5 现仓储量总量:4
17:53:55.130 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 6 现仓储量总量:5
17:53:55.130 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 7 现仓储量总量:6
17:53:55.130 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 8 现仓储量总量:7
17:53:55.130 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 9 现仓储量总量:8
17:53:55.130 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 10 现仓储量总量:9
17:53:55.130 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 当前仓库已满,等待消费...
17:53:57.133 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:1
17:53:57.134 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:2
17:53:57.134 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:3
17:53:57.134 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:4
17:53:57.134 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:5
17:53:57.134 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:6
17:53:57.135 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:7
17:53:57.135 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:8
17:53:57.135 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:9
17:53:57.135 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:10
17:53:57.138 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 11 现仓储量总量:0
17:53:57.139 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 12 现仓储量总量:1
17:53:57.140 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 13 现仓储量总量:2
17:53:57.140 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 14 现仓储量总量:3
17:53:57.140 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 15 现仓储量总量:4
17:53:57.140 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 16 现仓储量总量:5
17:53:57.140 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 17 现仓储量总量:6
17:53:57.140 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 18 现仓储量总量:7
17:53:57.140 [producer] INFO org.exaplme.reflect.ProducerConsumerTest - 已经生产产品数: 19 现仓储量总量:8
17:53:57.140 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:11
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:12
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:13
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:14
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:15
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:16
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:17
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:18
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前消费产品编号为:19
17:53:57.141 [consumer] INFO org.exaplme.reflect.ProducerConsumerTest - consumer 当前仓库没有产品,请稍等...
Condition
接口源码
package java.util.concurrent.locks;
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
await()
:当前线程进入等待状态直到被通知或中断
。该方法返回时,该线程肯定又一次获取了锁awaitUninterruptibly()
:当前线程进入等待状态直到被通知
,等待过程中不响应中断
。该方法返回时,该线程肯定又一次获取了锁awaitNanos()
:当前线程进入等待状态直到被通知,中断,或者超时
。返回值(超时时间减
实际返回所用时间)如果是0
或者负数,那么可以认定已经超时了。该方法返回时,该线程肯定又一次获取了锁await(long time, TimeUnit unit)
:当前线程进入等待状态直到被通知,中断,或者超时
。如果在从此方法返回前检测到等待时间超时,则返回false
,否则返回true
。该方法返回时,该线程肯定又一次获取了锁awaitUntil(Date deadline)
:当前线程进入等待状态直到被通知,中断或者超过指定时间点
。如果没有到指定时间就被通知,则返回true
,否则返回false
signal()
:唤醒一个等待在Condition
上的线程,该线程从等待方法返回前必须获得与Condition
相关联的锁signalAll()
: 唤醒所有等待在Condition
上的线程,能够从等待方法返回的线程必须获得与Condition
相关联的锁
Condition
是一种广义上的条件队列。它为线程提供了一种更为灵活的等待/
唤醒模式,线程在调用 await()
后执行挂起操作,直到线程等待的某个条件为真时才会被唤醒。Condition
必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个 Condition 的实例必须与一个 Lock 绑定
,因此 Condition
一般都是作为 Lock
的内部实现
Condition
实现原理
获取一个 Condition
必须要通过 Lock
的 newCondition()
方法。该方法定义在接口 Lock
中,返回的结果是绑定到此 Lock
实例的新 Condition
实例
package java.util.concurrent.locks;
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
Condition
是一个接口,其下仅有一个实现类 ConditionObject
,由于 Condition
的操作需要获取相关的锁,而 AQS
则是同步锁的实现基础,所以 ConditionObject
则定义为 AQS
的内部类
package java.util.concurrent.locks;
public abstract class AbstractQueuedLongSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
.........
public class ConditionObject implements Condition, java.io.Serializable {
}
}
等待队列
每个 Condition
对象都包含着一个 FIFO
队列,该队列是 Condition
对象通知/
等待功能的关键。在队列中每一个节点都包含着一个线程引用,该线程就是在该 Condition
对象上等待的线程
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 头节点
private transient Node firstWaiter;
// 尾节点
private transient Node lastWaiter;
public ConditionObject() {
}
/**省略方法*/
}
从上面代码可以看出 Condition
拥有首节点 firstWaiter
,尾节点 lastWaiter
。当前线程调用 await()
方法后
- 将会以当前线程构造节点
Node
,并将节点加入到该等待队列的尾部 Condition
拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter
指向它,并且更新尾节点即可- 上述节点引用更新的过程并没有使用
CAS
保证,原因在于调用await()
方法的线程必定是获取了锁的线程,也就是说该过程是由锁
来保证线程安全的
事实上,节点 Node
的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类 AbstractQueuedSynchronizer.Node
在 Object
的监视器模型上,一个对象拥有一个同步队列和等待队列
,而并发包中的 Lock
(更确切地说是同步器)拥有一个同步队列和多个等待队列
(可以创建多个 condition
实例)
等待(调用 await()
)
调用 Condition
的 await()
(或者以 await
开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为 WAITING
状态。当从 await()
方法返回时,当前线程一定获取了 Condition
相关联的锁
如果从队列(同步队列和等待队列)的角度看 await()
方法,当调用 await()
方法时,相当于同步队列的首节点(获取了锁的节点)移动到 Condition
的等待队列中
AQS
中的内部类 ConditionObject
所实现的 await()
public class ConditionObject implements Condition, java.io.Serializable {
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 同步队列的首节点并不会直接加入等待队列,而是通过addConditionWaiter()方法把当前线程构造成一个新的节点并将其加入等待队列中
Node node = addConditionWaiter();
// 释放同步状态,也就是释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 检测节点是否在同步队列中,若节点没有在同步队列中则一直阻塞
while (!isOnSyncQueue(node)) {
// 阻塞挂起线程
LockSupport.park(this);
// 检查阻塞过程中是否发生中断,若有中断则退出while循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 调用signal()方法将当前线程从等待队列放到同步队列中
// 线程被唤醒,退出while循环
// acquireQueued()方法尝试获取同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清理条件队列中的不是在等待条件的节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
}
- 首先将当前线程新建一个节点同时加入到条件等待队列中
- 然后释放当前线程持有的同步状态
- 然后不断检测该节点代表的线程,是否出现在同步队列中(收到
signal()
信号之后就会在AQS
队列中检测到),如果不存在则一直挂起,否则参与竞争同步状态
如果从队列的角度去看,当前线程加入 Condition
的等待队列,该过程如图所示
通知或唤醒
调用 Condition
的 signal()
方法,将会唤醒在等待队列中 等待时间最长的首节点
,在唤醒节点之前,会将节点移到同步队列中
AQS
中的内部类 ConditionObject
所实现的 signal()
public class ConditionObject implements Condition, java.io.Serializable {
public final void signal() {
// 如果当前线程不是获取锁的线程,则抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 头节点,唤醒条件队列中的第一个节点
Node first = firstWaiter;
if (first != null)
// 唤醒
doSignal(first);
}
}
- 判断当前线程是否已经获取了锁,如果没有获取则直接抛出异常,因为获取锁为通知的前置条件
- 如果线程已经获取了锁,则将唤醒条件等待队列的首节点
- 唤醒首节点是先将条件等待队列中的头节点移出,然后调用
AQS
的enq(Node node)
方法将其安全地移到同步队列中 - 最后判断如果该节点的同步状态是否为
Cancel
,或者修改状态为Signal
失败时,则直接调用LockSupport
唤醒该节点的线程
被唤醒后的线程,将从 await()
中的 while
循环中退出(isOnSyncQueue(Node node)
方法返回 true
,节点已经在同步队列中),进而调用同步器的 acquireQueued()
加入到获取同步状态的竞争中
成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的 await()
方法返回,此时该线程已经成功地获取了锁
Condition
的 signalAll()
方法,相当于对等待队列中的每个节点均执行一次 signal()
方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程
总结
- 一个线程获取锁后,通过调用
Condition
的await()
方法,会将当前线程先加入到条件等待队列中 - 然后释放锁,最后通过
isOnSyncQueue(Node node)
不断自检看节点是否已经在同步队列了,如果是则尝试获取锁,否则一直挂起 - 当线程调用
signal()
方法后,程序首先检查当前线程是否获取了锁,然后通过doSignal(Node first)
唤醒等待队列的首节点。被唤醒的线程,将从await()
中的while
循环中退出来,然后调用acquireQueued()
竞争同步状态
Condition
源码详解:https://blog.csdn.net/yakax/article/details/112982654