版权声明:欢迎转载,转载请标明来源 https://blog.csdn.net/weixin_41973131/article/details/88939362
一、前言
Condition
是一个接口类,是一个在JUC里面提供于控制线程释放锁, 然后进行等待其他获取锁的线程发送 signal 信号来进行唤醒的工具类。
在JDK中一共有两个类实现了该接口,分别是AbstractQueuedSynchronizer
和AbstractQueuedLongSynchronizer
下面我们主要要讲的是AbstractQueuedSynchronizer
类中对Condition
的实现——ConditionObject
----其中的await()
方法
二、基础构成
Condition内部维护着一个装载线程结点的Condition队列,只有当当前线程获取了独占锁时才能够对Condition的方法进行调用。相对于AQS的Sync队列,Condition队列是一条不支持并发安全的单向 queue
// Condition队列的头结点
private transient Node firstWaiter;
// Condition队列的尾结点
private transient Node lastWaiter;
对于可中断的等待结点,有下面两种模式:
// 该模式意味着在退出等待时重新中断
private static final int REINTERRUPT = 1;
// 该模式意味着在退出等待时抛出InterruptedException
private static final int THROW_IE = -1;
锁有可以分为独占锁跟共享锁,两种模式各指向两种不同的Node:
// 标记表示节点正在共享模式中等待
static final Node SHARED = new Node();
// 标记表示节点正在独占模式中等待
static final Node EXCLUSIVE = null;
结点有三种等待状态:
// 标志该线程已被取消
static final int CANCELLED = 1;
// 标志该线程的后继结点需要被唤醒
static final int SIGNAL = -1;
// 标志该线程正在Condition等待队列
static final int CONDITION = -2;
// 标志下一个acquireShared应无条件传播
static final int PROPAGATE = -3;
三、await()
首先我们先看一下释放锁并将线程挂起的操作:await()
// 释放锁,然后挂起线程,一旦条件满足就被唤醒,再次获取锁!
public final void await() throws InterruptedException {
// 若线程已经被中断则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程加入Condition锁队列
Node node = addConditionWaiter();
// 释放锁并返回状态位
int savedState = fullyRelease(node);
int interruptMode = 0;
// 自旋挂起,判断当前的线程是否在Syn队列中
// 直到被唤醒或者超时或者CACELLED等
// 因为上一步已经释放了锁,也就是说此时可能有线程已经获取锁同时可能已经调用了singal方法,如果已经唤醒,那么就不应该park了,而是退出while方法,从而继续争抢锁
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 获得锁,并将自己从Condition队列中释放(此处修改了node结点的状态位,具体的清除由其他线程完成),表明自己不再需要锁。
// 这里获得锁后的状态位不是THROW_IE就是REINTERRUPT,那么在后面就一定会被抛出异常或者中断线程
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// node结点不是最后一个结点,这个时候就要清除该node结点了,顺手把其他不是Condition状态位的结点也清了
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 抛出InterruptedException,重新中断当前线程,或不执行任何操作,具体取决于模式。
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
操作流程:
- 判断线程是否中断,未中断则将当前线程加入Condition队列,并且释放锁将状态位保存起来
- 自旋挂起判断当前线程是否在AQS的Syn队列上,若不在则继续挂起,直到被CACELLED或者超时。
- AQS获得锁并且将自己从Condition队列中释放
- 清空当前结点后面被Cancel的结点,根据中断模式决定如何处理当前线程
3.1 addConditionWaiter()
先从我们第一个看到的addConditionWaiter()
开始解析:
// 在等待队列中添加新的等待对象
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果lastWaiter不为空,则检查该队列是否有被Cancel的结点
if (t != null && t.waitStatus != Node.CONDITION) {
// 遍历Condition队列的结点,移除已被取消的结点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 利用当前线程构建一个代表当前线程的结点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 将该结点插入到Condition条件队列的尾部并返回该结点
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
操作步骤:
- 判断Condition队列的最后一个结点是否为空,若不为空且等待状态不为CONDITION,说明当前结点在Condition队列已被Cancel,此时遍历队列并移除已被取消的结点
- 构建当前线程的结点并将其加入到队列尾部
对于unlinkCancelledWaiters()
:
// 遍历Condition的条件队列,将状态不为Condition(已被Cancel的节点)移除
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null; // 遍历过程中的最后一个Condition条件队列中的有效结点
// 从条件队列的第一个结点开始遍历
while (t != null) {
Node next = t.nextWaiter;
// 当前结点的等待状态位不是CONDITION,即该节点已被唤醒,移除该结点
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
// 说明该结点已经是Condition队列的最后一个结点,可以设置lastWaiter的指向了
if (next == null)
lastWaiter = trail;
} else
trail = t;
t = next;
}
}
操作步骤:
- 取出队列的第一个结点
- 若当前指向结点不为空,则判断当前结点的状态位
- 若当前结点的状态位不为Condition,则说明当前结点的线程已被唤醒,此时将Condition队列的尾部指向此节点的后继结点;若未被唤醒,则将
trail
指向当前结点的。注意:trail
结点以及其之前的结点即是更新后的Condition队列 - 若当前结点的后继结点不为null,则将该结点指向其后继结点。跳转到步骤2。
3.2 fullyRelease()
// 利用当前的状态位释放锁并返回保存的状态位。出错时清除结点
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 保存下来的状态位
int savedState = getState();
// 释放当前的锁,s释放失败则抛出异常
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
操作步骤:
- 保存状态位
- 释放成功则返回状态位,否则抛出异常并将当前线程的等待状态置为CANCELLED(取消)
这里不大明白为什么不这样子写:
final int fullyRelease(Node node) {
int savedState = getState();
if (release(savedState)) {
return savedState;
} else {
node.waitStatus = Node.CANCELLED;
}
}
如果有知道的大神希望能不吝赐教。
下面继续深入看一下release(Int)
操作:
// 释放持有的锁,用于实现unlock操作
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 成功释放锁,即已将锁的state状态位置为0
// 看AQS队列中的头结点是否为空并且能否被唤醒,如果可以的话就唤醒继任节点
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒头结点的后继结点
unparkSuccessor(h);
return true;
}
return false;
}
其他操作在ReentrantLock
释放锁的操作上讲过,因此就不再多说了。
3.3 isOnSyncQueue(Node)
// 如果一个节点(始终位于条件队列中的节点)现在正在等待重新获取同步队列,则返回true。
final boolean isOnSyncQueue(Node node) {
// 此处判断当前Node的状态,如果是CONDITION,表明是正常的没有被唤醒的节点
// 如果node的前继结点为null,表明是刚刚持有锁的线程
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果node结点既不是在等待队列中,也不是刚刚持有锁的线程,并且有后继结点,那么他肯定在Syn队列中
if (node.next != null) // 如果有继任者,则必须在队列中
return true;
// 遍历整个Node的Syn队列,如果存在返回true
return findNodeFromTail(node);
}
` // 如果节点通过从尾部向后搜索而处于同步队列中,则返回true。仅在isOnSyncQueue需要时调用。
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (; ; ) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
这里可能理解得不大好。。
3.4 checkInterruptWhileWaiting(Node)
// 检查中断,如果在signalled前被中断返回THROW_IE,如果在signalled后被中断返回REINTERRUPT
// 若没有被中断则返回0
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
若未中断则返回0,否则:
// 如有必要,在取消等待后将节点转移到同步队列。如果线程在被signal()之前被取消,则返回true。
final boolean transferAfterCancelledWait(Node node) {
// 如果成功设置结点的状态位,则说明中断发生时,没有signal的调用,因为signal方法会将状态设置为0;
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 将当前线程添加到aqs的等待的队列中
enq(node);
return true;
}
// 上面的代码唯一失败的可能是在此期间,condition被调用了signal方法,导致上面的compareAndSetWaitStatus返回false
// 如果不在Sync队列中,则让步给其他线程执行,直到当前的node已经被signal方法添加到Sync队列中;
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
四、最后
不知道该写什么结束语。。干脆什么都不写。。