首先,从类注释可以得到的信息:
- 提供了一种框架,自定义了先进先出的同步队列,让获取不到锁的线程能进入同步队列中排队;
- 同步器有个状态字段,我们可以通过状态字段来判断能否得到锁,此时设计的关键在于依赖安全的 atomic value 来表示状态(虽然注释是这个意思,但实际上是通过把状态声明为 volatile,在锁里面修改状态值来保证线程安全的);
- 子类可以通过给状态 CAS 赋值来决定能否拿到锁,可以定义那些状态可以获得锁,哪些状态表示获取不到锁(比如定义状态值是 0 可以获得锁,状态值是 1 就获取不到锁);
- 子类可以新建非 public 的内部类,用内部类来继承 AQS,从而实现锁的功能;
- AQS 提供了排它模式和共享模式两种锁模式。排它模式下:只有一个线程可以获得锁,共享模式可以让多个线程获得锁,子类 ReadWriteLock 实现了两种模式;
- 内部类 ConditionObject 可以被用作 Condition,我们通过 new ConditionObject () 即可得到条件队列;
- AQS 实现了锁、排队、锁队列等框架,至于如何获得锁、释放锁的代码并没有实现,比如 tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared、isHeldExclusively 这些方法,AQS 中默认抛 UnsupportedOperationException 异常,都是需要子类去实现的;
- AQS 继承 AbstractOwnableSynchronizer 是为了方便跟踪获得锁的线程,可以帮助监控和诊断工具识别是哪些线程持有了锁;
- AQS 同步队列和条件队列,获取不到锁的节点在入队时是先进先出,但被唤醒时,可能并不会按照先进先出的顺序执行。
AQS 的注释还有很多很多,以上 9 点是比较重要的注释总结。
1.结构
AbstractQueuedSynchronizer 继承关系,核心成员变量,主要构造函数:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 同步器的状态,根据当前状态进行判断是否可以获得当前锁
// 如果当前state是0,那么可以获得锁
// 可重入锁,每次获得锁+1,每次释放锁-1
private volatile int state;
// 自旋超时阀值,单位纳秒
// 当设置等待时间时才会用到这个属性
static final long spinForTimeoutThreshold = 1000L;
// 同步队列的头
private transient volatile Node head;
// 同步队列的尾
private transient volatile Node tail;
// Node
// 即使同步队列的Node,也是条件队列的Node
static final class Node {
...}
// 条件队列,从基础属性上可以看出是链表队列
public class ConditionObject implements Condition, java.io.Serializable
//......
}
AbstractOwnableSynchronizer
AbstractOwnableSynchronizer 的作用就是为了知道当前是那个线程获得了锁,方便监控用
Node
既是同步队列的节点,也是条件队列的节点
static final class Node {
// 当前节点的线程
volatile Thread thread;
// 当前节点的前节点
// 节点 acquire 成功后就会变成head
// head 节点不能被 cancelled
volatile Node prev;
// 当前节点的下一个节点
volatile Node next;
/**
* 两个队列共享的属性
*/
// 表示当前节点的状态,通过节点的状态来控制节点的行为
// 普通同步节点就是 0 ,实际一共5种状态(1,0,-1,-2,-3)
volatile int waitStatus;
// 被取消,在同步队列中出错时会被(暂时)置为cancel,随后就会删除
static final int CANCELLED = 1;
// 表示其后有待唤醒的节点,存在的意义是防止有节点被忘记唤醒
static final int SIGNAL = -1;
// 表示当前 node 正在条件队列中,当有节点从同步队列转移到条件队列时,状态就会被更改成 CONDITION
static final int CONDITION = -2;
// 无条件传播,共享模式下,该状态的进程处于可运行状态
static final int PROPAGATE = -3;
// 在同步队列中,nextWaiter 只是表示当前 Node 是排它模式还是共享模式
// 在条件队列中,nextWaiter 就是表示下一个节点元素
Node nextWaiter;
// node是共享模式
static final Node SHARED = new Node();
// node是排它模式
static final Node EXCLUSIVE = null;
}
ConditionObject
// 条件队列,从属性上可以看出是链表结构
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 条件队列中第一个 node
private transient Node firstWaiter;
// 条件队列中最后一个 node
private transient Node lastWaiter;
}
Condition 接口如下:
public interface Condition {
// 如果线程被打断,抛 InterruptedException 异常
void await() throws InterruptedException;
// 虽然入参可以是任意的时间,但底层仍然转化成纳秒
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 返回的 long 值表示剩余的给定等待时间,如果返回的时间小于等于 0 ,说明等待时间过了
// 选择纳秒是为了避免计算剩余等待时间时的截断误差
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 和 wait 方法比较,不能被打断,其余一样
void awaitUninterruptibly();
// 返回值表示目前为止,指定日期是否到期,true 表示没有过期,false 表示过期了
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待的线程,在被唤醒前必须先获得锁
void signal();
// 唤醒条件队列中的所有线程
void signalAll();
}
AQS 实现 Condition 接口的作用:
- 当 lock 代替 synchronized 来加锁时,Condition 就可以用来代替 Object 中相应的监控方法了,比如 Object.wait、Object.notify、Object.notifyAll 这些方法;
- 提供了一种线程协作方式:一个线程被暂停执行,直到被其它线程唤醒;
- Condition 实例是绑定在锁上的,通过 Lock#newCondition 方法可以产生该实例;
- 除了特殊说明外,任意空值作为方法的入参,都会抛出空指针;
- Condition 提供了明确的语义和行为,这点和 Object 监控方法不同。
AQS的核心思想是:开源节流,两种模式加锁
- 开源:所有线程都能进入同步队列
- 节流:只有极少数线程能运行
- 两种模式
- 独占与共享
- 独占:只有1个线程能执行
- 共享:1个线程在执行前会叫醒其他线程一起执行
- 公平与非公平
- 公平:进入同步队列,服从同步器调度,先入先出
- 非公平:可以不进入同步队列,CAS修改state成功直接拿锁运行
- 独占与共享
2.同步队列
- 作用:管理多个线程的休眠与唤醒
- 策略:可以执行的线程 = RUNNABLE状态 && tryAcquire成功
- 独占模式(EXCLUSIVE):队首持锁,唤醒队二后tryAcquire尝试拿锁,队三及以后休眠
- 共享模式(SHARED):相较于独占模式只唤醒队二 ,共享模式还唤醒所有mode=shared节点(多了一步)
注:这里需要明确一点,独占和共享是对于加锁而言(能否多线程同时获锁),释放锁时没有独占和共享的概念
- 状态
- 初始化(0):入队的初始状态
- SIGINAL(-1):若当前node后面还有node,就要从0->SIGNAL
- CANCELLED(1):拿锁失败或出现异常,会在置为CANCELLED后删除,但并发时可能会暂时维持
2.1 独占-加锁
acquire
该方法用作获取锁。排他模式下,acquire 方法由子类的 lock 方法直接调用。如下图是 Reentrantlock 的静态内部类 Sync 和 NonfairSync:
注:从图中也可以看出 NonfairSync 的 lock 方法是非公平的,因为当前线程直接就有获取锁的机会(CAS修改state成功),不是必须要进入同步队列,接受同步器的调度。
- 尝试获得锁,成功直接放回
- 失败,加入同步队列,等待拿锁
public final void acquire(int arg) {
// tryAcquire 方法是需要子类去实现的
// CAS修改state判断能否拿到锁,拿到锁return true,不会再进入addWaiter
if (!tryAcquire(arg) &&
// addWaiter 入参代表是排他模式
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如下图,是 Reentrantlock中 NonfairSync 的 tryAcquire 方法的具体实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread(); // 获取当前线程
int c = getState(); // 获取当前同步器的状态
if (c == 0) {
// 若c=0,表示可以获取锁
if (compareAndSetState(0, acquires)) {
// 尝试将state通过CAS设置为1
setExclusiveOwnerThread(current); // 若CAS成功,表示当前线程可以拿锁,则将拿锁线程设为当前
return true; // 返回true
}
}
else if (current == getExclusiveOwnerThread()) {
// 如果当前线程已经获得锁了
int nextc = c + acquires; // 重入,+1
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true; // 返回true
}
return false; // 否则,返回false,获得不到锁
}
addWaiter
将等待的线程加入同步队列。这里注意一点,对于同步队列节点的所有操作,都要是线程安全的,即通过CAS。
private Node addWaiter(Node mode) {
// 初始化 Node
Node node = new Node(Thread.currentThread(), mode);
// 在自旋前,先尝试看能否直接加到队尾,若成功直接返回
// 这种做法大部分都可以一次成功,节省了自旋的开销
Node pred = tail;
// 当前同步队列不能为空,因为为空时还要设置head
if (pred != null) {
node.prev = pred; // 将新节点node的前置节点设置为tail
// 为了保证线程安全,CAS交换尾结点,然后再连接上一个tail
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 当直接加入队列失败(即CAS失败或者队列为空),需要通过自旋保证node加入到队尾
enq(node);
return node;
}
enq
- 线程加入同步队列中方法,追加到队尾
- 这里需要重点注意的是,返回值是添加 node 的前一个节点
private Node enq(final Node node) {
// 自旋,保证在出现竞争时也能安全加入同步队列
for (;;) {
// 得到队尾节点
Node t = tail;
// 如果队尾为空,说明当前同步队列都没有初始化,进行初始化
// tail = head = new Node();
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
// 队尾不为空,将当前节点追加到队尾
} else {
node.prev = t;
// node 追加到队尾
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
acquireQueued(核心方法)
管理同步队列(拿锁+休眠)
-
队二尝试拿锁,成功后取代队首
注:若拿锁失败(非公平锁),他仍是队二,队首也没删
-
阻塞队3-队n,前提是自旋使自己前一个节点的状态变成 signal(SIGNAL代表后面一定有待唤醒,不会被遗忘)
等线程一个个醒来后,再尝试拿锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
// predecessor是Node中的方法,作用是放回当前节点的上一个节点(prev)
final Node p = node.predecessor();
// 同步队列队首是已经获得锁的节点
// 队二是有资格夺锁(tryAcquire)的节点,若成功则把自己设置为队首,失败就(再)进入休眠
// 队二有两种情况:1.新node进入直接就是队二 2.队n被唤醒后发现此时他已经前进了到了队二
if (p == head && tryAcquire(arg)) {
// 如果当前节点是队二,且可以获取到锁
// 将当前node置为head,实际上就是删除已经释放锁的节点
setHead(node);
// p(之前获得锁的节点)被回收,next置为null是为了help gc
p.next = null;
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire 检验node能否休眠(pre=SIGNAL),若不能则设置pre=SIGNAL
// parkAndCheckInterrupt 阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
// 线程是在这个方法里面阻塞的,醒来的时候仍然在无限 for 循环里面,就能再次自旋尝试获得锁
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果获得node的锁失败或异常,将 node 置为 CANCELLED,并删除
// 因此,在并发情况下,同步队列中的任何位置节点都可能短暂CANCELLED,但最后一定会被删掉
if (failed)
cancelAcquire(node);
}
}
setHead
排他模式下,获得锁的节点,一定会被设置成头节点
private void setHead(Node node) {
// 将head设置为当前ndoe
head = node;
// 将获得锁的线程置null
node.thread = null;
node.prev = null;
}
shouldParkAfterFailedAcquire
校验能否安全休眠:当前线程可以安心阻塞的标准,就是前一个节点线程状态是 SIGNAL 了
- 前一个结点是SIGNAL,return true
- 前一个节点不是SIGNAL, ,return false(因为上层调用是自旋,所以最后一定能SIGNAL)
- waitStatus>0(已取消):依次向前寻找,挂到一个未取消节点后面
- waitStatus<=0 但不是SIGNAL,就将前一个结点设为SIGNAL
// 入参 pred 是前一个节点,node 是当前节点。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果前一个节点 waitStatus 状态已经是 SIGNAL 了,直接返回,不需要在自旋了
if (ws == Node.SIGNAL)
return true;
// 如果当前节点状态已经被取消了。
if (ws > 0) {
// 找到前一个状态不是取消的节点,因为把当前 node 挂在有效节点身上
// 因为节点状态是取消的话是无效的,是不能作为 node 的前置节点的,所以必须找到 node 的有效节点才行
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
// 否则直接把节点状态置 为SIGNAL
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
通过park休眠当前线程,到时需要unpark唤醒
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
这里最终调用的Unsafe类,是位于sun.misc包下的一个类,可以用来在任意内存地址位置处读写数据,支持一些CAS原子操作。Java最初被设计为一种安全的受控环境。尽管如此,HotSpot还是包含了一个后门sun.misc.Unsafe,提供了一些可以直接操控内存和线程的底层操作。
cancelAcquire
在node获得锁过程中失败或出现异常,就将node设为CANCELLED,然后删除;并发下可能短暂维持
private void cancelAcquire(Node node) {
// 将无效节点过滤
if (node == null)
return;
// 设置该节点不关联任何线程,也就是虚节点
node.thread = null;
Node pred = node.prev;
// 通过前驱节点,跳过取消状态的node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 把当前node的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
// 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,
// 2:如果不是,则把前驱节点设置为SINGAL看是否成功
// 如果1和2中有一个为true,再判断当前节点的线程是否为null
// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
2.2 独占-释放
release
unlock 的基础方法,用于释放锁。如图是Reentrantlock的unlock方法,可以看到是调用的继承了 AQS 的 Sync 的release方法,该方法是在AQS中实现的。
public final boolean release(int arg) {
// tryRelease 交给子类去实现
// 一般就是用当前同步器状态减去 arg,如果返回 true 说明成功释放锁。
if (tryRelease(arg)) {
// 如果可以释放锁,保存头结点head
Node h = head;
// 头节点不为空,并且非初始化状态
if (h != null && h.waitStatus != 0)
// 唤醒同步队列队首
unparkSuccessor(h);
return true;
}
return false;
}
下面是 ReentrantLock 的 Sync 的 tryRelease 方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 如果state=0了,就是可以释放锁了
free = true;
setExclusiveOwnerThread(null); // 将拿锁线程置为null
}
setState(c); // 重置同步器的state
return free; // 返回是否成功释放
}
unparkSuccessor(核心方法)
找到真正的队二(不是CANCELLED状态),并唤醒。
- node.next 非 null 非 CANCELLED,next就是队二
- node.next 不行,尾找,防止唤醒的node的前置节点仍然是CANCELLED
private void unparkSuccessor(Node node) {
// node 节点是当前释放锁的节点,也是同步队列的头节点
int ws = node.waitStatus;
// 如果节点已经被取消了,把节点的状态置为初始化
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 拿出队二s
Node s = node.next;
// s 为空,表示 node 的后一个节点为空
// s.waitStatus 大于0,代表 s 节点已经被取消了
// 遇到以上这两种情况,就从队尾开始,向前遍历,找到第一个 waitStatus 字段不是被取消的
if (s == null || s.waitStatus > 0) {
s = null;
// 结束条件是前置节点就是head了
for (Node t = tail; t != null && t != node; t = t.prev)
// t.waitStatus <= 0 说明 t 当前没有被取消,肯定还在等待被唤醒
if (t.waitStatus <= 0)
s = t;
}
// 唤醒以上代码找到的线程
if (s != null)
LockSupport.unpark(s.thread);
}
到这里需要明确一点,在AQS中,若一个线程释放了锁,接下来只会唤醒一个线程,并不是把所有线程都唤醒,然后大家再去竞争。
2.3 共享-加锁
共享锁和排他锁最大的不同在于:对于同一个共享资源,排他锁只能让一个线程获得
acquireShared
共享模式下,尝试获得锁
// 共享锁可以让多个线程获得,arg 可以被子类当做任意参数,比如当做可获得锁线程的最大个数
public final void acquireShared(int arg) {
// tryAcquireShared 首先尝试获得锁,返回值小于 0 表示没有获得锁
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
下面是 ReentrantReadWriteLock 的 tryAcquireShared 方法。注:Reentrantlock 采用的是独占锁,没有 tryAcquireShared方法。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
doAcquireShared
管理同步队列(拿锁+休眠),大部分逻辑和 acquireQueued 一致的
private void doAcquireShared(int arg) {
// node 追加到同步队列的队尾,acquireQueued 是排他模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 拿到 node 前一个节点
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
// 大于 0 说明成功获得锁
if (r >= 0) {
// 此处和排它锁也不同,排它锁使用的是 setHead,这里的 setHeadAndPropagate 方法
// 不仅仅是把当前节点设置成 head,还会唤醒头节点的后续节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 这里都和排他锁是一致的
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate
- 把当前节点设置成头节点
- 看看后续节点有无正在等待,并且也是共享模式的,有的话唤醒这些节点
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 当前节点设置成头节点
setHead(node);
// propagate > 0 表示已经有节点获得共享锁了
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 共享模式,还唤醒头节点的后置节点
if (s == null || s.isShared())
doReleaseShared();
}
}
doReleaseShared(核心方法)
释放后置共享节点
private void doReleaseShared() {
// 自旋,保证所有线程正常的线程都能被唤醒
for (;;) {
Node h = head;
// 还没有到队尾,此时队列中至少有两个节点
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果头结点状态是 SIGNAL ,说明后续节点都需要唤醒
if (ws == Node.SIGNAL) {
// CAS 保证只有一个节点可以运行唤醒的操作
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 进行唤醒操作
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 退出自旋条件 h==head,一般出现于以下两种情况
// 第一种情况,头节点没有发生移动,结束。
// 第二种情况,因为此方法可以被两处调用,一次是获得锁的地方,一处是释放锁的地方,
// 加上共享锁的特性就是可以多个线程获得锁,也可以释放锁,这就导致头节点可能会发生变化,
// 如果头节点发生了变化,就继续循环,一直循环到头节点不变化时,结束循环。
if (h == head) // loop if head changed
break;
}
}
2.4 共享-释放
releaseShared
共享模式下,释放当前线程的共享锁
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 这个方法就是线程在获得锁时,唤醒后续节点时调用的方法
doReleaseShared();
return true;
}
return false;
}
3.条件队列
-
作用
- 实现类似synchronized的wait与signal,实现在使用锁时对线程管理。
- 而且由于实现了Condition,对线程的管理可以更加细化
-
命名:条件队列中将node叫做waiter
-
策略:
- 要加入阻塞队列的前提是,当前线程已经拿到了锁,并处于运行状态。
- 加入阻塞队列前要释放锁,即唤醒同步队列的队二结点拿锁运行
- 条件队列中的所有结点都是在阻塞状态
- **唤醒操作实际是将一个node从条件队列,移动到同步队列尾,让它去返回同步队列去休眠。**并不是随机就唤醒(unpark)一个线程。
-
两个状态
- CONDITION:条件队列中所有正常节点
- 初始化(0):要移到同步队列的节点
3.1 休眠
await(核心方法)
将线程的node放入条件队列
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 加入到条件队列的队尾
Node node = addConditionWaiter();
// 加入条件队列后,会释放锁,并唤醒队二去tryLock,然后删掉自己(head)
// 自己马上就要阻塞了,必须释放之前 lock 的资源,不然自己不被唤醒的话,别的线程永远得不到该共享资源了
int savedState = fullyRelease(node);
int interruptMode = 0;
// 当一个node刚进入条件队列,它会被阻塞再这里
// 当某个线程被唤醒,即某个node被移动到同步队列,并且在同步队列中被唤醒了就会就会退出当前循环
while (!isOnSyncQueue(node)) {
// 阻塞在条件队列上
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 其他线程通过 signal 已经把 node 从条件队列中转移到同步队列中了
// 所以这里节点苏醒了,直接尝试 acquireQueued
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
// 如果状态不是CONDITION,就会自动删除
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter
将node加入到条件队列尾,返回新添加的 waiter
- 如果尾节点状态不是 CONDITION 状态,删除条件队列中所有状态不是 CONDITION 的节点
- 如果队列为空,新增节点作为队列头节点,否则追加到尾节点上
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾部的 waiter 不是 CONDITION 状态了,删除所有不是Condition的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 新建node,这个node与同步对列的队首不同,但都存着同一个thread
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 队列是空的,直接放到队列头
if (t == null)
firstWaiter = node;
// 队列不为空,直接到队列尾部
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
fullRelease
调用release,释放当前要加入条件对列的node的锁
- release后就会有线程在acquireQueued方法中醒来
- 醒来拿到锁后就会将head(保存当前wait线程的node,注:与条件队列的node不是同一个)
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 一般为1,重入为n
int savedState = getState();
// 调用release
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
// 若失败,就将node置为CANCELLED,并删除
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
isOnSyncQueue
确认node不在同步队列上,再阻塞,如果 node 在同步队列上,是不能够上锁的。
- node 刚被加入到条件队列中,立马就被其他线程 signal 转移到同步队列中去了
- 线程之前在条件队列中沉睡,被唤醒后加入到同步队列中去
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
3.2 唤醒
signal
唤醒阻塞在条件队列中的一个节点
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 从头节点开始唤醒
Node first = firstWaiter;
if (first != null)
// doSignal 方法会把条件队列中的节点转移到同步队列中去
doSignal(first);
}
doSignal
寻找被唤醒节点,从队首开始尝试转移到同步队列
private void doSignal(Node first) {
do {
// firstWaiter(head)依次后移,若nextWaiter为空,说明到队尾了
// 将firstWaiter置为条件队列中的第二个节点
// 若第二个节点是null,即当前条件队列中只有一个节点,就将lastWaiter也置为null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 将第一个节点(first)的下一个节点置为null,作用是从条件队列中删除first
first.nextWaiter = null;
// 通过while保证一定能转移成功,若失败,就后移一位
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
transferForSignal(核心方法)
将当前节点移动加到同步队列队尾
- 将node的state改为0(初始化),若失败return false
- 调用enq将node加到同步队列队尾
- 由于node(引用),所以相当于直接将此node从条件队列删除->接到同步队列队尾
- 返回的是node在同步队列的前一个结点pre
- 将pre设置为SIGNAL,标识后面有待唤醒的节点
- 如果设置失败,直接唤醒当前线程
final boolean transferForSignal(Node node) {
// 将 node 的状态从 CONDITION 修改成初始化,失败返回 false
// 等下次while循环,再尝试下一个结点
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 将node尾插到同步队列(从条件队列删除),返回的 p(pre) 是 node 在同步队列中的前一个节点
Node p = enq(node);
int ws = p.waitStatus;
// 状态修改成 SIGNAL,如果成功直接返回
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 如果 p 节点被取消,或者状态不能修改成SIGNAL,直接唤醒(不一定能拿到锁)
LockSupport.unpark(node.thread);
return true;
}
signalAll
唤醒所有结点
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 拿到头节点
Node first = firstWaiter;
if (first != null)
// 从头节点开始唤醒条件队列中所有的节点
doSignalAll(first);
}
doSignalAll
把条件队列所有节点依次转移到同步队列去,即循环调用transferForSignal
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
// 拿出条件队列队列头节点的下一个节点
Node next = first.nextWaiter;
// 把头节点从条件队列中删除
first.nextWaiter = null;
// 头节点转移到同步队列中去
transferForSignal(first);
// 开始循环头节点的下一个节点
first = next;
} while (first != null);
}