AQS
全称是AbstractQueuedSynchronizer
,在Java
中大部分用到的阻塞锁/同步器都是基于它进行实现的。
介绍
AQS
,即AbstractQueuedSynchronizer
,JDK
官方文档是这样描述AQS
的,AQS
是基于FIFO(first-in-first-out)
队列来实现阻塞锁和同步器的框架。例如,我们日常开发中使用到的ReentrantLock
就是基于AQS
来实现的,而从其源码也可以看到ReentrantLock
主要的实现逻辑也是放到AQS
中的。
这里笔者将官方文档对AQS
的描述整理了出来,其中主要分为两个部分:
第一部分是AQS
的设计与原理
AQS
是基于FIFO(first-in-first-out)
队列来实现阻塞锁和同步器的框架。AQS
依赖于一个原子变量来表示它的状态state
,并通过state
来定义acquire
获取同步器(锁)操作和release
释放同步器(锁)操作,其中对state
状态的更新操作只能通过getState
、setState
和compareAndSetState
等原子性方法。AQS
支持排他模式(EXCLUSIVE
)(默认)和共享模式(SHARED
)。AQS
支持Condition
条件队列,并定义了Condition
的实现类CondtionObject
。
所谓排他模式(
EXCLUSIVE
)表示的是以独占的方式获取/等待资源,而共享模式(SHARED
)则表示以共享的方式获取/等待资源
第二部分是AQS
的实现
在AQS
的实现类中,我们需要定义如下protected
级别的方法,分别是tryAcquire
、tryRelease
、tryAcquireShared
、tryReleaseShared
、isHeldExclusively
(可通过检查/修改state
实现不同的特性),而其实现则必须保证是线程安全的、短暂的、非阻塞的(默认情况下,这些方法都会抛出UnsupportedOperationException
异常)。
对于上述的
protected
方法没有使用abstract
修饰而是简单的抛出UnsupportedOperationException
异常是为了让开发者尽可能少的开发代码,因为并不是所有阻塞锁/同步器都需要去实现的所有的protected
方法,一般会依据阻塞锁/同步器是排他模式(EXCLUSIVE
)还是共享模式(SHARED
)来进行选择。
另外,在AQS
中会通过继承AbstractOwnableSynchronizer
来实现资源持有线程的轨迹记录(可用于实现可重入机制)。
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* Empty constructor for use by subclasses.
*/
protected AbstractOwnableSynchronizer() {}
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
/**
* Sets the thread that currently owns exclusive access.
* A {@code null} argument indicates that no thread owns access.
* This method does not otherwise impose any synchronization or
* {@code volatile} field accesses.
* @param thread the owner thread
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
/**
* Returns the thread last set by {@code setExclusiveOwnerThread},
* or {@code null} if never set. This method does not otherwise
* impose any synchronization or {@code volatile} field accesses.
* @return the owner thread
*/
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AbstractOwnableSynchronizer
的代码相对比较简短,阅读起来相对容易理解。从代码层面上看就是提供一个地方让我们可以把线程暂存储起来。另外,通过这种方式我们可以实现可重入机制(使用exclusiveOwnerThread
变量记录当前持有线程,并将其与后续获取线程进行比较),或者保证资源获取与释放线程的一致性等。
最后在这里,笔者基于上文描述和自身的理解编写了一个简易版的AQS
,其中也通过继承AbstractOwnableSynchronizer
来实现资源线程的一致性。
public class AQSImpl extends AbstractOwnableSynchronizer {
private final AtomicInteger state = new AtomicInteger(0);
public int getState() {
return this.state.get();
}
public void setState(int state) {
this.state.set(state);
}
public boolean compareAndSetState(int expect, int update) {
return state.compareAndSet(expect, update);
}
public void acquire(int args) {
while (!compareAndSetState(0, args)) {
// self-loop
}
super.setExclusiveOwnerThread(Thread.currentThread());
}
// 此处是线程安全的,因为只有加锁的线程才能执行解锁成功
public boolean release(int args) {
int state = getState() - args;
if (Thread.currentThread() != super.getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
if(state == 0){
super.setExclusiveOwnerThread(null);
}
setState(state);
return state == 0 ? true : false;
}
}
public static void main(String[] args) throws InterruptedException {
AQSImpl aqs = new AQSImpl();
aqs.acquire(1);
new Thread(() -> {
aqs.acquire(1);
System.out.println("线程1执行");
}).start();
Thread.sleep(1000);
System.out.println("主线程执行");
aqs.release(1);
Thread.sleep(5000);
}
执行结果:
主线程执行
线程1执行
简单来说,AQSImpl
是通过自旋+CAS
来实现的,即通过CAS
对state
进行同步修改(只有state==0
才能加锁成功),并在修改失败后通过自旋实现阻塞等待。而对于AQS
的资源同步同样也是基于CAS
方式来实现的,但其阻塞等待的原理就并不是直接通过自旋等待那么简单了。
下面我们就一起来看看AQS
是如何实现资源同步和阻塞等待的。
同步语义
在AQS
中,我们很容易就能找到具有相同作用的state
变量,这个state
变量同样是具有原子性的(通过volatile
和CAS
实现),在实现类中通过getState
、setState
和compareAndSetState
方法对它进行修改,下面我把这部分代码贴出来了:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
此处可以把资源抽象为变量
state
,即对变量state
的修改可等价为对资源的获取/释放。
但是与上述例子不一样的是,AQS
并不是通过硬编码来定义资源获取/释放的语义,而是提供一些方法让我们可以对它进行自定义,这也符合作为一个框架的设计理念。具体是通过什么渠道让我们可以自定义呢?上文也有提及到,即通过在实现类中覆盖方法tryAcquire
、tryRelease
、tryAcquireShared
、tryReleaseShared
和isHeldExclusively
等方法来实现。下面笔者总结了这几个方法各自的作用:
方法 | 描述 |
---|---|
tryAcquire |
表示在排他模式(EXCLUSIVE )下去获取资源,如果返回true 表示获取成功,否则表示获取失败。其中,在方法的实现中我们应该判断当前是否能在独占模式获取资源。 |
tryRelease |
表示在排他模式(EXCLUSIVE )下去释放资源,如果返回true 表示全部释放成功,否则表示释放失败或者部分释放。 |
tryAcquireShared |
表示在共享模式(SHARED )下去去获取资源,如果返回大于0 表示获取成功并且其后继节点也可能成功获取资源;如果返回等于0 表示获取成功但其后继节点不能再成功获取资源了;如果返回小于0 则表示获取失败。其中,在方法的实现中我们应该判断当前是否能够在共享模式下获取资源。 |
tryReleaseShared |
表示在共享模式(SHARED )下去释放资源,如果返回true 表示释放成功,否则表示释放失败。 |
isHeldExclusively |
表示资源是否被独占地持有,如果返回true 表示被独占持有,否则表示没有被独占持有。 |
综上所述,当要实现独占语义时需要实现tryAcquire
、tryRelease
和isHeldExclusively
;当要实现共享语义时则需要实现tryAcquireShared
和tryReleaseShared
。而对这些方法的实现就是我们使用AQS
框架所需要做的所有工作了。
- 通过这样的抽象定义,我们就可以自由地定义每次只能由一个线程获取资源/锁(
ReentrantLock
)还是可以同时多个线程获取资源/锁(Semaphore
)。- 对于
Lock#tryLock
等类似的方法通过上述的自定义方法即可以完美地实现,具体可阅读相应源码。
分析至此,AQS
获取/释放资源(锁)的语义基本就阐述完成了。而我们回想开篇对AQS
的阐述:“AQS
是基于FIFO(first-in-first-out)
队列来实现阻塞锁/同步器的框架”;可得知AQS
是基于FIFO
队列的阻塞锁/同步器的框架,那么下面我们将对阻塞部分进一步分析。
数据结构
对于阻塞部分,在笔者上述的例子中是通过自旋操作来实现的,即忙等待或自旋等待。但是这样实现的阻塞是有性能问题的,当存在大量线程处于自旋阻塞状态时会消耗大量的CPU
时间,所以AQS
采用了另一种队列等待的方法,即当线程获取资源失败时,先把线程暂停然后将它加入到队列进行等待,直到唤醒条件符合时再次唤醒队列中等待的线程来获取资源。
这里首先来看看在AQS
中组成等待/条件队列的关键数据结构Node
:
/**
* Wait queue node class.
*
* <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
* spinlocks. We instead use them for blocking synchronizers, but
* use the same basic tactic of holding some of the control
* information about a thread in the predecessor of its node. A
* "status" field in each node keeps track of whether a thread
* should block. A node is signalled when its predecessor
* releases. Each node of the queue otherwise serves as a
* specific-notification-style monitor holding a single waiting
* thread. The status field does NOT control whether threads are
* granted locks etc though. A thread may try to acquire if it is
* first in the queue. But being first does not guarantee success;
* it only gives the right to contend. So the currently released
* contender thread may need to rewait.
*
* <p>To enqueue into a CLH lock, you atomically splice it in as new
* tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
*
* <p>Insertion into a CLH queue requires only a single atomic
* operation on "tail", so there is a simple atomic point of
* demarcation from unqueued to queued. Similarly, dequeuing
* involves only updating the "head". However, it takes a bit
* more work for nodes to determine who their successors are,
* in part to deal with possible cancellation due to timeouts
* and interrupts.
*
* <p>The "prev" links (not used in original CLH locks), are mainly
* needed to handle cancellation. If a node is cancelled, its
* successor is (normally) relinked to a non-cancelled
* predecessor. For explanation of similar mechanics in the case
* of spin locks, see the papers by Scott and Scherer at
* http://www.cs.rochester.edu/u/scott/synchronization/
*
* <p>We also use "next" links to implement blocking mechanics.
* The thread id for each node is kept in its own node, so a
* predecessor signals the next node to wake up by traversing
* next link to determine which thread it is. Determination of
* successor must avoid races with newly queued nodes to set
* the "next" fields of their predecessors. This is solved
* when necessary by checking backwards from the atomically
* updated "tail" when a node's successor appears to be null.
* (Or, said differently, the next-links are an optimization
* so that we don't usually need a backward scan.)
*
* <p>Cancellation introduces some conservatism to the basic
* algorithms. Since we must poll for cancellation of other
* nodes, we can miss noticing whether a cancelled node is
* ahead or behind us. This is dealt with by always unparking
* successors upon cancellation, allowing them to stabilize on
* a new predecessor, unless we can identify an uncancelled
* predecessor who will carry this responsibility.
*
* <p>CLH queues need a dummy header node to get started. But
* we don't create them on construction, because it would be wasted
* effort if there is never contention. Instead, the node
* is constructed and head and tail pointers are set upon first
* contention.
*
* <p>Threads waiting on Conditions use the same nodes, but
* use an additional link. Conditions only need to link nodes
* in simple (non-concurrent) linked queues because they are
* only accessed when exclusively held. Upon await, a node is
* inserted into a condition queue. Upon signal, the node is
* transferred to the main queue. A special value of status
* field is used to mark which queue a node is on.
*
* ...
*
*/
static final class Node {
// ...
}
通过上述的注释说明应该可以清晰地认识到其设计思想,这里笔者对这段注释做了一个简单的翻译:
AQS
的等待队列(wait queue
)是CLH
锁(自旋锁)队列的一个变种。队列中每个节点Node
都会存储一个线程变量来标识当前执行线程和一个状态waitStatus
变量来标记当前状态是否被阻塞,并且通过这个状态来控制后继节点的行为。当节点处于队头时,这个节点将会去获取资源,但并不一定能获取成功(只是提供了一个获取的机会,如果获取失败将继续进入等待状态),这取决于我们实现的语义是否公平。
CLH
锁队列,可以参考论文《A Hierarchical CLH Queue Lock》
在队列中,节点出队操作只需简单的设置head
节点,而节点入队操作则需要对tail
节点进行原子性操作。对于节点与节点之间用两个链接所连接,分别是prev
和next
。其中prev
链接主要用于处理节点的取消操作,当节点被取消的时候,它的后继需要重新链接到前面一个非取消的节点;而next
链接则是用来实现阻塞机制的,前驱节点就是通过next
链接去唤醒后继节点(每个节点中都存储着所代表的阻塞线程)。另外,当唤醒后继节点和新节点入队发生竞争时,通过next
链接往后遍历可能会出现null
,这时候需要从tail
节点向前遍历寻找(因为入队时是先设置prev
的)。
关于
head
节点的创建,采用的是一种延迟加载的方式,在第一个阻塞节点入队时才会进行设置。即,在开始时并不会在构造方法预先创建,而是通过一个虚拟节点来代替。这样做是因为当不存在阻塞等待时预先创建会造成一种浪费。
另外,如果线程在condition
条件队列上进行等待也是使用相同数据结构Node
,区别在于这里并没有使用next
和prev
链接,而是用额外的nextWaiter
来链接。而对于condition
条件队列只需简单的执行链接操作即可,因为只有在独占持有资源时才会访问到相应的condition
条件队列,所以并不存在并发情况。其中在执行await
进入等待时,节点会被插入到condition
条件队列;在执行signal
时节点则被转移到等待队列。
对于节点
Node
存储在哪种队列(等待队列或条件队列)则是通过waitStatus
字段来标记的。
通过官方的注释,我们应该对等待/条件队列及其数据结构Node
具有一定的认识了,接下来我们再从源码的角度分析Node
:
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
首先我们先来看waitStatus
状态字段在不同取值所表示的意义:
waitStatus |
描述 |
---|---|
SIGNAL=-1 |
表示当前节点的后继节点需要被唤醒。如果节点被设置为SIGNAL 时,那么其被释放或被取消时都会去唤醒后继节点。 |
CANCELLED=1 |
表示当前节点被取消了。在节点被中断或者超时都会设置为CANCELLED 。CANCELLED 是节点的终态(之一),所以节点进入CANCELLED 状态就不能再改变了。 |
CONIDTION=-2 |
表示当前节点处于condition 条件队列中等待。当节点从condition 条件队列转移到等待队列中时,waitStatus 状态将会被设置为0 。 |
PROPAGATE=-3 |
表示当前节点在下一次调用acquireShared 时应该被无条件的传播。releaseShared 也应该被传播到其他节点。 |
INIT=0 |
表示节点初始化状态,只有两个情况waitStatus 才会被设置为0 ,即队尾节点和出队的队头节点。 |
这里的
waitStatus
状态是通过数字表示的,这样做是为了简化对代码逻辑的一些处理,比如在唤醒节点时就可以直接通过范围直接进行判断,而不用列举相应的枚举进行判断了。
接着是wait
等待队列中用到的两个链接的prev
和next
:
prev
用于链接节点的前驱节点,在节点入队时会被设置,在节点出队时会被设置为null
(for GC
)。另外,当节点的前驱节点被取消时会重新链接到一个非取消的节点(这个节点总是存在的,因为head
节点是不能被取消的)。next
用于链接节点的后继节点,在节点入队时会被设置,在节点出队时会被设置为null
(for GC
)。另外,当节点入队时并不会立刻设置next
链接,而是在绑定成功(通过prev
链接)之后才设置next
链接,所以当发现next
指向null
时并不代表当前节点就是队尾,这种情况AQS
会做一个double-check
,当发现next
指向null
时它会从队尾tail
向前遍历寻找。
对于被取消节点的
next
链接是指向的是它自己而不是null
。
再接着是节点中存储的执行线程变量thread
:
对于thread
属性表示的是获取资源的线程,它会在入队构造节点时会设置thread
,而在出队时将它设置为null
。
此处在节点中保存
thread
属性是为了在后续操作中可以很方便地操作该执行线程。
然后是condition
条件队列节点间的链接属性nextWaiter
:
在condition
条件队列中,节点间是通过nextWaiter
属性进行链接的。因为condition
条件队列只有在独占持有资源时才能被访问,所以条件等待时只需要简单地持有节点链接到condition
条件队列即可。
另外,nextWaiter
还可以指向一个特殊值SHARED
,用于表示当前节点属于SHARED
模式,默认这个值是EXCLUSIVE
,即null
,表示EXCLUSIVE
模式。这里我们也可以看到方法isShared
的实现,只是简单通过比较nextWaiter
与SHARED
是否相等来判断节点是否在SHARED
模式下进行等待。
最后,这里结合注释和代码内容勾画出wait
等待队列和condition
条件队列的完整结构:
wait队列:
+-------------------+
| +<-----------------------------+
+--------->+ SHARED/EXCLUSIVE | |
| | +<---------+ |
| +-------------------+ | |
| ^ | |
Node | Node | Node | Node |
+------------+ | +------------+ | +------------+ | +------------+ |
| thread | | | thread | | | thread | | | thread | |
| waitStatus | | | waitStatus | | | waitStatus | | | waitStatus | |
| nextWaiter+----+ | nextWaiter+----+ | nextWaiter+----+ | nextWaiter+----+
| | | | | | | |
null<-----+prev +<-------+prev +<-------+prev +<------+prev |
| | | | | | | |
| next+------->+ next+------->+ next+------>+ next+----->null
+------------+ +------------+ +------------+ +------------+
^ ^
| |
head+--------------+ +---------------+tail
condition队列:
Node Node Node Node
+---------------+ +---------------+ +---------------+ +---------------+
| thread | | thread | | thread | | thread |
| waitStatus | | waitStatus | | waitStatus | | waitStatus |
| prev | | prev | | prev | | prev |
| next | | next | | next | | next |
| | | | | | | |
| | | | | | | |
| nextWaiter+--------->+ nextWaiter+---------->+ nextWaiter+--------->+ nextWaiter+--------->null
+-------+-------+ +---------------+ +---------------+ +--------+------+
^ ^
| |
firstWaiter+------+ +-------------+lastWaiter
等待队列
AQS
的等待队列是基于CLH
锁实现的,CLH
锁采用了是一种FIFO+忙自旋
的机制来减少资源(锁)的竞争,即在资源竞争失败后进入等待队列,在等待期间不断轮询是否能成功获取资源,如果成功获取则将当前等待节点设置为head
节点(即让原head
节点出队)。然而因为CLH
锁存在自旋消耗CPU
的问题,AQS
在CLH
锁的基础上添加了条件等待的机制(优化),在条件不符合时让队列中的节点(线程)进入线程等待状态,直至head
节点完成相应逻辑后唤醒其后继节点(避免了对CPU
无效的消耗)。
在对CLH
锁及其相关数据结构有一定的认识后,我们再来看看AQS
的核心处理流程(为了便于理解,此处修改/删减了部分代码):
/**
* 排他模式
*/
private void doAcquire(int arg) {
// 入队操作
final Node node = addWaiter(Node.EXCLUSIVE);
try {
// 入队后马上对此节点进行出队操作,并在之后不断进行轮询
for (;;) {
// 此处为队头出队,如果失败就让线程继续等待,否则成功获取资源执行结束(排他模式出队)
final Node p = node.predecessor();
// 此处判断了前驱节点为头节点,所以这里必然只能是公平策略
if (p == head) {
boolean r = tryAcquire(arg);
// 获取资源成功
if(r){
// 将当前节点设置为head节点,并将原head节点出队
setHead(node);
p.next = null; // help GC
return;
}
}
// 判断是否进入等待状态(获取资源失败时)
if (shouldParkAfterFailedAcquire(p, node) && ...){
// 暂停节点线程(由于这里是在一个死循环体内,所以在唤醒之后会继续获取资源,直到成功为止。)
parkThread();
}
}
} finally {
// 如果失败则取消节点
if (failed)
cancelAcquire(node);
}
}
/**
* 共享模式
*/
private void doAcquireShared(int arg) {
// 入队操作
final Node node = addWaiter(Node.SHARED);
try {
// 入队后马上对此节点进行出队操作,并在之后不断进行轮询
for (;;) {
// 此处为队头出队,如果失败就让线程继续等待,否则成功获取资源执行结束(共享模式出队)
final Node p = node.predecessor();
// 此处判断了前驱节点为头节点,所以这里必然只能是公平策略
if (p == head) {
int r = tryAcquireShared(arg);
// 获取资源成功
if (r >= 0) {
// 将当前节点设置为head节点,并将原head节点出队,最后向后传播唤醒信号。
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
// 判断是否进入等待状态(获取资源失败时)
if (shouldParkAfterFailedAcquire(p, node) && ...){
// 暂停节点线程(由于这里是在一个死循环体内,所以在唤醒之后会继续获取资源,直到成功为止。)
parkThread();
}
}
} finally {
// 如果失败则取消节点
if (failed)
cancelAcquire(node);
}
}
其中doAcquire
方法代表了排他模式下的出队和入队操作,而doAcquireShared
方法则代表了共享模式下出队和入队操作。虽然一个是作用于排他模式,另一个是作用于共享模式,但是在宏观上看两者本质上并没有明显的差异,基本上就是在节点在进入等待队列后不断的自旋判断是否能获取到资源,并在资源获取成功后将自己设置为head
节点(让原head
节点出队)。最后,笔者基于上述代码描绘出其流程的模型图:
+------+ spin / park +------+ spin / park +------+
head +---> | Node | <----------+ | Node | <----------+ | Node | <----+ tail
+---+--+ +---+--+ +------+
| ^
| |
+---------------------+
unpark
至此,我们应该对等待队列的机制和模型有了大体的认识,下面笔者将分别对等待队列的入队操作和出队操作展开进行分析。
节点入队
当执行线程在获取资源失败后,它会被构造成Node
节点并链接到等待队列末尾,并进入线程等待状态。而在AQS
中,无论是排他模式还是共享模式都是通过addWaiter
方法来将Node
节点链接到等待队列末尾。下面笔者将addWaiter
的相关代码贴了出来:
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
// 此处用到死循环,保证在并发情况下也能插入成功
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
从上述代码可看到,对于不同模式的入队操作只是在Node
节点上的nextWaiter
属性有所差异,其他部分完全相同。另外,对于等待节点的插入是通过尾插法来实现的,具体流程如下所示:
如上文介绍所说,通过
nextWaiter
属性的不同取值来区分排他模式与共享模式。
- 通过
tail
链接寻找到尾节点- 如果尾节点为
null
,则初始化创建一个虚拟head
节点,最后重复执行第1
步 - 如果尾节点不为
null
,则执行第2
步
- 如果尾节点为
- 将新节点的
prev
链接指向尾节点tail
- 通过
CAS
把新的节点设置为尾节点,如果失败则回到第1
步继续执行,直到设置尾节点成功为止。 - 将原尾节点的
next
链接指向新插入的节点,返回新插入的尾节点。
通过解读你会发现:即使新节点的prev
链接指向了队尾节点也并不表示插入成功了,因为在并发情况下是有可能有多个线程同时执行了这一操作,而通过CAS
操作只能保证有一个节点设置成功并执行next
链接的设置,所以我们可以认为只有next
链接设置成功后才算是节点插入成功。
这里对上文提及到的“当
next
指向null
并不代表当前节点就是队尾”的观点也作出了回应和解释。因为设置prev
链接和next
链接并不是原子操作,所以有可能在通过next
链接判断后继节点时,另一个线程正在插入新节点且刚好执行到对prev
链接进行赋值的操作。而对于这种情况,AQS
是通过prev
链接倒序遍历来进行一个double-check
,这也相当于反向地利用了插入操作。
节点出队
在Node
节点进入到等待队列后,就会进行自旋判断是否能成功获取到资源,并在一定条件下进入线程等待状态。而因为在排他模式与共享模式下节点出队的执行流程有所差异,所以在这里将分点进行阐述。
在传统
CLH
锁中,因为仅仅是通过简单地自旋来判断当前是否能获取到资源并进行相应的出队操作,所以它无需其他唤醒操作。而AQS
在自旋的基础上还加入了条件等待的机制来让线程进入等待状态,所以在使用上还是需要调用相应的unparkSuccessor
方法来唤醒其后继节点,以让它继续执行自旋判断。
排他模式
所谓排他模式,即每次方法调用只能让一个节点获取到资源。
在排他模式中,在节点入队后会立即进行自旋判断是否能够获取资源,并在之后不断进行重试,直至获取成功为止(不考虑异常情况)。关于这部分代码并没有一个单独的方法进行包装,而是内嵌在acquire
获取资源的方法中,下面笔者将与排他模式相关的几个acquire
方法贴出来:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 主要看这里,对于入队的节点立刻执行自旋+条件等待的出队操作
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 此处通过死循环保证了入队的节点肯定是能出队的,即重新去获取资源
for (;;) {
final Node p = node.predecessor();
// 此处判断了前驱节点为head节点,所以这里必然只能是公平策略
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为head节点,并将原head节点出队
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断是否进入等待状态(获取资源失败时)
if (shouldParkAfterFailedAcquire(p, node) &&
// 暂停节点线程(由于这里是在一个死循环体内,所以在唤醒之后会继续获取资源,直到成功为止。)
parkAndCheckInterrupt())
// 此处中断是通过布尔型返回给调用方是否有中断
interrupted = true;
}
} finally {
// 如果发生错误(如抛出异常)则将对节点进行取消操作
if (failed)
cancelAcquire(node);
}
}
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg) throws InterruptedException {
// 将节点加入等待队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 此处通过死循环保证了入队的队列肯定是能出队的,即重新去获取资源
for (;;) {
final Node p = node.predecessor();
// 此处判断了前驱节点为头节点,所以这里必然只能是公平策略
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为head节点,并将原head节点出队
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 判断是否进入等待状态(获取资源失败时)
if (shouldParkAfterFailedAcquire(p, node) &&
// 暂停节点线程(由于这里是在一个死循环体内,所以在唤醒之后会继续获取资源,直到成功为止。)
parkAndCheckInterrupt())
// 此处中断是抛出中断异常
throw new InterruptedException();
}
} finally {
// 如果发生错误(如抛出异常)则将对节点进行取消操作
if (failed)
cancelAcquire(node);
}
}
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts.
*/
static final long spinForTimeoutThreshold = 1000L;
/**
* Acquires in exclusive timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// 如果最大等待时间小于等于0则表示不等待,直接返回false(此处只有阻塞等待的节点才能进来)
if (nanosTimeout <= 0L)
return false;
// 计算最大等待时间
final long deadline = System.nanoTime() + nanosTimeout;
// 将节点加入等待队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 此处通过死循环保证了入队的节点肯定是能出队的,即重新去获取资源
for (;;) {
final Node p = node.predecessor();
// 此处判断了前驱节点为头节点,所以这里必然只能是公平策略
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为head节点,并将原head节点出队
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 判断是否超过最大等待时间
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
// 判断是否进入等待状态(获取资源失败时)
if (shouldParkAfterFailedAcquire(p, node) &&
// 判断是否大于最大自旋次数
nanosTimeout > spinForTimeoutThreshold)
// 暂停节点线程(由于这里是在一个死循环体内,所以在唤醒之后会继续获取资源,直到成功为止。)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
// 如果发生错误(如抛出异常)则将对节点进行取消操作
if (failed)
cancelAcquire(node);
}
}
在
doAcquireNanos
中存在一个自旋优化,即nanosTimeout > spinForTimeoutThreshold
,只有在超时时间大于这个阈值时才执行LockSupport.parkNanos
方法暂停线程,否则直接执行自旋等待。官方给出的解释是:对于这个阈值之内的超时阻塞,自旋比停止调度要快。而在下文中与之类似的doAcquireSharedNanos
方法也采取了类似的优化手段。
上述代码给出了三种类型的acquire
方法,分别是acquireQueued
、doAcquireInterruptibly
和doAcquireNanos
,这三个方法虽然看上去存在一些差异,但实际上它们核心的出队逻辑都是相同的,即通过自旋+等待队列
。在这里笔者将其中节点出队的关键流程抽离了出来(伪代码):
如果硬要说出
acquireQueued
、doAcquireInterruptibly
和doAcquireNanos
的不同之处,可能就是在中断的处理、超时的处理上存在一些差别,但这些都不是理解核心流程的关键,此处可以先行忽略。
// 此处通过死循环保证了入队的节点肯定是能出队的,即重新去获取资源
for (;;) {
final Node p = node.predecessor();
// 此处判断了前驱节点为head节点,所以这里必然只能是公平策略
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为head节点,并将原head节点出队
setHead(node);
p.next = null; // help GC
// ...
// ... 资源获取成功后返回 return val
}
// 判断是否超时
// ...
// 判断是否进入等待状态(获取资源失败时)
if (shouldParkAfterFailedAcquire(p, node) && ...){
// 暂停节点线程(由于这里是在一个死循环体内,所以在唤醒之后会继续获取资源,直到成功为止。)
parkThread()
}
// ...
// ... 中断处理
}
其中将当前节点设置为
head
节点(即,将原head
节点出队)的setHead
方法逻辑如下所示:
/** * Sets head of queue to be node, thus dequeuing. Called only by * acquire methods. Also nulls out unused fields for sake of GC * and to suppress unnecessary signals and traversals. * * @param node the node */ private void setHead(Node node) { head = node; // 当节点成为head节点时,thread变量存储的值就没用了(thread变量所存储的值用于执行线程等待和线程唤醒) node.thread = null; // head节点的前驱节点就是为null node.prev = null; }
即,具体的出队执行流程如下所示:
- 判断当前节点的前驱节点是否
head
节点,如果不是则获取资源失败,跳到第4
步 - 如果当前节点的前驱节点是
head
节点,则尝试获取资源(通过tryAcquire
方法定义),如果获取失败,跳到第4
步 - 如果获取资源成功,则进行出队操作(将当前节点设置为
head
节点,并让原head
节点出队)- 将当前节点设置为
head
节点(把当前节点的prev
链接和thread
变量设置为null
) - 将原
head
节点的next
链接设置为null
- 资源获取成功,返回结果值,执行结束
- 将当前节点设置为
- 判断当前节点是否进入等待状态,如果不是则跳回第
1
步 - 如果当前节点可以进入等待状态,则调用方法暂停线程,直到线程被唤醒跳回第
1
步(期间会判断中断状态)
- 在排他模式中获取资源的语义是通过
tryAcquire
来定义的。- 通过
p == head
来判断是否有资格获取资源,这必然是一种公平的策略,因为只有在前驱节点为head
节点的情况下线程才有资格去获取资源。
然而,因为AQS
会在一定条件下让队列节点进入线程等待状态,所以在节点执行完成后需要调用相应的unparkSuccessor
方法来唤醒其后继节点。其中,在排他模式下是通过release
的调用来触发unparkSuccessor
方法的。
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
@ReservedStackAccess
public final boolean release(int arg) {
// 判断是否能释放资源
if (tryRelease(arg)) {
Node h = head;
// 判断head节点是否存在后继节点
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
在release
方法中首先会尝试去释放资源,如果成功才会去唤醒后继节点(如有)并返回true
,否则直接返回false
。其中,AQS
用于唤醒后继节点的unparkSuccessor
方法具体实现如下所示:
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// 尝试清理当前节点的状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 寻找后继节点
Node s = node.next;
// 判断后继节点是否被取消
if (s == null || s.waitStatus > 0) {
s = null;
// 后继节点被取消,从队尾向前遍历寻找有效后继节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 判断是否存在后继节点
if (s != null)
// 唤醒后继节点
LockSupport.unpark(s.thread);
}
对于unparkSuccessor
方法其主要职责就是唤醒后继节点,其中主要有三个步骤:
- 尝试清理当前节点的状态,即设置为
0
(成功与否都忽略) - 寻找当前节点的后继节点(如果节点被取消,则从队尾反向查询)
- 通过方法
LockSupport#unpark
方法唤醒后继节点
最终,通过unparkSuccessor
方法唤醒后继节点延续节点入队后的自旋重试获取资源,直到其获取成功为止(仅考虑成功的情况)。
总的来说,在排他模式下每个节点在获取资源失败后会进入等待状态,并在其前驱节点执行完成后被唤醒以继续尝试获取资源。
共享模式
所谓共享模式,即每次方法调用可以让多个节点获取到资源。
与排他模式类似,在共享模式中节点入队后会立即进行自旋判断是否能够获取资源,并在之后不断进行重试,直至获取成功为止(不考虑异常情况)。而关于这部分代码逻辑也是内嵌在了acquire
获取资源的方法中,下面笔者将与共享模式相关的几个acquire
方法贴出来:
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
// 将节点加入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 此处通过死循环保证了入队的队列肯定是能出队的,即重新去获取资源
for (;;) {
final Node p = node.predecessor();
// 此处判断了前驱节点为头节点,所以这里必然只能是公平策略
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 将当前节点设置为head节点,并将原head节点出队,最后向后传播唤醒信号
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 判断是否进入等待状态(获取资源失败时)
if (shouldParkAfterFailedAcquire(p, node) &&
// 暂停线程(由于这里是在一个死循环体内,所以在唤醒之后会继续获取资源,直到成功为止。)
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果发生错误(如抛出异常)则将对节点进行取消操作
if (failed)
cancelAcquire(node);
}
}
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 将节点加入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 此处判断了前驱节点为头节点,所以这里必然只能是公平策略
for (;;) {
final Node p = node.predecessor();
// 此处判断了前驱节点为头节点,所以这里必然只能是公平策略
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 将当前节点设置为head节点,并将原head节点出队,最后向后传播唤醒信号
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 判断是否进入等待状态(获取资源失败时)
if (shouldParkAfterFailedAcquire(p, node) &&
// 暂停线程(由于这里是在一个死循环体内,所以在唤醒之后会继续获取资源,直到成功为止。)
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 如果发生错误(如抛出异常)则将对节点进行取消操作
if (failed)
cancelAcquire(node);
}
}
/**
* Acquires in shared timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 如果最大等待时间小于等于0则表示不等待,直接返回false(此处只有阻塞等待的节点才能进来)
if (nanosTimeout <= 0L)
return false;
// 计算最大等待时间
final long deadline = System.nanoTime() + nanosTimeout;
// 将节点加入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 此处通过死循环保证了入队的节点肯定是能出队的,即重新去获取资源
for (;;) {
final Node p = node.predecessor();
// 此处判断了前驱节点为头节点,所以这里必然只能是公平策略
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 将当前节点设置为head节点,并将原head节点出队,最后向后传播唤醒信号
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
// 判断是否超过最大等待时间
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
// 判断是否进入等待状态(获取资源失败时)
if (shouldParkAfterFailedAcquire(p, node) &&
// 判断是否大于最大自旋次数
nanosTimeout > spinForTimeoutThreshold)
// 暂停线程(由于这里是在一个死循环体内,所以在唤醒之后会继续获取资源,直到成功为止。)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
// 如果发生错误(如抛出异常)则将对节点进行取消操作
if (failed)
cancelAcquire(node);
}
}
上述代码同样给出了三种类型的acquire
方法,分别是doAcquireShared
、doAcquireSharedInterruptibly
和doAcquireSharedNanos
,它们的核心出队逻辑也是通过自旋+等待队列
的方式,在这里笔者也将其中节点出队的关键流程抽离出来(伪代码):
如果硬要说出
doAcquireShared
、doAcquireSharedInterruptibly
和doAcquireSharedNanos
的不同之处,可能就是在中断的处理、超时的处理上存在一些差别,但这些都不是理解核心流程的关键,此处可以先行忽略。
// 此处通过死循环保证了入队的队列肯定是能出队的,即重新去获取资源
for (;;) {
final Node p = node.predecessor();
// 此处判断了前驱节点为头节点,所以这里必然只能是公平策略
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 将当前节点设置为head节点,并将原head节点出队,并向后传播唤醒信号(如有)
setHeadAndPropagate(node, r);
p.next = null; // help GC
// ...
// ... 中断处理
// ...
// ... 资源获取成功后返回 return val
}
}
// 判断是否超时
// ...
// 判断是否进入等待状态(获取资源失败时)
if (shouldParkAfterFailedAcquire(p, node) && ... ){
// 暂停线程(由于这里是在一个死循环体内,所以在唤醒之后会继续获取资源,直到成功为止。)
parkThread())
}
// ...
// ... 中断处理
}
其中将当前节点设置为
head
节点(即,将原head
节点出队)并向后传播唤醒信号(如有)的setHeadAndPropagate
方法逻辑如下所示:
/** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below // 将当前节点设置为head节点(即,将原head节点出队),详情可阅读上文 setHead(node); // 判断是否向后传播唤醒信号 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 判断后继节点是否共享模式 if (s == null || s.isShared()) // 向后传播唤醒信号 doReleaseShared(); } }
即,具体的出队执行流程如下所示:
- 判断当前节点的前驱节点是否
head
节点,如果不是则获取资源失败,跳到第4
步 - 如果当前节点的前驱节点是头节点,则尝试获取资源(通过
tryAcquireShared
方法定义),如果获取失败,跳到第4
步 - 如果获取资源成功,则进行出队操作(将当前节点设置为
head
节点,并让原head
节点出队,然后向后传播唤醒信号)- 将当前节点设置为
head
节点,并把节点的prev
链接和thread
变量设置为null
- 满足一定条件下唤醒后继节点继续尝试获取资源(向后传播)
- 将原
head
节点的next
链接设置为null
- 资源获取成功,返回结果值,执行结束
- 将当前节点设置为
- 判断当前节点是否进入等待状态,如果不是则跳回第
1
步 - 如果当前节点可以进入等待状态,则调用方法暂停线程,直到线程被唤醒跳回第
1
步(期间会判断中断状态)
- 在共享模式中获取资源的语义是通过
tryAcquireShared
来定义的。- 通过
p == head
来判断是否有资格获取资源,这必然是一种公平的策略,因为只有前驱节点是head
节点的情况下线程才有资格去获取资源。
然而,因为AQS
会在一定条件下让队列节点进入线程等待状态,所以在节点执行完成后需要调用相应的unparkSuccessor
方法唤醒后继节点。其中,在共享模式下是通过releaseShared
的调用来触发unparkSuccessor
方法的。
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
@ReservedStackAccess
public final boolean releaseShared(int arg) {
// 判断是否能释放资源
if (tryReleaseShared(arg)) {
// 唤醒后继节点
doReleaseShared();
return true;
}
return false;
}
在releaseShared
方法中首先会尝试去释放资源,如果成功才会去唤醒后继节点(如有)并返回true
,否则直接返回false
。其中,共享模式下唤醒后继节点的操作与排他模式存在一些差异,它在unparkSuccessor
方法的基础上加入一些唤醒传播的逻辑,即doReleaseShared
:
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
// 判断是否存在后继节点
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如存在(可唤醒的)后继节点
if (ws == Node.SIGNAL) {
// 清理head节点的状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继节点
unparkSuccessor(h);
}
// 如不存在(需唤醒的)后继节点
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
关于
doReleaseShared
方法将waitStatus
状态设置为PROPAGATE
的原因可阅读下文《节点唤醒传播》小节,具体缘由在这里就不再继续展开了。
最终,doReleaseShared
方法会不断的判断是否存在(需唤醒的)后继节点(通过SIGNAL
状态判断),并在有的情况下通过unparkSuccessor
方法将其唤醒。而如果被唤醒的后继节点成功获取到了资源则会一直向后传播唤醒信号,直至被唤醒的后继节点获取资源失败为止(通过比较head
节点是否发生过变化)。
总的来说,在共享模式下每个节点在获取资源失败后会进入等待状态,在其前驱节点执行完成后被唤醒以继续尝试获取资源,并且在一定条件下不断的向后传播唤醒信号,直至被唤醒的后继节点获取资源失败为止。
本质上,无论是排他模式还是共享模式实现阻塞等待的机制都是基于CLH
锁,只不过AQS
在它的基础上加入了条件等待的策略。
扩展:实现细节
节点阻塞判断
根据上文的描述,在节点入队后就会不断的自旋判断是否能够获取到资源,并在一定条件下进入线程等待状态。而在AQS
中统一(不论是排他模式还是共享模式)都是通过方法shouldParkAfterFailedAcquire
来进行判断的(线程是否进入等待状态),这里笔者首先把相关代码贴出来:
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire
方法主要用于在资源获取失败后对节点状态进行检查和更新,并最终确认节点是否能进入等待状态。如果返回true
表示节点可以进入等待状态,否则返回false
表示不能进入等待状态,对于这种情况调用方法会在外层代码中继续调用这个方法进行重试,直到返回true
为止。其中,对于shouldParkAfterFailedAcquire
方法主要分为3
种情况:
- 当前驱节点的
waitStatus
状态为SIGNAL
时,则表示节点可以进入等待状态,返回true
- 当前驱节点的
waitStatus
状态大于0
时(节点被取消),则需重新将当前节点链接到有效的前驱节点(通过prev
链接向前寻找),并返回false
表示当前不能进入等待状态 - 当前驱节点的
waitStatus
状态小于等于0
时(除SIGNAL
状态外),则需将节点更新为SIGNAL
状态,并返回false
表示当前不能进入等待状态
其中,对于第
3
种通过CAS
将前驱节点状态更新为SIGNAL
的情况,由于它并不能保证一定能更新成功,所以这里还是会返回false
表示当前节点不能进入等待状态。此时AQS
会在外层代码中继续调用此方法,直到其返回true
为止(如资源再次获取失败)。
即,只有在进入shouldParkAfterFailedAcquire
方法时当前节点的前驱节点状态为SIGNAL
时才能进入等待状态,其余情况则会在修复和更新操作后返回false
表示目前不能进入等待状态。
结合上文对
SIGNAL
状态所描述的语义和对unparkSuccessor
方法所限定的调用条件,不难理解为什么只有前驱节点状态为SIGNAL
时才能进入等待状态,简单来说就是只有当前head
节点为SIGNAL
时才会去唤醒其后继节点,所以此处必须保证只有其前驱节点状态被设置为SIGNAL
时才能进入等待状态。
节点唤醒传播
在共享模式下实现的向后传播唤醒信号主要是通过doReleaseShared
方法来实现,但是我们并不能直接对它进行调用,而是需要通过releaseShared
方法和setHeadAndPropagate
方法来间接触发doReleaseShared
的调用(上文有所提及)。基于更深入理解向后传播唤醒信号的实现方案,在这里笔者将从releaseShared
方法和setHeadAndPropagate
方法进行展开分析。
-
对于
releaseShared
方法,它主要用于在持有资源被释放成功后,对还存在等待资源的节点进行唤醒操作。/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ @ReservedStackAccess public final boolean releaseShared(int arg) { // 判断是否能释放资源 if (tryReleaseShared(arg)) { // 向后传播唤醒信号 doReleaseShared(); return true; } return false; }
-
对于
setHeadAndPropagate
方法,它主要用于在head
节点成功获取资源后,在可能还存在资源的情况下向后传播唤醒信号。/** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below // 将当前节点设置为head节点(即,将原head节点出队),详情可阅读上文 setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ // 判断是否向后传播唤醒信号 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 判断后继节点是否共享模式 if (s == null || s.isShared()) // 向后传播唤醒信号 doReleaseShared(); } }
基于共享模式的设计理念,只要存在有剩余资源的可能性就会执行向后传播唤醒信号,让更多的节点(线程)能够获取到资源得以执行。
对于releaseShared
方法,在释放资源成功后便会向后传播唤醒信号让后继节点获取刚刚被释放的资源,这其中的逻辑并不难理解。与之相对,setHeadAndPropagate
方法向后传播唤醒信号的判断条件则是比较难理解了,这里笔者把这部分代码单独拎了出来:
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0)
从上述代码能得出只要满足其中一个条件就能执行doReleaseShared
方法进行唤醒信号地向后传播,这里笔者把这5
个条件拆分开并逐个进行分析:
propagate > 0
,在propagate > 0
的情况下允许向后传播h == null
,在原head
节点为null
的情况下允许向后传播h.waitStatus < 0
,在原head
节点的waitStatus < 0
的情况下允许向后传播(h = head) == null
,在新head
节点为null
的情况下允许向后传播h.waitStatus < 0
,在新head
节点的waitStatus < 0
的情况下允许向后传播
需要注意,从上至下每一个条件都是基于其上一个条件的反向逻辑。
对于第1
点在propagate > 0
的情况下允许向后传播这个条件应该比较容易理解,因为根据共享模式的定义当存在剩余资源propagate
时是可以向后传播的。而对于第2
、3
、4
、5
点理解起来并没有那么直观,因为它们的成立意味着propagate <= 0
,这并不符合共享模式的定义。此处,我们需要换一种思路来分析,即在并发条件下propagate
参数值可能并不准确、head
节点的变更可能处于临界状态等,如果仅仅判断propagate
则可能会在存在资源的情况下停止了唤醒信号的传播,进而导致等待节点永远不会被唤醒。
也就是说,当在doAcquireShared
方法中线程获取资源成功,并将propagate=0
参数传入到setHeadAndPropagate
方法时,如果当前节点还存在后继节点(通过waitStatus < 0
表示其存在后继节点)则会保守地进行一次传播操作,因为此时可能存在曾经的head
节点释放了持有的资源。
另外,在确定可以向后传播后会进一步判断其后继节点是否为共享模式,此时如果其后继节点为null
同样也会触发一次doReleaseShared
方法,这是因为当next==null
时有可能存在节点刚好执行入队操作(next==null
并不表示其就是队尾,具体缘由可以回顾上文)。即,此处依然采用了保守策略执行了一次向后传播。
综上所述,在并发条件下这些保守的边界判断能保证程序更好、更快的执行,就像注释上说的:“这些检查是保守的,可能会引起不必要的唤醒操作”。
在经过上述一系列的判断后我们就开始执行doReleaseShared
方法,即接下来我们就doReleaseShared
方法是如何执行向后传播唤醒信号展开分析。
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果head节点状态为SIGNAL=-1(表示存在后继节点),则将其状态从SIGNAL(-1)设置为INIT(0)
if (ws == Node.SIGNAL) {
// 清理head节点状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继节点
unparkSuccessor(h);
}
// 如果head节点状态为INIT=0(表示不存在可唤醒的后继节点),则将其状态从INIT(0)设置为PROPAGATE(-3)
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果原head节点与新head节点相同,则表示唤醒的后继节点获取资源失败(或者不存在需唤醒的后继节点),退出循环
if (h == head) // loop if head changed
break;
}
}
在这里笔者把doReleaseShared
方法相应的执行步骤归纳了出来:
- 判断当前
head
节点是否存在后继节点,否则跳到第3
步 - 判断当前
head
节点的状态是否符合传播条件,否则跳到第3
步- 如果
head
节点状态为SIGNAL=-1
(表示存在可唤醒的后继节点),则将其状态从SIGNAL(-1)
设置为INIT(0)
,并唤醒其后继节点 - 如果
head
节点状态为INIT=0
(表示不存在可唤醒的后继节点),则将其状态从INIT(0)
设置为PROPAGATE(-3)
- 如果
- 判断
head
节点是否发生过变化(通过比较h == head
表达式,其中h
为开始执行时head
节点快照,head
则是最新的head
节点)- 如果发生过变化,跳回到第
1
步 - 如果没有发生过变化,则方法执行结束
- 如果发生过变化,跳回到第
后继节点(被唤醒的)在成功获取资源后,
head
节点就会发生改变(即h != head
),这种情况方法是会继续执行的。而如果不存在后继节点或者后继节点获取资源失败,则head
节点并没有发生改变(即h == head
),这种情况是跳出循环终止执行的。这里充分地体现了共享模式的本质,即当存在后继节点时会一直往后传播唤醒信号,直到唤醒的后继节点获取资源失败为止(一次调用可唤醒多个等待节点)。
在此处就引出了一个问题,为什么在共享模式下需要将状态0
设置为PROPAGATE(-3)
呢?或者换个说法,为什么需要存在PROPAGATE(-3)
这个状态呢?
我们不妨先来看看PROPAGATE(-3)
的状态说明:“A releaseShared should be propagated to other nodes. This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.
”。简单来说就是releaseShared
的调用应该向后传播唤醒信号,而将状态设置为PROPAGATE(-3)
可以保证传播顺利的进行。换句话说,如果没有这个PROPAGATE(-3)
状态就会导致传播中断进而发生异常,但是具体导致异常的原因在这里却并没有十分清晰的描述。
基于此笔者在网上查阅了相关的资料并最终找到了答案,即在JDK6
的某个版本(修复前版本)中存在这么一个问题:“并发执行releaseShared
方法会导致部分等待节点(线程)没有被唤醒”,为此Doug Lea
大佬提交了一个commit
修复了这个问题,即《6801020: Concurrent Semaphore release may cause some require thread not signaled
》,其中主要的解决方案在commit
中是这样描述的:“Introduce PROPAGATE waitStatus
”,即通过引入PROPAGATE
状态。
其中
Semaphore
是基于AQS
共享模式实现的一个信号量工具类。
既然已经知道了引入PROPAGATE
状态的原因,那么这里我们就顺着Doug Lea
大佬提供的思路将所产生的BUG
复现出来,然后再通过引入PROPAGATE
状态来将BUG
解决了。
首先这里将触发BUG
的单元测试贴了出来(为了便于理解,笔者将其中非核心的部分剔除了):
// 单元测试
Semaphore sem = new Semaphore(0, fair);
Runnable blocker = ()->sem.acquire();
Runnable signaller = ()->sem.release();
Thread b1 = new Thread(blocker);
Thread b2 = new Thread(blocker);
Thread s1 = new Thread(signaller);
Thread s2 = new Thread(signaller);
Thread[] threads = { b1, b2, s1, s2 };
for (Thread thread : threads)
thread.start();
for (Thread thread : threads) {
thread.join(60 * 1000);
}
if (sem.availablePermits() != 0)
throw new Error(String.valueOf(sem.availablePermits()));
if (sem.hasQueuedThreads())
throw new Error(String.valueOf(sem.hasQueuedThreads()));
if (sem.getQueueLength() != 0)
throw new Error(String.valueOf(sem.getQueueLength()));
接着,我们再来看看修复前引发BUG
的代码,分别是setHeadAndPropagate
方法和releaseShared
方法:
// 修复前
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
在旧版本的setHeadAndPropagate
方法和releaseShared
方法中并不存在对doReleaseShared
方法的调用,而是在判断propagate
和waitStatus
(不存在PROPAGATE
状态)后直接调用unparkSuccessor
方法执行唤醒操作。
至此,在基于上述旧版本的代码中笔者作出如下假设:
-
首先线程
b1
和b2
分别执行了Semaphore#acquire
方法(即,执行AQS#doAcquireShared
方法),而因为初始化信号量为0
(即不存在资源),所以两次获取资源都失败了,并分别进入到等待队列中。如下图所示:step1:线程b1和b2执行了AQS#doAcquireShared方法获取资源失败,构造为Node节点进入等待队列 state = 0 head(虚拟节点) +---------+ +----------+ +----------+ | +------>+ +------->+ | | Node1 | | Node2(b1)| | Node3(b2)| | +<------+ +<-------+ | +---------+ +----------+ +----------+
-
然后线程
s1
执行了Semaphore#release
方法(即,执行AQS#releaseShared
方法),此时线程s1
调用AQS#tryReleaseShared
方法释放了1
个资源(资源+1
),并因为head
(Node1
)节点的waitStatus==SIGNAL
(不等于0
),所以接着唤醒后继节点Node2(b1)
并将head
(Node1
)节点的waitStatus
设置为0
。如下图所示:step2:线程s1执行了AQS#tryReleaseShared方法释放1个资源,并唤醒后继节点Node2(b1) state = 1 head(虚拟节点) executing +---------+ +----------+ +----------+ | +------>+ +------->+ | | Node1 | | Node2(b1)| | Node3(b2)| | +<------+ +<-------+ | +---------+ +----------+ +----------+
-
然后线程
b1
执行AQS#tryAcquireShared
方法持有1
个资源(资源-1
),并进入到AQS#setHeadAndPropagate
方法(开始处)。如下图所示:step3:线程b1执行了AQS#tryAcquireShared方法持有1个资源 state = 0 head(虚拟节点) executing +---------+ +----------+ +----------+ | +------>+ +------->+ | | Node1 | | Node2(b1)| | Node3(b2)| | +<------+ +<-------+ | +---------+ +----------+ +----------+
// 修复前 private void setHeadAndPropagate(Node node, int propagate) { // o <- 此处为临界点 // 此时head=Node1,node=Node2(b1),propagate=0 setHead(node); if (propagate > 0 && node.waitStatus != 0) { Node s = node.next; if (s == null || s.isShared()) unparkSuccessor(node); } }
-
接着线程
s2
执行了Semaphore#release
方法(即,执行AQS#releaseShared
方法),此时线程s2
调用AQS#tryReleaseShared
方法释放了1
个资源(资源+1
),但是因为head
节点仍然为Node1
,并不符合唤醒后继节点的条件(waitStatus==0
),所以直接执行结束。如下图所示:step4:线程s2执行了AQS#tryReleaseShared方法释放1个资源,但不唤醒后继节点 state = 1 head(虚拟节点) executing +---------+ +----------+ +----------+ | +------>+ +------->+ | | Node1 | | Node2(b1)| | Node3(b2)| | +<------+ +<-------+ | +---------+ +----------+ +----------+
-
最后线程
b1
被设置为head
节点,但是由于传入的propagate
(快照)为0
并不符合唤醒后继节点的条件,所以直接执行结束,如下图所示:step5:线程b1被设置为head节点,但不唤醒后继节点 state = 1 head +---------+ +----------+ +----------+ | +- ->+ +------->+ | | Node1 | | Node2(b1)| | Node3(b2)| | +< --+ +<-------+ | +---------+ +----------+ +----------+
最终结果,在还剩余1
个信号量的情况下停止了唤醒信号的传播,进而导致节点Node3(b2)
永远停留在队列中(不考虑中断)。
为此,Doug Lea
大佬引入了PROPAGATE(-3)
状态以解决该问题,并在setHeadAndPropagate
方法和releaseShared
方法中作出相应的修改,最终得以解决。具体如下:
// 修改后
private void setHeadAndPropagate(Node node, long propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
public final boolean releaseShared(long arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
下面我们再按照上述假设重新执行一遍流程:
-
首先线程
b1
和b2
分别执行了Semaphore#acquire
方法(即,执行AQS#doAcquireShared
方法),而因为初始化信号量为0
(即不存在资源),所以两次获取资源都失败了,并分别进入到等待队列中。如下图所示:step1:线程b1和b2执行了AQS#doAcquireShared方法获取资源失败,构造为Node节点进入等待队列 state = 0 head(虚拟节点) +---------+ +----------+ +----------+ | +------>+ +------->+ | | Node1 | | Node2(b1)| | Node3(b2)| | +<------+ +<-------+ | +---------+ +----------+ +----------+
-
然后线程
s1
执行了Semaphore#release
方法(即,执行AQS#releaseShared
方法),此时线程s1
调用AQS#tryReleaseShared
释放了1
个资源(资源+1
),因此可以进一步执行doReleaseShared
方法。在doReleaseShared
方法中,由于head
(Node1
)节点的waitStatus==SIGNAL
,所以将head
(Node1
)节点的waitStatus
设置为0
并且唤醒后继节点Node2(b1)
。如下图所示:step2:线程s1调用AQS#tryReleaseShared方法释放1个资源,并调用doReleaseShared方法唤醒后继节点Node2(b1) state = 1 head(虚拟节点) executing +---------+ +----------+ +----------+ | +------>+ +------->+ | | Node1 | | Node2(b1)| | Node3(b2)| | +<------+ +<-------+ | +---------+ +----------+ +----------+
-
然后线程
b1
执行AQS#tryAcquireShared
方法持有1
个资源(资源-1
),并进入到AQS#setHeadAndPropagate
方法(开始处)。如下图所示:step3:线程b1执行AQS#tryAcquireShared方法持有1个资源 state = 0 head(虚拟节点) executing +---------+ +----------+ +----------+ | +------>+ +------->+ | | Node1 | | Node2(b1)| | Node3(b2)| | +<------+ +<-------+ | +---------+ +----------+ +----------+
// 修改后 private void setHeadAndPropagate(Node node, long propagate) { Node h = head; // o <- 此处为临界点 // 此时head=Node1,node=Node2(b1),propagate=0 setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
-
接着线程
s2
执行了Semaphore#release
方法(即,执行AQS#releaseShared
方法),此时线程s2
调用AQS#tryReleaseShared
方法释放了1
个资源(资源+1
),因此可以进一步执行doReleaseShared
方法。在doReleaseShared
方法中,由于head
(Node1
)节点仍然为Node1
,所以此处条件判断后执行了分支2
(waitStatus==0
)将head
(Node1
)节点的状态从0
设置为PROPAGATE(-3)
,最后结束方法的执行(由于head
节点并没有发生变化)。如下图所示:step4:线程s2调用AQS#tryReleaseShared方法释放了1个资源,将当前head节点状态设置为PROPAGATE(-3) state = 1 head(虚拟节点) executing +---------+ +----------+ +----------+ | +------>+ +------->+ | | Node1 | | Node2(b1)| | Node3(b2)| | +<------+ +<-------+ | +---------+ +----------+ +----------+
-
接着线程
b1
被设置为head
节点,而由于原head
节点(即,变量h
)状态被设置为PROPAGATE(-3)
符合条件waitStatus < 0
,因此可以进一步执行进入到doReleaseShared
方法。在doReleaseShared
方法中,由于head
(Node2
)节点的waitStatus==SIGNAL
,所以将head
(Node2
)节点的waitStatus
设置为0
并且唤醒后继节点Node3(b2)
。如下图所示:step5:线程b1被设置为head节点,并唤醒后继节点 state = 1 head executing +---------+ +----------+ +----------+ | +- ->+ +------->+ | | Node1 | | Node2(b1)| | Node3(b2)| | +< --+ +<-------+ | +---------+ +----------+ +----------+
-
最后线程
b2
执行AQS#tryAcquireShared
方法持有1
个资源(资源-1
),并进入到AQS#setHeadAndPropagate
方法,将节点Node3(b2)
设置为head
节点。最终因为它不存在后继节点(即,propagate==0 && waitStatus==0
),所以直接执行结束。如下图所示:step6:线程b2被设置为head节点,并结束执行 state = 0 head +---------+ +----------+ +----------+ | +- ->+ +-- ->+ | | Node1 | | Node2(b1)| | Node3(b2)| | +< --+ +<- --+ | +---------+ +----------+ +----------+
至此,AQS
在引入PROPAGATE(-3)
状态后避免了传播中断问题的再次发生。
思考:在分析
PROPAGATE
作用的时候发现它只有在doReleaseShared
方法中才会被设置,但对于它的比较却没有使用过一条特殊的等式,基本是通过waitStatus < 0
来判断的,可以更直接地说它是为setHeadAndPropagate
方法的条件判断而生的。这不禁让我陷入了沉思,状态PROPAGATE
是否必要的呢?或者说在不考虑语义的前提下状态PROPAGATE
是否能用SIGNAL
来代替呢?
阻塞原理
在对AQS
的应用中,对于等待队列的一些底层方法(如doAcquire
、doAcquireShared
等)一般是不会直接进行调用的,而是通过其上层的包装方法间接地对其进行调用。接下来我们就从上层的维度来分析其线程阻塞的原理。
排他阻塞
在AQS
中,如果要在排他模式下执行阻塞式获取资源则可以通过调用acquire
或acquireInterruptibly
方法来达到目的,即在资源充足的情况下调用acquire
或acquireInterruptibly
方法获得资源并执行相应的逻辑;而在资源不足的情况下调用acquire
或acquireInterruptibly
方法则会进入阻塞状态。
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
关于acquire
或acquireInterruptibly
方法的核心流程是相同的,即首先会通过tryAcquire
方法尝试获取资源(具体的获取语义则需要使用者自定义),如果资源被获取成功则直接返回;而如果资源获取失败,则通过acquireQueued
和doAcquireInterruptibly
等方法将线程构造为Node
节点插入到等待队列中进行阻塞等待。
在获得资源的线程在执行完成后,则需要调用相应的release
方法释放持有的资源,而在排他模式下会唤醒一个后继的等待线程继续尝试获取资源。
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
在release
方法中,它首先会通过tryRelease
方法尝试释放资源(具体的释放语义则需要使用者自定义),如果资源被释放成功则唤醒后继节点(如条件符合)并返回true
,否则直接返回false
退出执行。
共享阻塞
如果要在共享模式下执行阻塞式获取资源则可以通过调用acquireShared
方法来达到目的,即在资源充足的情况下调用doAcquireShared
方法获得资源,并在条件符合下继续往后传播唤醒信号以让更多的等待线程可以获得资源,最终执行相应的逻辑;而如果资源不足的情况下调用doAcquireShared
方法同样会进入阻塞状态。
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
在acquireShared
方法中,它首先会通过tryAcquireShared
方法尝试获取资源(具体的获取语义则需要使用者自定义),如果资源获取成功则直接返回;而如果资源获取失败,则通过doAcquireShared
方法将线程构造为Node
节点插入到等待队列中进行阻塞等待。
在获得资源的线程在执行完成后,则需要调用相应的releaseShared
方法释放持有的资源,而在共享模式下会唤醒一个后继的等待线程继续尝试获取资源,并在条件符合下继续往后传播唤醒信号以让更多的等待线程可以获得资源,最终执行相应的逻辑。
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
在releaseShared
方法中,它首先会通过tryReleaseShared
方法尝试释放资源(具体的释放语义则需要使用者自定义),如果资源被释放成功则唤醒后继节点(如条件符合,会一直往后传播唤醒信号)并返回true
,否则直接返回false
退出执行。
此处需要注意,虽然
acquireQueued
、doAcquireInterruptibly
和doAcquireShared
等方法的阻塞机制都是公平的,但是如果在实现同步器/锁时不加以注意则可能会得到一个非公平的同步器/锁。这是因为在每次调用acquire
/acquireShared
方法时首先都会先执行tryAcquire
/tryAcquireShared
方法,如果它执行成功则表示拿到了资源,而不用再陷入阻塞状态,即抢占了等待时间最长的线程。所以,如果需要实现公平的同步器/锁,则需要在tryAcquire
/tryAcquireShared
方法上做文章了。
条件队列
AQS
不但支持同步阻塞机制,而且还支持条件等待机制。其中,条件等待机制是通过条件变量condition variables
(也被称为条件队列condition queues
)来实现的,而为了与上文相呼应,下文统一用条件队列condition queues
来命名。
条件变量
condition variables
的概念来自于Monitor
,它表示一个与mutex
相关联的节点(线程)队列,队列中的每个节点(线程)都会在不占用所关联的mutex
的情况下进入到等待状态(以让其他节点(线程)可以获取到mutex
),直到某个条件成立为止。
Condition
接口
对于AQS
的条件队列condition queues
是基于Condition
接口来实现的,下面笔者从官方文档中整理出Condition
接口的几点关键点:
Condition
实例需要绑定一个Lock
,一般在通过Lcok#newCondition
方法创建Condition
实例时进行绑定。Condition
提供了一种方式让我们可以释放所关联的Lock
并暂停线程的执行,直到在条件符合的情况下被其他线程通知,即条件队列。
在
Java
中,Object#monitor
(即Object#wait()
与Object#notify()
)具有与Condition
类似的功能,只不过Object#monitor
需要与synchronized
联合使用,而Condition
则需要与Lock
联合使用。另外,最终实现的Condition
在行为上和在语义上不一定与Object#monitor
完全一样(例如顺序性保证、锁持有保证等),这依赖于具体的实现。
而在Condition
中主要定义了两类方法,分别是用于让线程进入等待状态的await
类方法和用于通知等待线程的signal
类方法。下面我们来看看Condition
是如何对它们进行定义的:
-
await
类方法void await() throws InterruptedException;
因为对于具有中断、非中断、超时等功能的
await
方法它们的核心流程基本上是相同的,所以在这里就不再展开了。对
await
的定义官方是这样描述的:执行await
将使得与Condition
相关联的Lock
自动地释放并让当前线程进入等待状态,直到线程被通知或被中断唤醒。其中,存在以下几种方法可以让等待线程被唤醒:- 其他线程执行
Condition#signal()
,并且当前线程刚好被选中唤醒 - 其他线程执行
Condition#signalAll()
,唤醒所有等待线程 - 其他线程执行
Thread#interrupt()
中断当前线程(如果是方法awaitUninterruptibly
则不会存这一条唤醒规则) - 当前线程等待超时被唤醒(如果方法有设置超时时间,例如
awaitNanos
、awaitUntil
) - 当前线程被虚假唤醒
需要注意的是,在等待线程被唤醒后需要成功获取
Condition
所关联的Lock
后才能从(使线程进入等待状态的)await
方法返回,否则线程将继续等待(保证await
返回时持有了Lock
) - 其他线程执行
-
signal
类方法void signal(); void signalAll();
在
await
的方法定义中,我们已经对signal
方法和signalAll
方法的作用进行了阐述,即signal
方法用于唤醒一个等待线程,而signalAll
方法则用于唤醒所有等待线程,但是无论是通过signal
方法还是signalAll
方法唤醒的线程都必须重新获取Lock
才能使线程从(使线程进入等待状态的)await
方法返回。
ConditionObject
实现
在了解完AQS
对条件队列的定义后,接下来我们一起来看看其具体的实现ConditionObject
。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
public ConditionObject() {}
}
}
ConditionObject
在实现条件队列时复用了等待队列的数据结构Node
,并通过额外的字段nextWaiter
对每个Node
节点进行链接,然后再通过ConditionObject
中的firstWaiter
和lastWaiter
链接分别指向其头节点与尾节点,最终形成条件队列。具体结构如下图所示:
条件队列是通过
nextWaiter
链接进行实现的单向链表,而等待队列则是通过next
链接和prev
链接实现的双向链表。
condition队列:
Node Node Node Node
+---------------+ +---------------+ +---------------+ +---------------+
| thread | | thread | | thread | | thread |
| waitStatus | | waitStatus | | waitStatus | | waitStatus |
| prev | | prev | | prev | | prev |
| next | | next | | next | | next |
| | | | | | | |
| | | | | | | |
| nextWaiter+--------->+ nextWaiter+---------->+ nextWaiter+--------->+ nextWaiter+--------->null
+-------+-------+ +---------------+ +---------------+ +--------+------+
^ ^
| |
firstWaiter+------+ +-------------+lastWaiter
在了解完ConditionObject
的数据结构后,我们再来看看它是如何实现在Condition
上定义的await
类方法和signal
类方法。
await
等待
在持有Lock
的情况下,执行await
方法将会导致线程进入条件等待状态,并且释放掉持有的Lock
。下面笔者将await
方法相关代码贴了出来:
使用
ConditionObject
的前提条件是需要持有Lock
,也就说只有在持有Lock
的情况下才能调用await
方法。
public class ConditionObject implements Condition, java.io Serializable {
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1. 把当前节点(线程)链接到条件队列
Node node = addConditionWaiter();
// 2. 将当前节点(线程)持有的资源释放掉,并唤醒其后继节点(如有)
int savedState = fullyRelease(node);
int interruptMode = 0;
// 3. 自旋判断当前节点(线程)是否在等待队列
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 判断是否存在中断,如存在则跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 在等待队列中对于当前节点(线程)执行节点出队操作
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 5. 将条件队列中的取消节点移除(执行清理操作)
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 6. 对等待过程中发生的中断进行处理(重新标记中断/抛出中断异常)
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
}
对于await
方法的执行主要可以分为6
步:
- 把当前节点(线程)链接到条件队列
- 将当前节点(线程)持有的资源释放掉,并唤醒其后继节点(如有)
- 自旋判断当前节点(线程)是否在等待队列
- 如果是,则结束条件等待,执行第
4
步(如发生中断也会结束条件等待) - 如果不是,则继续执行条件等待,执行第
3
步(如被唤醒)
- 如果是,则结束条件等待,执行第
- 在等待队列中对于当前节点(线程)执行节点出队操作
- 如果出队成功,则返回并执行第
5
步(证明资源获取成功) - 如果出队失败,则阻塞等待(自旋+阻塞,详情可阅读上文),并执行第
4
步(如被唤醒)
- 如果出队成功,则返回并执行第
- 将条件队列中的取消节点移除(执行清理操作)
- 对等待过程中发生的中断进行处理(重新标记中断/抛出中断异常)
在基于
ConditionObject
的条件队列上只有以独占的方式持有资源才能执行await
操作(即将节点从等待队列转移到条件队列),所以我们在await
方法中并没有看到很多类似于CAS
的操作。
虽然说在await
方法中将整个执行流程划分为6
个步骤,但其中条件阻塞的核心流程就只有3
步,分别是“将当前节点链接到条件队列”、“将当前节点从等待队列中移除”和“在条件队列中对当前节点执行阻塞操作”。
-
把当前节点(线程)链接到条件队列
/** * Adds a new waiter to wait queue. * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 将节点构造且加入到条件队列 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
在
addConditionWaiter
方法中,首先会获取正常的队尾节点(如队尾节点被取消,则进行清理操作),然后再将当前线程构造为Node
节点链接到条件队列的队尾(此处将节点状态设置为CONDITION
状态,表示处于条件队列中)。此处需要注意的是,因为条件队列的操作都是在以独占方式持有资源的情况下进行的,所以队列中的节点并不存在临界状态。也就是说在条件队列中节点的
waitStatus
只有两种情况,一是CONDITION
状态,表示在条件队列呆着;二是CANCELLED
状态,表示被取消了。因此在addConditionWaiter
方法中遇到非CONDITION
状态的节点直接当它是取消节点进行清理。其中,当发现队尾节点是
CANCELLED
状态时(即节点被取消),则执行unlinkCancelledWaiters
方法对取消节点进行清理:/** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */ private void unlinkCancelledWaiters() { Node t = firstWaiter; // trail用于记录被取消节点的前驱节点 Node trail = null; while (t != null) { // 获取当前节点的后继节点 Node next = t.nextWaiter; // 如果当前节点被取消了 if (t.waitStatus != Node.CONDITION) { // 清除与后继节点的链接 t.nextWaiter = null; // 如果当前节点是头节点,直接用firstWaiter指向被取消节点的后继节点(相当于丢掉当前取消节点) if (trail == null) firstWaiter = next; // 否则将前驱节点指向后继节点(相当于丢掉当前取消节点) else trail.nextWaiter = next; // 如果后继节点为null,直接用lastWaiter指向前驱节点(相当于丢掉当前取消节点) if (next == null) lastWaiter = trail; } else // 记录被取消节点的前驱节点 trail = t; // 继续遍历下一个节点 t = next; } }
关于
unlinkCancelledWaiters
方法的大致思路与从单向链表中删除节点并无差异,即从头节点开始逐个遍历至尾节点,期间遇到被取消的节点就进行清理操作(移除)。具体笔者已经在代码关键位置标注了注释,在这里就不再展开了。 -
将当前节点(线程)持有的资源释放掉,并唤醒其后继节点
在将线程构造为
Node
节点链接到条件队列后,接着就是对等待队列中持有资源的节点执行释放操作。注意,在独占模式下持有资源的节点是(等待队列的)head
节点。/** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * @param node the condition node for this wait * @return previous sync state */ final int fullyRelease(Node node) { boolean failed = true; try { // 获取state值 int savedState = getState(); // 释放state if (release(savedState)) { failed = false; // 成功则返回,外层保存在起来,之后唤醒获取时会重新传入state return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
在
fullyRelease
方法中会通过调用release
方法释放资源,并唤醒后继节点将当前head
节点踢出队列。(具体release
方法的执行逻辑可阅读上文)对于传统
Condition
的定义,在通过条件队列进行阻塞后是会释放持有的资源(以让其他线程可以获取资源)。当然这取决于我们最终的实现,如果我们在实现中将这部分特性去掉,则必须在相应的使用文档上特别标明。 -
将当前节点(线程)陷入阻塞,直到被唤醒
在将节点从等待队列转移到条件队列后,接着就是将当前节点(线程)陷入阻塞状态了。这一步主要是基于
isOnSyncQueue
方法的判断逻辑来实现的,即在不符合此条件的情况下会执行LockSupport#park
使当前节点(线程)进入阻塞状态。/** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. * @param node the node * @return true if is reacquiring */ final boolean isOnSyncQueue(Node node) { // 1. 判断当前节点状态是否为CONDITION,如果是则证明存在于条件队列,返回false if (node.waitStatus == Node.CONDITION) return false; // 2. 判断当前节点的前驱节点(prev链接)是否为null,如果是则证明并不存在于等待队列,返回false(waitStatus!=CONDITION) if (node.prev == null) return false; // 3. 判断当前节点的后继节点是否为null,如果是则证明存在于等待队列,返回true(waitStatus!=CONDITION && node.prev!=null) if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ // 4. 判断当前节点是否存在于等待队列(通过从尾节点向前遍历),如果是则返回true,否则返回false return findNodeFromTail(node); } /** * Returns true if node is on sync queue by searching backwards from tail. * Called only when needed by isOnSyncQueue. * @return true if present */ private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
在
isOnSyncQueue
方法中会通过各种边界条件判断当前节点是否存在于等待队列,其中判断条件可分为4
点:- 判断当前节点状态是否为
CONDITION
,如果是则证明存在于条件队列,返回false
因为条件队列中节点的
waitStatus
都为CONDITION
,所以此条件可以证明节点不存在于等待队列 - 判断当前节点的前驱节点(
prev
链接)是否为null
,如果是则证明并不存在于等待队列,返回false
因为条件等待的线程会存在虚假唤醒的情况,如果此时刚好执行
signal
方法(唤醒),并执行到如下临界代码处:boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // o <== 执行临界代码处 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
不难看出此时在
waitStatus
不等于CONDITION
的情况下节点还是处于条件队列。而对于这种情况,AQS
就通过对当前节点的前驱节点(prev
链接)是否为null
来判断其是否处于等待队列。(在等待队列中除了head
节点外其他节点都存在前驱节点,即使等待队列为空的情况它也会创建一个虚拟head
节点来作为其前驱节点,具体可阅读上文相关部分) - 判断当前节点的后继节点是否为
null
,如果是则证明存在于等待队列,返回true
因为对于等待队列的插入(如通过
enq
方法)是先对prev
链接进行设置,然后在CAS
成功后(保证了插入原子性)再对next
链接进行设置。即,在prev
链接不为null
时并不代表节点就已经插入到等待队列(存在CAS
失败的情况),而在next
链接也不为null
时才可以证明节点已经插入到等待队列中,具体可阅读上文相关部分 - 判断当前节点是否存在于等待队列(通过从尾节点向前遍历),如果是则返回
true
,否则返回false
如果上述条件都不满足则无法判断节点是否在等待队列,此时需要从等待队列的尾节点逐个向前遍历判断当前节点是否存在于队列中
- 判断当前节点状态是否为
signal
唤醒
在持有Lock
的情况下,执行signal
方法将会使(条件)等待时间最长的节点被唤醒,而执行signalAll
方法则会将所有处于条件等待的节点唤醒。
使用
ConditionObject
的前提条件是需要持有Lock
,也就说只有在持有Lock
的情况下才能调用signal
方法或signalAll
方法。
public class ConditionObject implements Condition, java.io Serializable {
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取第一个节点
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
}
此处通过
isHeldExclusively
方法来判断当前线程是否独占地持有资源,如并没有持有则抛出异常IllegalMonitorStateException
。
在signal
和signalAll
方法中,它们会将具体唤醒逻辑委托给了doSignal
方法和doSignalAll
方法(在独占式持有资源并且在条件队列中存在节点的情况下)。
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
// 将头节点开始向后逐个遍历
do {
// 将firstWaiter链接指向其后继节点
if ( (firstWaiter = first.nextWaiter) == null)
// 如果条件队列只有一个节点,则将lastWaiter链接也设置为null(firstWaiter链接已经被设置为null)
lastWaiter = null;
// 将当前节点的nextWaiter链接设置为null,表示将其踢出条件队列
first.nextWaiter = null;
} while (!transferForSignal(first) // 唤醒条件等待的节点,并判断操作是否成功,如果成功则直接退出循环(唤醒等待时间最长的节点),否则继续执行循环
&& (first = firstWaiter) != null); // 将first指向后一个节点,让其继续执行循环(直到队尾)
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
// 将firstWaiter链接和lastWaiter链接设置为null
lastWaiter = firstWaiter = null;
// 将头节点开始向后逐个遍历
do {
// 获取当前节点的后继节点
Node next = first.nextWaiter;
// 将当前节点的nextWaiter链接设置为null,表示将其踢出条件队列
first.nextWaiter = null;
// 唤醒条件等待的节点
transferForSignal(first);
// 将first指向后一个节点,让其继续执行循环(直到队尾)
first = next;
} while (first != null);
}
而在doSignal
和doSignalAll
方法中,它们则会从传入first
节点(条件等待队列中的头节点)开始向后逐个遍历并通过调用transferForSignal
方法唤醒等待中的节点,其中对于唤醒等待时间最长的节点还是唤醒所有处于条件等待的节点则取决于两者所定义的语义了。这里,我们再来看一下真正执行唤醒操作的transferForSignal
方法是如何实现的:
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 先将节点状态从CONDITION转为0,如果节点被取消则此处会执行失败,即返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 将node节点插入到等待队列
Node p = enq(node);
int ws = p.waitStatus;
// 如果当前节点被取消或者其前驱节点状态变更失败,则将当前node节点(线程)唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
对于transferForSignal
方法主要分为3
个步骤:
- 将当前节点的状态从
CONDITION
变更为0
,如果失败直接返回false
(表示当前节点被取消了) - 将当前节点通过
enq
方法插入到等待队列(外层代码已经将节点从条件队列中出队,所以来到这里相对来说已经转移成功了) - 如果当前节点被取消或者其前驱节点状态变更失败(变更为
SIGNAL
),则通过LockSupport#unpark
方法将当前节点(线程)唤醒
对于等待队列的插入,我们是需要将其前驱节点设置为SIGNAL
状态才算真正完成(以表示其存在后继节点需要被唤醒,具体可阅读上文相关部分)。因此在第3
步中如果成功地将其前驱节点设置为SIGNAL
状态则表示已经成功地将节点从条件队列转移到等待队列,而如果其前驱节点被取消或者SIGNAL
状态设置失败则还需要通过LockSupport#unpark
方法将当前节点(线程)唤醒并让它完成对异常地修正(跳过被取消节点或者设置前驱节点的状态)。最终回到await
方法的第3
步继续执行:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//...
// 3. 自旋判断当前节点(线程)是否在等待队列
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// ...
}
// 4. 在等待队列中对于当前节点(线程)执行节点出队操作
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// ...
}
除了在调用signal
和signalAll
方法将节点转移回等待队列出现异常时会触发await
方法让其继续执行第3
步判断(唤醒)外,节点在正常从条件队列转移回等待队列后经过前驱节点(head
节点)执行完毕唤醒其后继节点的情况也同样会触发await
方法第3
步继续执行(唤醒)。其中,两种同样会跳出第3
步循环(因为已经转移回等待队列),并执行acquireQueued
方法进行修正、等待和获取(具体逻辑可阅读上文相关部分)。
总的来说,对于
await
方法的执行本质上是将节点从等待队列转移到条件队列,而对于signal
方法的执行则本质是将节点从条件队列转移到等待队列,其中对于节点处于哪一个队列则是通过Node
节点的waitStatus
状态进行判断。
总结
本文由浅入深对AQS
进行了解读,其中主要包括其数据结构、等待队列和条件队列等。然而虽然AQS
从原理上看是十分精细并复杂,但是作为同步/阻塞的框架使用起来所需要做的事情其实并不多,下面笔者将从使用者的角度列举出AQS
的用法。
语义实现
在AQS
的实现中,我们的首要任务是对资源获取语义进行实现,具体所需要实现的方法如下表所示:
方法 | 描述 |
---|---|
tryAcquire |
表示在排他模式(EXCLUSIVE )下去获取资源,如果返回true 表示获取成功,否则表示获取失败。其中,在方法的实现中我们应该判断当前是否能在独占模式获取资源。 |
tryRelease |
表示在排他模式(EXCLUSIVE )下去释放资源,如果返回true 表示全部释放成功,否则表示释放失败或者部分释放。 |
tryAcquireShared |
表示在共享模式(SHARED )下去去获取资源,如果返回大于0 表示获取成功并且其后继节点也可能成功获取资源;如果返回等于0 表示获取成功但其后继节点不能再成功获取资源了;如果返回小于0 则表示获取失败。其中,在方法的实现中我们应该判断当前是否能够在共享模式下获取资源。 |
tryReleaseShared |
表示在共享模式(SHARED )下去释放资源,如果返回true 表示释放成功,否则表示释放失败。 |
isHeldExclusively |
表示资源是否被独占地持有,如果返回true 表示被独占持有,否则表示没有被独占持有。 |
对于
AQS
条件等待机制的使用,我们是需要实现isHeldExclusively
方法的,因为会存在类似于signal
和signalAll
这样的方法运用到它。而如果我们并没有运用到Condition
,在实现类中是可以不实现isHeldExclusively
方法的(如其他地方也没有运用到)。
以上就是我们在使用AQS
时所需要完成的所有工作了,至于等待队列或条件队列相关的实现细节我们是无需关心的,因为这是AQS
自身的工作。
方法使用
而在我们对上述语义进行实现的过程中,或者在使用AQS
时需要对其进行一定的监控时,我们可以使用AQS
提供给我们的一些方法来更快速地完成工作,下面笔者将相关部分分点列出:
-
用于操作
state
的方法方法 描述 getState
获取 state
的值setState
设置 state
的值compareAndSetState
通过 CAS
修改state
的值这些方法都是
protected
和final
的,主要(只能)用于AQS
的实现类中,并且不能被继承修改。一般用于实现获取/释放资源的语义。 -
用于操作执行线程的方法
方法 描述 getExclusiveOwnerThread
获取当前执行线程 setExclusiveOwnerThread
设置当前执行线程 这些方法主要用于实现排他模式(
EXCLUSIVE
),并且可以通过比较当前线程是否与保存中的线程相同来实现可重入。 -
用于获取/释放资源的方法
方法 描述 tryAcquire
表示在排他模式( EXCLUSIVE
)下去获取资源,如果返回true
表示获取成功,否则表示获取失败。tryRelease
表示在排他模式( EXCLUSIVE
)下去释放资源,如果返回true
表示全部释放成功,否则表示释放失败或者部分释放。tryAcquireShared
表示在共享模式( SHARED
)下去去获取资源,如果返回大于0
表示获取成功并且其后继节点也可能成功获取资源;如果返回等于0
表示获取成功但其后继节点不能再成功获取资源了;如果返回小于0
则表示获取失败。tryReleaseShared
表示在共享模式( SHARED
)下去释放资源,如果返回true
表示释放成功,否则表示释放失败。isHeldExclusively
表示资源是否被独占地持有,如果返回 true
表示被独占持有,否则表示没有被独占持有。acquire
表示在排他模式( EXCLUSIVE
)下去获取资源,如果获取失败会陷入阻塞(进入等待队列)直到获取成功。acquireInterruptibly
表示在排他模式( EXCLUSIVE
)下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常。tryAcquireNanos
表示在排他模式( EXCLUSIVE
)下在规定时间内去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常,其中如果在规定时间内获取成功会返回true
,超时则返回false
。release
表示在排他模式( EXCLUSIVE
)下去释放资源,如果释放成功返回true
,否则返回false
。acquireShared
表示在共享模式( SHARED
)下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功(与排他模式相比,此方法可以让多个线程同时获取到资源)。acquireSharedInterruptibly
表示在共享模式( SHARED
)下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常(与排他模式相比,此方法可以让多个线程同时获取到资源)。tryAcquireSharedNanos
表示在共享模式( SHARED
)下去获取资源,获取失败会陷入阻塞(进入等待队列)直到获取成功或中断抛出异常,其中如果在规定时间内获取成功会返回true
,超时则返回false
。(与排他模式相比,此方法可以让多个线程同时获取到资源)。releaseShared
表示在共享模式( SHARED
)下去释放资源,如果释放成功返回true
,否则返回false
。这些方法都是
public
的,主要用于对AQS
实现类(如ReentrantLock
)的使用,如获取/释放资源的操作。 -
用于查看等待队列的方法
方法 描述 hasQueuedThreads
查看等待队列当前是否存在等待节点(线程)。需要注意的是由于 head
节点并非正在等待的节点,所以并不算在内。另外由于中断和超时导致节点被取消在任何时候都有可能发生,所以这里并不保证值是正确的。hasContended
查看等待队列是否曾经存在过等待节点(线程),即是否曾经发生过竞争。 getFirstQueuedThread
查看等待队列中目前正在等待且等待时间最长的节点(线程),如果不存在返回 null
。需要注意的是由于head
节点并非正在等待的节点,所以并不算在内。isQueue
查看当前入参节点(线程)是否存在等待队列中。 hasQueuedPredecessors
查看等待队列中当前是否存在比当前节点(线程)等待时间长的等待节点(线程),即是否存在等待节点(线程)位于当前节点(线程)的前面。需要注意的是由于 head
节点并非正在等待的节点,所以并不算在内。另外由于中断和超时导致节点被取消在任何时候都有可能发生,所以这里并不保证值是正确的。这些方法都是
public
的,主要用于在AQS
实现类中判断是否能获取/释放资源(具体需看语义),或者用于对AQS
条件队列的监控。另外,
AQS
中还存在一个非public
的方法apparentlyFirstQueuedIsExclusive
(默认修饰符),用于查看等待队列中第一个正在等待的节点(线程)是否为排他模式,此方法目前仅仅应用于ReentrantReadWriteLock
。其中,使用默认修饰符的原因也是因为它仅被ReentrantReadWriteLock
使用(ReentrantReadWriteLock
与AQS
位于同一个包下)。 -
用于监控等待队列的方法
方法 描述 getQueueLength
用于获取等待队列的长度(预估值)。 getQueuedThreads
用于获取等待队列中的节点(线程)列表(预估值)。 getExclusiveQueuedThreads
用于获取等待队列中处于排他模式( EXCLUSIVE
)的节点(线程)列表(预估值)。getSharedQueuedThreads
用于获取等待队列中处于共享模式( SHARED
)的节点(线程)列表(预估值)。这些方法是
public
的,一般用于对AQS
等待队列使用情况的监控。其中,方法的返回值都是一个预估值,这是因为等待队列是基于链表实现的,对于上述方法的结果都需要逐个遍历,而在遍历的过程中节点是可以动态改变的,所以最终得到的只是一个预估值。 -
用于监控条件队列的方法
方法 描述 owns
用于判断传入的 Condition
(条件队列)是否用于当前AQS
,即是否通过当前AQS
所创建的。hasWaiters
用于判断传入 Condition
(条件队列)中是否存在正在等待的节点(线程)(预估值),另外如果传入Condition
(条件队列)不属于当前AQS
则抛出异常。getWaitQueueLength
用于查看传入 Condition
(条件队列)中当前等待节点(线程)的数量(预估值),另外如果传入Condition
(条件队列)不属于当前AQS
则抛出异常。getWaitingThreads
用于查看传入 Condition
(条件队列)中当前等待节点(线程)的集合(预估值),另外如果传入Condition
(条件队列)不属于当前AQS
则抛出异常。这些方法是
public
的,一般用于对AQS
条件队列使用情况的监控。其中,方法的返回值都是一个预估值,这是因为条件队列是基于链表实现的,对于上述方法的结果都需要逐个遍历,而在遍历的过程中节点是可以动态改变的,所以最终得到的只是一个预估值。
实现例子
最终,在这里笔者贴出了官方文档给出的AQS
实现例子,我们可以结合上下文来阅读以加深对AQS
的理解。
/*
* <p>Here is a non-reentrant mutual exclusion lock class that uses
* the value zero to represent the unlocked state, and one to
* represent the locked state. While a non-reentrant lock
* does not strictly require recording of the current owner
* thread, this class does so anyway to make usage easier to monitor.
* It also supports conditions and exposes
* one of the instrumentation methods:
*/
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Reports whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provides a Condition
Condition newCondition() { return new ConditionObject(); }
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
/*
* <p>Here is a latch class that is like a
* {@link java.util.concurrent.CountDownLatch CountDownLatch}
* except that it only requires a single {@code signal} to
* fire. Because a latch is non-exclusive, it uses the {@code shared}
* acquire and release methods.
*/
class BooleanLatch {
private static class Sync extends AbstractQueuedSynchronizer {
boolean isSignalled() { return getState() != 0; }
protected int tryAcquireShared(int ignore) {
return isSignalled() ? 1 : -1;
}
protected boolean tryReleaseShared(int ignore) {
setState(1);
return true;
}
}
private final Sync sync = new Sync();
public boolean isSignalled() { return sync.isSignalled(); }
public void signal() { sync.releaseShared(1); }
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}
在例子中分别实现了排他模式(EXCLUSIVE
)和共享模式(SHARED
),其中具体的实现细节可在上文相应的部分找到答案,在这里就不再细讲了。但也许你会注意到,在例子中AQS
的实现类都是担任着内部类,然后再委托给外部类去调用,这一点官方文档也有所提及,这里笔者把相关注释说明贴出来:
/*
* <p>Subclasses should be defined as non-public internal helper
* classes that are used to implement the synchronization properties
* of their enclosing class. Class
* {@code AbstractQueuedSynchronizer} does not implement any
* synchronization interface. Instead it defines methods such as
* {@link #acquireInterruptibly} that can be invoked as
* appropriate by concrete locks and related synchronizers to
* implement their public methods.
*/
即,因为在AQS
中并没有实现任何锁/同步器的接口(并不符合使用规范),所以我们在使用过程中应该通过内部类委托的方式来实现。
接口具有封装性的作用,对于大部分调用者来说他们只认相关的锁/同步器的接口,如果直接使用
AQS
的话会对调用者很不友好。
参考
- CSDN《AQS源码深入分析之共享模式》
- 博客园《CLH锁 、MCS锁》
- 博客园《AbstractQueuedSynchronizer源码解读 》
- Stackoverflow《Why the parameter of
h
will judge...》 - Wiki《Monitor (synchronization)》
未经本人许可,禁止转载