AQS(AbstractQueuedSynchronizer)是JUC包下很多工具类的实现基础,它提供了一种实现阻塞锁和一系列依赖FIFO同步队列的框架。主要依赖一个int成员变量作为同步状态state,以及一个CLH同步队列,对于CLH同步队列,竞争资源同一时间只能被一个线程访问,CLH为管理等待锁的线程的队列。
AQS内部实现了独占锁和共享锁:
· 独占锁:每次仅有一个线程能够持有,具有排他性质。
· 共享锁:允许多个线程持有锁,并发访问共享资源。
源码分析:
AQS 节点定义:
AQS类内部定义了一个Node内部类,是AQS维护的CLH队列中的节点,每个Node其实是由线程封装,当线程竞争资源失败时,会封装成Node加入到AQS队列中。
static final class Node {
// 表示节点正在共享模式下等待的标记.
static final Node SHARED = new Node();
// 表示节点正在独占模式下等待的标记.
static final Node EXCLUSIVE = null;
// waitStatus值,指示线程已取消.
static final int CANCELLED = 1;
// waitStatus值,指示线程需要恢复暂停.
static final int SIGNAL = -1;
// waitStatus值,指示线程在条件等待.
static final int CONDITION = -2;
// waitStatus值,指示线程下一个acquireShared应无条件传播.
static final int PROPAGATE = -3;
// waitStatus.
volatile int waitStatus;
// 前一节点.
volatile Node prev;
// 后一节点.
volatile Node next;
// 当前线程.
volatile Thread thread;
// 下一等待者.
Node nextWaiter;
// 是否共享模式.
final boolean isShared() {
return nextWaiter == SHARED;
}
// 前一节点.
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 构造方法.
Node() {
}
// 构造方法.等待者调用.
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// 构造方法.条件使用者调用.
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
通过Node定义可以看出,Node中封装的内容都是与线程资源竞争相关的,其中的状态标识了当前线程对于资源的使用状态。
AQS 实现基础:
// 同步队列的头结点.
private transient volatile Node head;
// 同步队列的尾结点.
private transient volatile Node tail;
// 同步状态.
private volatile int state;
上面三个成员变量主要用于协助构建同步队列,同时标记同步队列节点的当前状态。
AQS 内部方法:
AQS实现独占锁和共享锁主要依赖内部提供的一系列私有的成员方法,这些方法主要用于对队列和锁信息的维护:
compareAndSetState:
// 以CAS方式更新同步状态.
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
enq:
// 追加节点到尾节点后.
private Node enq(final Node node) {
for (;;) {
// 取得尾节点.
Node t = tail;
// 尾节点为空时.
if (t == null) {
// 初始化头尾节点.
if (compareAndSetHead(new Node()))
tail = head;
}
// 尾节点不为空时.
else {
// 将新节点链接在tail后.
node.prev = t;
// 以CAS方式,更新tail节点.
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter:
// 添加等待者.
private Node addWaiter(Node mode) {
// 创建等待者节点.
Node node = new Node(Thread.currentThread(), mode);
// 取得尾节点.
Node pred = tail;
// 尾节点不为空.
if (pred != null) {
// 将新节点链接到尾节点后.
node.prev = pred;
// 以CAS方式更新尾节点.
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 追加节点到尾节点后.
enq(node);
return node;
}
setHead:
// 设置头节点.
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
unparkSuccessor:
// 恢复后续节点.
private void unparkSuccessor(Node node) {
// 取得当前节点waitStatus.
int ws = node.waitStatus;
// ws < 0.
if (ws < 0)
// 以CAS方式更新waitStatus.
compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点的下一节点.
Node s = node.next;
// 若当前节点不存在下一节点或下一节点为已取消状态.
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾节点开始查找第一个符合条件的节点.
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 针对去取得的节点.
if (s != null)
// 将取得节点的线程恢复执行.
LockSupport.unpark(s.thread);
}
doReleaseShared:
// 释放共享锁.
private void doReleaseShared() {
for (;;) {
// 取得头节点.
Node h = head;
// 头节点不为空 && 头尾节点不同.
if (h != null && h != tail) {
// 获取头节点状态.
int ws = h.waitStatus;
// 节点状态为SIGNAL.
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// ws == 0.
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
setHeadAndPropagate:
// 设置头节点并设置waitStatus=PROPAGATE
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 释放共享锁.
if (s == null || s.isShared())
doReleaseShared();
}
}
cancelAcquire:
// 取消锁获取.
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
// 以CAS方式更新tail节点.
if (node == tail && compareAndSetTail(node, pred)) {
// 以CAS方式更新下一节点.
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 以CAS方式设置节点waitStatus.
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
// 以CAS方式设置下一节点.
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 恢复后续节点.
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
shouldParkAfterFailedAcquire:
// 获取锁失败,阻塞线程.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
selfInterrupt:
// 中断当前线程.
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
parkAndCheckInterrupt:
// 阻塞线程.返回中断状态.
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
acquireQueued:
// 获取锁.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 取得前一节点.
final Node p = node.predecessor();
// p == head && 获取锁成功.
if (p == head && tryAcquire(arg)) {
// 设置头节点.
setHead(node);
p.next = null; // help GC
failed = false;
// 返回中断状态.
return interrupted;
}
// 获取锁失败阻塞线程.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 失败情况下,取消获取的锁.
cancelAcquire(node);
}
}
doAcquireInterruptibly:
// 带中断的获取独占锁.
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
// 加入阻塞队列.
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
// 获取前一节点.
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 获取锁失败阻塞线程.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
// 失败情况下,取消获取的锁.
cancelAcquire(node);
}
}
doAcquireNanos:
// 指定等待时间获取独占锁.
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 计算最终时间.
final long deadline = System.nanoTime() + nanosTimeout;
// 加入等待队列.
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
// 获取前一节点.
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
// 获取锁失败阻塞线程.
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 带阻塞时间的阻塞线程.
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
// 失败情况下,取消获取的锁.
cancelAcquire(node);
}
}
doAcquireShared:
// 阻塞获取共享锁.
private void doAcquireShared(int arg) {
// 创建包含共享状态的节点.
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 取得前一节点.
final Node p = node.predecessor();
if (p == head) {
// 尝试获取共享锁.
int r = tryAcquireShared(arg);
if (r >= 0) {
// 设置头节点并设置waitStatus=PROPAGATE.
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 需要中断情况下中断线程.
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 获取锁失败阻塞线程.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 失败情况下,取消获取的锁.
cancelAcquire(node);
}
}
doAcquireSharedInterruptibly:
// 带中断状态获取共享锁.
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 共享锁节点加入等待队列.
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 取得前一节点.
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 获取锁失败阻塞线程.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
// 失败情况下,取消获取的锁.
cancelAcquire(node);
}
}
doAcquireSharedNanos:
// 带等待时间的获取共享锁.
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 计算最终时间.
final long deadline = System.nanoTime() + nanosTimeout;
// 共享锁节点加入等待队列.
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 取得前一节点.
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
// 获取锁失败阻塞线程.
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
// 失败情况下,取消获取的锁.
cancelAcquire(node);
}
}
AQS 锁相关扩展方法:
// 取得独占锁.
protected boolean tryAcquire(int arg);
// 释放独占锁.
protected boolean tryRelease(int arg);
// 取得共享锁.
protected int tryAcquireShared(int arg);
// 取得共享锁.
protected boolean tryReleaseShared(int arg);
// 是否持有独占锁.
protected boolean isHeldExclusively();
这些方法都是受保护方法,期望继承AQS类的子类去实现这些方法,完成锁的处理。在具体锁应用时,会具体分析。
AQS 锁相关公开方法:
// 取得独占锁.
public final void acquire(int arg);
// 带中断的取得独占锁.
public final void acquireInterruptibly(int arg) throws InterruptedException;
// 指定等待时间取得独占锁.
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException;
// 释放独占锁.
public final boolean release(int arg);
// 取得共享锁.
public final void acquireShared(int arg);
// 带中断的取得共享锁.
public final void acquireSharedInterruptibly(int arg) throws InterruptedException;
// 指定等待时间的取得共享锁.
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException;
// 释放共享锁.
public final boolean releaseShared(int arg);
以上这些方法是AQS公开的针对锁的操作,包括独占锁和共享锁。
AQS 队列相关公开方法:
// 队列中是否有线程.
public final boolean hasQueuedThreads() {
return head != tail;
}
// 是否存在竞争.
public final boolean hasContended() {
return head != null;
}
// 获取队列中第一个线程.
public final Thread getFirstQueuedThread() {
return (head == tail) ? null : fullGetFirstQueuedThread();
}
// 指定线程是否在队列中.
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
if (p.thread == thread)
return true;
return false;
}
// 队列中第一个是否独占锁.
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
// 队列中是否有前置任务.
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
// 获取队列长度.
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
if (p.thread != null)
++n;
}
return n;
}
// 获取队列中线程集合.
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
// 获取取得独占锁的线程集合.
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (!p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
// 获取取得共享锁的线程集合.
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
以上这些方法是AQS针对队列的操作,可以获取队列中所有锁、独占锁和共享锁相关的队列信息,可以通过这些方法来监控队列信息。
AQS 条件相关公开方法:
AQS关于条件部分的实现,是根据java.util.concurrent.locks.Condition接口实现:
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Date;
public interface Condition {
// 线程阻塞.
void await() throws InterruptedException;
// 带中断的线程阻塞.
void awaitUninterruptibly();
// 指定等待时间的线程阻塞.
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 指定等待时间的线程阻塞.
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 指定等待时间的线程阻塞.
boolean awaitUntil(Date deadline) throws InterruptedException;
// 单个信号唤醒.
void signal();
// 所有信号唤醒.
void signalAll();
}
具体实现是由内部类ConditionObject来实现的:
public class ConditionObject implements Condition, java.io.Serializable {
// 序列化版本号.
private static final long serialVersionUID = 1173984872572414699L;
// 条件队列中的第一个等待者.
private transient Node firstWaiter;
// 条件队列中的最后一个等待者.
private transient Node lastWaiter;
// 构造方法.
public ConditionObject() { }
// 增加条件等待者.
private Node addConditionWaiter() {
// 取得最后一个等待者.
Node t = lastWaiter;
// 若最后一个等待者状态是CANCELED,清除它.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建条件节点.
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 唤醒指定条件节点.
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 唤醒指定条件节点.
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
// 解除所有已取消的等待者.
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// 唤醒第一个等待者.
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 唤醒所有等待者.
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 带中断的阻塞等待.
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
private static final int REINTERRUPT = 1;
private static final int THROW_IE = -1;
// 等待时检查中断.
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// 线程中断.
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
// 线程阻塞.
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 增加条件等待者.
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
// 是否由AbstractQueuedSynchronizer.
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 获取锁.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 解除所有已取消的等待者.
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 线程中断.
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 指定等待时间的线程阻塞.
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
// 线程中断.
if (Thread.interrupted())
throw new InterruptedException();
// 增加条件等待者.
Node node = addConditionWaiter();
// 释放节点.
int savedState = fullyRelease(node);
// 计算最终时间.
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
// 是否由AbstractQueuedSynchronizer.
while (!isOnSyncQueue(node)) {
// 取消等待.
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// 线程阻塞.
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 等待时检查中断.
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
// 获取锁.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 解除所有已取消的等待者.
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 线程中断.
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
// 指定等待时间的线程阻塞.
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
// 线程中断.
if (Thread.interrupted())
throw new InterruptedException();
// 增加条件等待者.
Node node = addConditionWaiter();
// 释放节点.
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
// 是否由AbstractQueuedSynchronizer.
while (!isOnSyncQueue(node)) {
// 取消等待.
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
// 线程阻塞.
LockSupport.parkUntil(this, abstime);
// 等待时检查中断.
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 获取锁.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 解除所有已取消的等待者.
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 线程中断.
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// 指定等待时间阻塞线程.
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
// 时间戳转换为纳秒.
long nanosTimeout = unit.toNanos(time);
// 线程中断.
if (Thread.interrupted())
throw new InterruptedException();
// 增加条件等待者.
Node node = addConditionWaiter();
// 释放节点.
int savedState = fullyRelease(node);
// 计算最终时间.
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
// 是否由AbstractQueuedSynchronizer.
while (!isOnSyncQueue(node)) {
// 取消等待.
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
// 线程阻塞.
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 等待时检查中断.
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
// 获取锁.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 解除所有已取消的等待者.
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 线程中断.
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// 是否由AbstractQueuedSynchronizer实现.
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
// 队列中是否有等待者.
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
// 获取等待队列长度.
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
// 获取等待线程集合.
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
AQS提供的公开方法,为其实现类使用:
// Condition是否包含当前AQS.
public final boolean owns(ConditionObject condition) {
return condition.isOwnedBy(this);
}
// Condition是否包含等待者.
public final boolean hasWaiters(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.hasWaiters();
}
// 取得指定Condition的等待者队列长度.
public final int getWaitQueueLength(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitQueueLength();
}
// 取得指定Condition的等待线程集合.
public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitingThreads();
}
总结:
以上几乎是AQS的全部API,整体来说AQS提供了独占锁、共享锁以及条件等待的相关实现,同时为其实现类提供了扩展方式,可由具体实现类进行扩展,完成更为应景和复杂的同步操作。
AQS内部主要提供了如下操作:
· 状态位state:
AQS用的是一个32位的整型来表示同步状态,使用volatile修饰,在独占锁中表示线程已经获得了锁(0表示未获取,1表示已获取,>1表示重入数)。
· CLH同步队列:
1.AQS的CLH在每个node里面使用一个状态字段来控制阻塞。
2.为了方便处理timeout和cancel操作,每个node维护一个指向前驱的指针。如果一个node的前驱被cancel,这个node可以前向移动使用前驱的状态字段。
3.head节点使用的是傀儡节点。
· 独占锁、共享锁
AQS维护的CLH队列锁中,每个node代表一个要获取锁的线程。该node中的包含两个常量SHARE、EXCLUSIVE。SHARE代表共享锁,EXCLUSIVE代表独占锁。
· 阻塞、唤醒
类似Object的wait()、notify()、notifyAll(),AQS也通过其内部类ConditionObject实现了await()、signal()、signalAll()来完成类似的操作。
注:文中源码均来自于JDK1.8版本,不同版本间可能存在差异。
如果有哪里有不明白或不清楚的内容,欢迎留言哦!