之前的文章中,简单的介绍了
ReentrantLock
锁。那么这里我就要进行里面的方法以及属性介绍啦(此文章基于里面的非公平锁进行说明)!!!
ReentrantLock
ReentrantLock 特性概览
ReentrantLock
意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。这里就对ReentrantLock
跟常用的Synchronized
进行比较。
ReentrantLock | Synchronized | |
---|---|---|
锁实现机制 | 依赖AQS | 监视器模式 |
灵活性 | 支持响应中断、超时、尝试获取锁 | 不灵活 |
释放形式 | 必须显示调用unlock()进行解锁 | 自动释放监视器 |
锁类型 | 必须显示调用unlock()进行解锁 | 自动释放监视器 |
条件队列 | 可关联多个条件队列 | 关联一个条件队列 |
可重入性 | 可重入 | 可重入 |
ReentrantLock 与 AQS 的关联
final void lock() {
if (compareAndSetState(0, 1)) // 设置同步状态
setExclusiveOwnerThread(Thread.currentThread());//当前线程设置为独占线程。
else
acquire(1);// 设置失败,进入acquire 方法进行后续处理。
}
复制代码
上面的代码就是非公平锁加锁的方法。主要是做了两点:
- 若通过 CAS 设置变量 State(同步状态)成功,也就是获取锁成功,则将当前 线程设置为独占线程。
- 若通过 CAS 设置变量 State(同步状态)失败,也就是获取锁失败,则进入 Acquire 方法进行后续处理。
如果设置同步状态失败,则会进入到对应的acquire()
方法中去进行加锁处理。而acquire()
无论是非公平锁或公平锁,最后调用的都是父类中的方法。
AQS(AbstractQueuedSynchronizer)
先通过下面的架构图来整体了解一下
AQS
框架:
- 图中有颜色的为
Method
,无颜色的为Attribution
。 - 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关 注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第 一层的 API 进入 AQS 内部方法,然后经过第二层进行锁的获取,接着对于获 取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依 赖于第五层的基础数据提供层。
AQS
原理概览:
如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是 CLH 队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
AQS 使用一个CLH:Craig、Landin and Hagersten 队列,是单向链表,AQS 中的队列是CLH变体的虚拟双向队列(FIFO),AQS 是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
Volatile
的
int
类型的成员变量来表示同步状态,通过内置的 FIFO 队列来完成资源获取的排队工作,通过 CAS 完成对
State
值的修改。
AQS
数据结构:
AQS
中最基本的数据结构是-节点。内含方法如下:
方法和属性值 | 含义 |
---|---|
waitStatus | 当前节点在队列中的状态 |
thread | 表示处于该节点的线程 |
prev | 前驱指针 |
predecessor | 返回前驱节点,没有的话抛出 NPE |
nextWaiter | 指向下一个处于 CONDITION 状态的节点(由于本篇文章不讲述 ConditionQueue 队列,这个指针不多介绍) |
next | 后继指针 |
线程两种锁的模式:
模式 | 含义 |
---|---|
SHARED | 表示线程以共享的模式等待锁 |
EXCLUSIVE | 表示线程正在以独占的方式等待锁 |
waitStatus 有下面几个枚举值:
枚举 | 含义 |
---|---|
0 | 当一个 Node 被初始化的时候的默认值 |
CANCELLED | 为 1,表示线程获取锁的请求已经取消了 |
CONDITION | 为 -2,表示节点在等待队列中,节点线程等待唤醒 |
PROPAGATE | 为 -3,当前线程处在 SHARED 情况下,该字段才会使用 |
SIGNAL | 为 -1,表示线程已经准备好了,就等资源释放了 |
AQS
中的同步状态:
AQS
中维护了一个名为 state
的字段,意为同步状态,是由 Volatile
修饰的,用于展示当前临界 资源的获锁情况。
/**
* The synchronization state.
*/
private volatile int state;
复制代码
独占模式情况下:
共享模式情况下:AQS 重要方法与 ReentrantLock 的关联
下面列举了自定义同步器需要实现以下方法,一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现
tryAcquire-tryRelease
、tryAcquireShared-tryReleaseShared
中 的 一 种 即可。AQS
也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock
。ReentrantLock
是独占锁,所以实现了tryAcquire-tryRelease
。
方法名 | 描述 |
---|---|
protected boolean isHeldExclusively() | 该线程是否正在独占资源。只有用到 Condition 才需要去实现它。 |
protected boolean tryAcquire(int arg) | 独占方式。arg 为获取锁的次数,尝试获取资源,成功则返回 True,失败则返回 False。 |
protected boolean tryRelease(int arg) | 独占方式。arg 为释放锁的次数,尝试释放资源,成功则返回 True,失败则返回 False。 |
protected int tryAcquireShared(int arg) | 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 |
protected boolean tryReleaseShared(int arg) | 共享方式。arg 为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回 True,否则返回 False。 |
下图举例说明 非公平锁与AQS之间方法的关联之处:
加锁和解锁的交互流程:加锁:
- 通过 ReentrantLock 的加锁方法 Lock 进行加锁操作。
- 会调用到内部类 Sync 的 Lock 方法,由于 Sync#lock 是抽象方法,根据 ReentrantLock 初始化选择的公平锁和非公平锁,执行相关内部类的 Lock 方 法,本质上都会执行 AQS 的 Acquire 方法。
- AQS 的 Acquire 方法会执行 tryAcquire 方法,但是由于 tryAcquire 需要自 定义同步器实现,因此执行了 ReentrantLock 中的 tryAcquire 方法,由于 ReentrantLock 是通过公平锁和非公平锁内部类实现的 tryAcquire 方法,因此会根据锁类型不同,执行不同的 tryAcquire。
- tryAcquire 是获取锁逻辑,获取失败后,会执行框架 AQS 的后续逻辑,跟 ReentrantLock 自定义同步器无关。
解锁:
- 通过 ReentrantLock 的解锁方法 Unlock 进行解锁。
- Unlock 会调用内部类 Sync 的 Release 方法,该方法继承于 AQS。
- Release 中会调用 tryRelease 方法,tryRelease 需要自定义同步器实现, tryRelease 只在 ReentrantLock 中的 Sync 实现,因此可以看出,释放锁的 过程,并不区分是否为公平锁。
- 释放成功后,所有处理由 AQS 框架完成,与自定义同步器无关。
从上面的描述,大概可以总结出 ReentrantLock 加锁解锁时 API 层核心方法的映射关系:
通过ReentrantLock理解 AQS
从上面的简单分析,我们知道如果当前线程没有获取到锁的话,则会进入到等待队列中去,我们接下来看看线程是何时以及怎样被加入进等待队列中的。
线程加入等待队列
当执行 Acquire(1) 时,会通过 tryAcquire 获取锁。在这种情况下,如果获取锁 失败,就会调用 addWaiter 加入到等待队列中去。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
复制代码
主要的流程如下:
- 通过当前的线程和锁模式新建一个节点。
- Pred 指针指向尾节点 Tail。
- 将 New 中 Node 的 Prev 指针指向 Pred。
- 通过 compareAndSetTail 方法,完成尾节点的设置。这个方法主要是对 tailOffset 和 Expect 进行比较,如果 tailOffset 的 Node 和 Expect 的 Node 地址是相同的,那么设置 Tail 的值为 Update 的值(利用的是
CAS
)。 - 如果 Pred 指针是 Null(说明等待队列中没有元素),或者当前 Pred 指针和 Tail 指向的位置不同(说明被别的线程已经修改),就需要看一下 Enq 的方法。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
如果没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头 结点并不是当前线程节点,而是调用了无参构造函数的节点。如果经历了初始化或 者并发导致队列中有元素,则与之前的方法相同。其实,
addWaiter
就是一个在双 端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数 的头结点。
总结下线程获取锁的步骤:
- 当没有线程获取到锁时,线程 1 获取锁成功。
- 线程 2 申请锁,但是锁被线程 1 占有。
- 如果再有线程要获取锁,依次在队列中往后排队即可。
回到上边的代码,
hasQueuedPredecessors
是公平锁加锁时判断等待队列中 是否存在有效节点的方法。如果返回False
,说明当前线程可以争取共享资源;如果 返回True
,说明队列中存在有效节点,当前线程必须加入到等待队列中。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
复制代码
看到这里,我们理解一下 h != t && ((s = h.next) == null || s.thread != Thread. currentThread());
其实在双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真 正的第一个有数据的节点,是在第二个节点开始的。当 h != t 时:如果 (s =h.next) == null,等待队列正在有线程进行初始化,但只是进行到了
Tail
指 向Head
,没有将Head
指向Tail
,此时队列中有元素,需要返回True
(这块 具体见下边代码分析)。 如果 (s = h.next) != null,说明此时队列中至少有一 个有效节点。如果此时 s.thread == Thread.currentThread(),说明等待队 列的第一个有效节点中的线程与当前线程相同,那么当前线程是可以获取资源 的;如果 s.thread != Thread.currentThread(),说明等待队列的第一个有效 节点线程与当前线程不同,当前线程必须加入进等待队列。
1 if (t == null) { // Must initialize
2 if (compareAndSetHead(new Node()))
3 tail = head;
4 } else {
5 node.prev = t;
6 if (compareAndSetTail(t, node)) {
7 t.next = node;
8 return t;
9 }
10 }
复制代码
节点入队不是原子操作,所以会出现短暂的 head != tail,此时 Tail 指向最后 一个节点,而且 Tail 指向 Head。如果 Head 没有指向 Tail(可见 5、6、7 行), 这种情况下也需要将相关线程加入队列中。所以这块代码是为了解决极端情况下的 并发问题。
等待队列中线程出队列时机
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
上文解释了 addWaiter
方法,这个方法其实就是把对应的线程以 Node
的数据 结构形式加入到双端队列里,返回的是一个包含该线程的 Node
。而这个Node
会作为参数,进入到 acquireQueued
方法中。acquireQueued
方法可以对排队中的线 程进行“获锁”操作。 总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued
会把放 入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。 下面我们从“何时出队列?”和“如何出队列?”两个方向来分析一下 acquireQueued
源码:
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程中是否中断过
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果 p 是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
if (p == head && tryAcquire(arg)) {
// 获取锁成功,头指针移动到当前 node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 说明 p 为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是 p不为头结点,这个时候就要判断当前 node 是否要被阻塞(被阻塞条件:前驱节点的waitStatus 为 -1),防止无限循环浪费资源。具体两个方法下面细细分析
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
注:setHead 方法是把当前节点置为虚节点,但并没有修改 waitStatus
,因为 它是一直需要用的数据。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取头结点的节点状态
int ws = pred.waitStatus;
// 说明头结点处于唤醒状态
if (ws == Node.SIGNAL)
return true;
// 通过枚举值我们知道 waitStatus>0 是取消状态
if (ws > 0) {
do {
// 循环向前查找取消节点,把取消节点从队列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 设置前任节点等待状态为 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
复制代码
parkAndCheckInterrupt 主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
复制代码
上述方法的流程图如下:
从上图可以看出,跳出当前循环的条件是当“前置节点是头结点,且当前线程获 取锁成功”。为了防止因死循环导致 CPU 资源被浪费,我们会判断前置节点的状态 来决定是否要将当前线程挂起,具体挂起流程用流程图表示如下(shouldParkAfterFailedAcquire 流程):从队列中释放节点的疑虑打消了,那么又有新问题了:
- shouldParkAfterFailedAcquire中取消节点是怎么生成的呢?什么时候会把一个节点的 waitStatus 设置为-1 ?
- 是在什么时间释放节点通知到被挂起的线程呢?
### CANCELLED 状态节点生成
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
通过 cancelAcquire 方法,将 Node 的状态标记为 CANCELLED
。接下来, 我们逐行来分析这个方法的原理:
private void cancelAcquire(Node node) {
// 将无效节点过滤
if (node == null)
return;
// 设置该节点不关联任何线程,也就是虚节点
node.thread = null;
Node pred = node.prev;
// 通过前驱节点,跳过取消状态的 node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 把当前 node 的状态设置为 CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
// 更新失败的话,则进入 else,如果更新成功,将 tail 的后继节点设置为 null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果当前节点不是 head 的后继节点,
// 1: 判断当前节点前驱节点的是否为 SIGNAL,
// 2: 如果不是,则把前驱节点设置为 SINGAL 看是否成功
// 如果 1 和 2 中有一个为 true,再判断当前节点的线程是否为 null
// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
int ws;
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点是 head 的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
复制代码
当前流程:
- 获取当前节点的前驱节点,如果前驱节点的状态是 CANCELLED,那就一直 往前遍历,找到第一个 waitStatus <= 0 的节点,将找到的 Pred 节点和当前 Node 关联,将当前 Node 设置为 CANCELLED。
根据当前节点的位置,考虑以下三种情况:
1. 当前节点是尾节点。
2. 当前节点是 Head 的后继节点。
3. 当前节点不是 Head 的后继节点,也不是尾节点。
复制代码
根据上述第二条,我们来分析每一种情况的流程。
当前节点是尾节点:
当前节点是 Head 的后继节点: 当前节点不是 Head 的后继节点,也不是尾节点:通过上面的流程,我们对于 CANCELLED
节点状态的产生和变化已经有了大致 的了解,但是为什么所有的变化都是对 Next
指针进行了操作,而没有对 Prev 指针 进行操作呢?什么情况下会对 Prev
指针进行操作?
- 执行 cancelAcquire 的时候,当前节点的前置节点可能已经从队列中出去了 (已经执行过 Try 代码块中的 shouldParkAfterFailedAcquire 方法了),如果此时修改
Prev
指针,有可能会导致Prev
指向另一个已经移除队列的Node
, 因此这块变化Prev
指针不安全。 shouldParkAfterFailedAcquire 方法中, 会执行下面的代码,其实就是在处理Prev
指针。shouldParkAfterFailedAcquire 是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已 被获取,当前节点之前的节点都不会出现变化,因此这个时候变更Prev
指针 比较安全。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
复制代码
如何解锁
由于 ReentrantLock 在解锁的时候,并不区分公平锁和非公平锁,所以我们直接看解锁的源码:
public void unlock() {
sync.release(1);
}
复制代码
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 头结点不为空并且头结点的 waitStatus 不是初始化节点情况,解除线程挂起状态
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
复制代码
// 方法返回当前锁是不是没有被线程持有
protected final boolean tryRelease(int releases) {
// 减少可重入次数
int c = getState() - releases;
// 当前线程不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果持有线程全部释放,将当前独占锁所有线程设置为 null,并更新 state
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
复制代码
这里的判断条件为什么是 h != null && h.waitStatus != 0 ? h == null 则说明Head
还没初始化。初始情况下,head == null,第一个节点入队,Head
会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出 现 head == null 的情况。 h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。 h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。
private void unparkSuccessor(Node node) {
// 获取头结点 waitStatus
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点的下一个节点
Node s = node.next;
// 如果下个节点是 null 或者下个节点被 cancelled,就找到队列最开始的非cancelled 的节点
if (s == null || s.waitStatus > 0) {
s = null;
// 就从尾部节点开始找,到队首,找到队列第一个 waitStatus<0 的节点。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果当前节点的下个节点不为空,而且状态 <=0,就把当前节点 unpark
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
为什么要从后往前找第一个非 Cancelled
的节点呢?原因如下。 之前的 addWaiter 方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
复制代码
从这里可以看到节点入队并不是原子操作,也就是说,node.prev = pred;compareAndSetTail(pred, node) 这两个地方可以看作
Tail
入队的原子操作, 但是此时 pred.next = node; 还没执行,如果这个时候执行了 unparkSuccessor 方法,就没办法从前往后找了,所以需要从后往前找。还有一点原因,在产生CANCELLED
状态节点的时候,先断开的是Next
指针,Prev
指针并未断开,因此 也是必须要从后往前遍历才能够遍历完全部的Node
。 综 上 所 述, 如 果 是 从 前 往 后 找, 由 于 极 端 情 况 下 入 队 的 非 原 子 操 作 和CANCELLED
节点产生过程中断开Next
指针的操作,可能会导致无法遍历所 有的节点。所以,唤醒对应的线程后,对应的线程就会继续往下执行。继续执行acquireQueued 方法以后,中断如何处理?
中断恢复后的执行流程
唤醒后,会执行 return Thread.interrupted();,这个函数返回的是当前执行线程的中断状态,并清除。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
复制代码
再 回 到 acquireQueued 代 码, 当 parkAndCheckInterrupt 返 回True
或者 False
的时候,interrupted
的值不同,但都会执行下次循环。如果这个时候获取锁成功,就会把当前 interrupted
返回。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
如果 acquireQueued 为 True
,就会执行 selfInterrupt 方法。
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
复制代码
该方法其实是为了中断线程。但为什么获取了锁以后还要中断线程呢?这部分属 于 Java 提供的协作式中断知识内容,感兴趣同学可以查阅一下。这里简单介绍一下:
- 当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过 Thread.interrupted() 方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断 标识设置为 False),并记录下来,如果发现该线程被中断过,就再中断一次。
- 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。 最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。 这里的处理方式主要是运用线程池中基本运作单元 Worder 中的
runWorker
, 通过 Thread.interrupted() 进行额外的判断处理,感兴趣的同学可以看下 ThreadPoolExecutor 源码。
小结
Q:某个线程获取锁失败的后续流程是什么呢?
A:存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。
Q:既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
A:是 CLH 变体的 FIFO 双端队列。
Q:处于排队等候机制中的线程,什么时候可以有机会获取锁呢?
A:可以详细看下上面的 ==> 等待队列中线程出队列时机
Q:如果处于排队等候机制中的线程一直无法获取锁,需要一直等待么?还是有别的策略来解决这一问?
A:线程所在节点的状态会变成取消状态,取消状态的节点会从队列中释放,具体可看上文的 ==>CANCELLED状态节点生成。
Q:Lock 函数通过 Acquire 方法进行加锁,但是具体是如何加锁的呢?
A:AQS 的 Acquire 会调用 tryAcquire 方法,tryAcquire 由各个自定义同步器实现,通过 tryAcquire 完成加锁过程。
AQS 应用
ReentrantLock 的可重入应用
ReentrantLock 的可重入性是 AQS
很好的应用之一,在了解完上述知识点以后,我们得知ReentrantLock实现可重入的方法。在 ReentrantLock 里面,不管是公平锁还是非公平锁,都有一段逻辑。
公平锁:
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
复制代码
非公平锁:
if (c == 0) {
if (compareAndSetState(0, acquires)){
setExclusiveOwnerThread(current);
return true;
}
}else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
复制代码
从上面这两段都可以看到,有一个同步状态 State
来控制整体可重入的情况。State
是Volatile
修饰的,用于保证一定的可见性和有序性。
接下来看 State 这个字段主要的过程:
- State 初始化的时候为 0,表示没有任何线程持有锁。
- 当有线程持有该锁时,值就会在原来的基础上 +1,同一个线程多次获得锁是,就会多次 +1,这里就是可重入的概念。
- 解锁也是对这个字段 -1,一直到 0,此线程对锁释放。
JUC中的应用场景
除了上边 ReentrantLock 的可重入性的应用,AQS
作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了 JUC 中的几种同步工具,大体介绍一下 AQS
的应用场景:
同步工具 | 同步工具与AQS的关联 |
---|---|
ReentrantLock | 使用 AQS 保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。 |
Semaphore | 使用 AQS 同步状态来保存信号量的当前计数。tryRelease 会增加计数,acquireShared 会减少计数。 |
CountDownLatch | 使用 AQS 同步状态来表示计数。计数为 0 时,所有的 Acquire 操作(CountDownLatch 的 await 方法)才可以通过。 |
ReentrantReadWriteLock | 使用 AQS 同步状态中的 16 位保存写锁持有的次数,剩下的 16 位用于保存读锁的持有次数。 |
ThreadPoolExecutor | Worker 利用 AQS 同步状态实现对独占线程变量的设置(tryAcquire 和tryRelease)。 |
总结
我们日常开发中使用并发的场景太多,但是对并发内部的基本框架原理了解的人却不多。而且多线程情况下,寻找问题所在也是一个很头大的问题。只有夯实基础,才能走的更远。
参考文章: 美团后台篇