本文已参与「新人创作礼」活动,一起开启掘金创作之路。
ReentrantLock在JDK1.5引入,是一个可以设置公平性的可重入锁。之前30:ReentrantLock的简单使用简单介绍了ReentrantLock的基础使用,下面我们从源码的角度来看ReentrantLock的关键功能是怎么实现的。
1 继承关系
ReentrantLock implements Lock, java.io.Serializable
复制代码
ReentrantLock实现了Lock和Serializable接口,Lock接口定义了锁应该拥有的基本功能,Serializable接口意味着ReentrantLock可以进行序列化。
2 构造方法
ReentrantLock有两个构造方法,分别为无参构造和指定锁公平性的构造。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
复制代码
可以通过参数fair设置公平性, true为公平锁,false为非公平锁 。而ReentrantLock()等价于ReentrantLock(false),即为非公平锁。公平锁模式设置sync为new FairSync() ,非公平锁模式设置sync为new NonfairSync()。查看源码可以发现,ReentrantLock只是一个壳子,锁的功能就是通过sync来实现的,所以下面分别看公平锁FairSync和非公平锁NonfairSync都是如何实现的。
3 NonfairSync与FairSync
3.1 类定义
3.1.1 NonfairSync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
复制代码
3.1.2 FairSync
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
复制代码
3.2 继承关系
NonfairSync extends Sync
Sync extends AbstractQueuedSynchronizer
class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
FairSyncextends Sync
Sync extends AbstractQueuedSynchronizer
class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
复制代码
可以看到NonfairSync和FairSync都直接继承了Sync,间接继承了AbstractQueuedSynchronizer和AbstractOwnableSynchronizer。
3.3 关键属性
NonfairSync和FairSync继承关系相同,因此他们的关键属性也是相通的:
- private volatile int state; : 定义在AbstractQueuedSynchronizer,记录加锁次数,也可以说是重入次数,为0时表示当前处于无锁状态。
- private transient Thread exclusiveOwnerThread; :定义在AbstractOwnableSynchronizer,记录当前持有锁的线程。
- private transient volatile Node head;:定义在AbstractQueuedSynchronizer,等待队列的头,懒初始化的方式。初始化之外,它只能通过方法setHead修改。注意head并不是一个有效的节点,他在初始化时间是一个空节点,后续有效节点成为head时也会清空thread属性和prev属性变成一个空节点。也就是说当前等待队列的有效首位是head.next。这么做的好处是即使当前无线程等待,head也不会成功null,而是一个空节点,在新线程阻塞时就无需重复初始化,直接往后挂就行。因为存在等待节点清除逻辑,要保证head的waitStatus不为CANCELLED。
- private transient volatile Node tail;:定义在AbstractQueuedSynchronizer,等待队列的尾部。懒初始化的方式。
- volatile int waitStatus;:定义在AbstractQueuedSynchronizer$Node,表示等待队列节点的状态。其拥有5个值。①
static final int CANCELLED = 1;
:线程取消获取锁,比如使用关注的中断的加锁方法lockInterruptibly(),此时线程被中断,节点就会进入CANCELLED,但是lock()不关注中断,即使执行中断也不会进入CANCELLED②static final int SIGNAL = -1;
:后续线程需要唤醒③static final int CONDITION = -2;
:线程处于condition等待队列中④static final int PROPAGATE = -3
:表示下一个acquireShared应无条件传播,属于共享模式的状态,ReentrantLock是独占锁因此不会用到,CountDownLatch倒是用到了,这里不做深究⑤0
:不属于以上的任何状态
3.4 常用方法
- lock(): 一直等待锁,无视中断,在获取锁过程中即使线程遭遇中断也不会管,而是继续等下去,获得锁之后线程继续执行而不是中断
- lockInterruptibly():一直等待锁,考虑中断,获取锁过程中如果线程遭遇中断,则中断等待,则抛出InterruptedException,因此要求必须需要捕获InterruptedException。
- tryLock(long timeout, TimeUnit unit): 指定时间等待锁,指定时间获取锁成功返回true,获取失败返回false。等待时间内考虑中断,等待过程中如果遭遇中断会抛出InterruptedException,因此要求必须需要捕获InterruptedException。
- tryLock(): 直接获取锁,成功返回true,获取失败返回false。等价于tryLock(0, TimeUnit unit);
- unlock():unlock()必须由持有锁的线程执行,否则抛出IllegalMonitorStateException。为了避免死锁,unlock()最好放在finally块内。因为ReentrantLock是一个可重入锁,因此一个线程可能进行了多次加锁,但是unlock()执行一次仅解锁一次,也就是说持有锁的线程加了多少次锁,也要用unlock()解锁多少次,否则锁依然不会被释放!
4 从源码看非公平锁NonfairSync的实现
4.1 tryLock()
源码:
ReentrantLock:
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
复制代码
解析:参数获取锁,成功返回true,失败返回false。直接调用了4.1-M.1 Sync.nonfairTryAcquire(int acquires)
方法,即非公平尝试获取锁。
4.1-M.1 Sync.nonfairTryAcquire(int acquires)
源码:
Sync:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 当前状态
if (c == 0) { // 无锁状态直接尝试获取锁
if (compareAndSetState(0, acquires)) { //CAS加锁,见4.1-M.1.1 AbstractQueuedSynchronizer.compareAndSetState(int expect, int update)
setExclusiveOwnerThread(current); // 加锁成功则将持有锁的线程维护成当前线程,exclusiveOwnerThread简单的set方法,不再深入
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 如果当前不是无锁状态则判断是不是一个重入锁,exclusiveOwnerThread简单的get方法,不再深入
int nextc = c + acquires; // 是重入锁的话就维护重入次数。因为重入意味着当前线程持有锁,所以这个地方不会出现加算的并发问题的
if (nextc < 0) // overflow // state是int的这是一个越界检查
throw new Error("Maximum lock count exceeded");
setState(nextc); // 设置重入次数,state简单的set方法,不再深入
return true;
}
return false;
}
复制代码
解析:判断锁的当前状态,如果处于无锁状态就通过CAS尝试获取锁,否则的话就判断是不是锁重入,如果是则维护一下重入次数。返回的就是加锁成功还是失败。
4.1-M.1.1 AbstractQueuedSynchronizer.compareAndSetState(int expect, int update)
源码:
AbstractQueuedSynchronizer:
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
复制代码
解析:unsafe.compareAndSwapInt就是通过Unsafe实现的CAS方法(CAS是一个原子性的将目标由期望值expect替换为新值update的动作),stateOffset是state
属性的偏移量,因此这个方法的作用就是通过CAS将state
由expect设置成update,并返回CAS结果。
4.2 lock()
源码:
ReentrantLock:
public void lock() {
sync.lock();
}
复制代码
解析:一直等待锁,无视中断,在获取锁过程中即使线程遭遇中断也不会管,而是继续等下去,获得锁之后线程继续执行而不是抛出异常。ReentrantLock 的lock()直接调用sync的lock()。
4.2-M.1 NonfairSync.lock()
源码:
NonfairSync:
final void lock() {
if (compareAndSetState(0, 1)) // 直接通过cas设置state,即尝试获取锁,见4.1-M.1.1 AbstractQueuedSynchronizer.compareAndSetState(int expect, int update)
setExclusiveOwnerThread(Thread.currentThread()); // 加锁成功则将持有锁的线程维护成自己,exclusiveOwnerThread简单的set方法,不再深入
else
acquire(1); // 取锁失败走acquire加锁,见4.2-M.1.1 AbstractQueuedSynchronizer.acquire(int arg)
}
复制代码
解析:先假设无锁,直接同通过CAS设置state来获取锁,如果成功就将持有锁的线程维护成自己,失败的话就走acquire获取锁。
4.2-M.1.1 AbstractQueuedSynchronizer.acquire(int arg)
源码:
AbstractQueuedSynchronizer:
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 尝试获取锁,见4.2-M.1.1.1 NonfairSync.tryAcquire(int acquires)
// 取锁失败将当前线程加入等待队列
// addWaiter见4.2-M.1.1.2 AbstractQueuedSynchronizer.addWaiter(Node mode);
//acquireQueued见4.2-M.1.1.3 AbstractQueuedSynchronizer.acquireQueued(final Node node, int arg)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //Node.EXCLUSIVE表示这是一个独占模式的节点
// 上一步为true的话走到这里复原中断标识,见4.2-M.1.1.4 AbstractQueuedSynchronizer.selfInterrupt()
selfInterrupt();
}
复制代码
解析:尝试获取锁,如果获取锁成功则结束,获取失败的话把线程加入等待队列并阻塞。如果acquireQueued返回的中断标识是true,说明加锁过程中执行过中断,那么还要重新执行interrupt()复原中断标识,因为4.2-M.1.1.3.3 AbstractQueuedSynchronizer.parkAndCheckInterrupt()
使用的Thread.interrupted()虽然会返回中断标识但是也会同时清除中断标识,虽然清除中断标识对于ReentrantLock的阻塞流程是很有必要的,但是不能让ReentrantLock自己的内部流程需要影响到外部逻辑,因此需要在加锁成功后将中断标识复原。
4.2-M.1.1.1 NonfairSync.tryAcquire(int acquires)
此方法定义在AbstractQueuedSynchronizer,但是在NonfairSync实现:
源码:
NonfairSync:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires); // 非公平尝试获取锁,见4.1-M.1 Sync.nonfairTryAcquire(int acquires)
}
复制代码
解析:直接调用了4.1-M.1 Sync.nonfairTryAcquire(int acquires)
方法,即非公平尝试获取锁。
4.2-M.1.1.2 AbstractQueuedSynchronizer.addWaiter(Node mode)
源码:
AbstractQueuedSynchronizer:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 当前等待队列的尾部
if (pred != null) { // 如果队列里已经有值了
node.prev = pred; // 设置prev
if (compareAndSetTail(pred, node)) { // 因为可能有多个线程都竞争尾部,因此使用CAS替换尾部
pred.next = node; // 替换尾部成功说明入队成功,设置上一个尾部的next
return node;
}
}
// tail为空即队列无值,或者CAS竞争尾部失败,就到这里了,见4.2-M.1.1.2.1 AbstractQueuedSynchronizer.enq(final Node node)
enq(node);
return node;
}
复制代码
解析:为当前线程建立一个指定模式(Node.EXCLUSIVE表示独占模式,Node.SHARED表示共享模式)的Node节点,并载入等待队列。载入成功返回节点实例。
4.2-M.1.1.2.1 AbstractQueuedSynchronizer.enq(final Node node)
源码:
AbstractQueuedSynchronizer:
private Node enq(final Node node) {
for (;;) { // 一直自旋到操作成功为止
Node t = tail;
if (t == null) { // Must initialize // 尾部是null说明现在队列是空
if (compareAndSetHead(new Node())) // 应对竞争cas替换头部为一个空节点!!
tail = head; // CAS成功也同步设置尾部为这个空节点
} else {// 尾部不是null说明现在队列有元素
node.prev = t;
if (compareAndSetTail(t, node)) { // 应对竞争cas替换尾部
t.next = node; // 成功的话设置原尾的next指向新的尾部
return t;
}
}
}
}
复制代码
解析:就是一个将node节点载入等待队列的方法。因为等待队列的是懒初始化的,所以里面还包含了等待队列的初始化过程,值得注意的是初始节点会是一个空节点!
因为竞争的关系,节点载入会通过CAS完成,且整个过程自旋,直到载入成功。载入成功返回节点示例。
4.2-M.1.1.3 AbstractQueuedSynchronizer.acquireQueued(final Node node, int arg)
源码:
AbstractQueuedSynchronizer:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 中断标识
for (;;) {
// 获得node的上一节点
final Node p = node.predecessor();
// 如果上一节点是head,说明当前线程就是等待队列的首位有效节点(原因见关键属性head)
// tryAcquire见4.2-M.1.1.1 NonfairSync.tryAcquire(int acquires)
if (p == head && tryAcquire(arg)) {
setHead(node); // 加锁成功的话,那么node就成为新的head,下一个有效节点就是node.next;
p.next = null; // help GC // 断开链接帮助GC,原理是什么呢?
failed = false; // 失败标识是false
return interrupted; // 返回中断标识
}
// 可阻塞状态检查,同时也会进行取消节点清除
// 见4.2-M.1.1.3.2 AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(Node pred, Node node)
if (shouldParkAfterFailedAcquire(p, node) &&
// 阻塞与中断检查,见4.2-M.1.1.3.3 AbstractQueuedSynchronizer.parkAndCheckInterrupt()
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//如果failed是true,那么就是这个线程被取消了,但是对于acquireQueued来说,cancelAcquire好像是无用的,因为正常流程不会走到这
if (failed)
cancelAcquire(node); // 执行取消流程,见4.2-M.1.1.3.4 AbstractQueuedSynchronizer.cancelAcquire(Node node)
}
}
复制代码
解析:如果当前线程处于第一个有效节点,那就尝试获取锁,如果获取成功就是结束。如果当前线程不是第一个有效节点或是是但是获取锁失败的话就进行可阻塞状态检查,同时也会进行取消节点清除,可阻塞的话就是阻塞等待唤醒。被唤醒后检查并设置中断标识。然后继续自旋以上流程。
最后对于finally内的取消流程4.2-M.1.1.3.4 AbstractQueuedSynchronizer.cancelAcquire(Node node)
,acquireQueued好像并不需要这一部分,因为acquireQueued不关注中断,发生中断也就不会抛出异常而是会继续等待加锁直到成功,此时会设置failed为false,也就是说acquireQueued在正常流程下是不会进入cancelAcquire的。那么为什么还有这一部分吗?难道是为了try块内线程出现可能存在的异常时及时移除问题节点?这样说也不对,因为addWaiter就没样的操作,且cancelAcquire内的逻辑是很明显的处理中断的逻辑。那是为了和另外两个加锁方法保持一致?如果您知道答案欢迎在评论区讨论。
但是对于4.3-M.1.1 AbstractQueuedSynchronizer.doAcquireInterruptibly(int arg)
和4.3-M.1.1 AbstractQueuedSynchronizer.doAcquireNanos(int arg, long nanosTimeout)
来说,cancelAcquire的取消流程是十分必要的,因为他们关注中断,会主动抛出InterruptedException,cancelAcquire内会将节点从同步队列中移除,并寻找下一个有效节点(因为后续线程也可能被取消)。
4.2-M.1.1.3.1 AbstractQueuedSynchronizer.setHead(Node node)
源码:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
复制代码
解析:在关键属性篇已经提过head属性是一个空节点,可以看到这个方法除了赋值head属性外,还会清空thread和prev属性使其成为一个空节点。
4.2-M.1.1.3.2 AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(Node pred, Node node)
源码:
AbstractQueuedSynchronizer:
// 调用这个方法要保证pred是node的prev
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 注意这里看的是pred状态,不是node的状态
if (ws == Node.SIGNAL) // 前一个接单是SIGNAL状态,表示是需要唤醒的状态,那你这身为他的next,自然也需要阻塞了
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true; // 返回可阻塞
if (ws > 0) {
/*
大于0的状态只有CANCELLED,表示前一个线程已被取消,此时就往前走,一直找到
一个线程没取消的节点,然后用node.prev = pred和pred.next = node把中间已
取消的节点全部移除等待队列,对于ReentrantLock,如果ws不大于0,那么其实只
会是0和SIGNAL了,原因在下面。
*/
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
如果前一个线程是其他状态:只可能是0 or PROPAGATE,因为如果是CONDITION那就不会再这
个队列里了,而是移入到CONDITION维护的队列里了。这个是AQS的公用方法,但是对于
ReentrantLock来说也不会是PROPAGATE,因为ReentrantLock是独占锁,没有共享模式不会
用到PROPAGATE,因此到这里的也就只有0了,那就用CAS把0改成SIGNAL,这样下次node自旋时
就可以直接判定为可阻塞了,CAS失败也没关系,因为走到这一步返回的是false,不允许阻
塞,那node还会自旋的,要是node获得锁那pred是什么状态就无所谓了,pred肯定不参与了,
要是node没获得锁,还会自旋到这里再进行CAS的。
*/
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false; // 返回不可阻塞
}
复制代码
解析:阻塞状态判定,前一个线程是SIGNAL状态那么当前线程才阻塞。同时还包括取消节点清除,如果前一个线程已被取消,那么就往上追溯到没取消的节点,并把取消的节点移除等待队列。如果前一个线程状态是0,那么就把他改成SIGNAL,这样下次node自旋时就可以直接判定为可阻塞,但是本次返回的却是不可阻塞。结合4.2-M.1.1.3 AbstractQueuedSynchronizer.acquireQueued(final Node node, int arg)
,node继续自旋直到到pred成为SIGNAL才会阻塞。为什么不是移除取消节点之后,直接返回true进行阻塞,而是再自旋呢?因为可能在移除之前,node的pred不是head,中间隔着那些取消节点,移除取消节点之后node的pred就是head了,此时身为一个有效节点的node应该重新尝试获取锁,而不是直接阻塞。同样的为什么CAS修改为SIGNAL之后不直接返回true进行阻塞呢,注释给的解释是调用方需要重试以确认确实是无法获取到锁。
4.2-M.1.1.3.3 AbstractQueuedSynchronizer.parkAndCheckInterrupt()
源码:
AbstractQueuedSynchronizer:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 如果没有许可就阻塞当前线程,拥有许可就使用该许可证本次继续执行
// 被唤醒的话从这开始运行,注意不仅仅是unPark会解除阻塞状态,线程中断interrupt()方法也会使得线程唤醒
return Thread.interrupted(); // 返回并重置线程的中断标识,这里使用Thread.interrupted()而不是获取当前线程再使用isInterrupted()是非常重要的,见下面的解析
}
复制代码
解析:阻塞等待,并进行中断检查,返回中断标识。LockSupport.park(Object)并不是直接阻塞,而是会判断目标是否拥有许可,如果拥有许可,那么就使用许可,本次不进行阻塞而是继续执行。设置许可的目的是为了更好的应对并发问题,假设锁释放并执行唤醒操作在当前线程的阻塞判断之后执行LockSupport.park(Object)之前,没有许可的话当前线程执行LockSupport.park(Object)就会直接阻塞,导致所有线程都处于阻塞状态,进而形成死锁。而使用许可LockSupport.unpark(Object)会赋予当前线程一个许可,此时执行LockSupport.park(Object)会使用许可继续执行而不是阻塞,就避免了并发问题。许可是不可累加的,如果已处于有许可状态,再调LockSupport.unpark(Object)是无效的。也就是说无论调多少次LockSupport.unpark(Object),都只有一个许可,也仅仅只能抵消一次LockSupport.park(Object)。
最后返回中断标识使用的是Thread.interrupted()而不是获取当前线程再使用isInterrupted(),这是因为Thread.interrupted()在返回中断标识的同时还会重置中断标识,而isInterrupted()仅仅会返回中断标识。在中断标识是true的时间,阻塞方法LockSupport.park(this)是不生效的,也就是说如果不重置中断标识线程会一直自旋而不阻塞导致性能浪费。返回的中断标识也是很有用的,如果是使用的关注中断的加锁方法如lockInterruptibly()那么ReentrantLock后续逻辑就可以根据返回的中断标识决定是否抛出异常,而对于不关注中断的加锁方法如lock()那么ReentrantLock后续逻辑为了不影响加锁之外的外部逻辑,可以根据返回的的中断标识会对线程的中断标识进行复原。
4.2-M.1.1.3.4 AbstractQueuedSynchronizer.cancelAcquire(Node node)
源码:
AbstractQueuedSynchronizer:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
// node为空就直接返回
if (node == null)
return;
// 断开node对thread的连接
node.thread = null;
// Skip cancelled predecessors
// 往上找,一直找到一个没取消的
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next; // 没取消的节点的next
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED; // 设置的这个节点为CANCELLED状态
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) { // 如果这个node是尾部,就移除
compareAndSetNext(pred, predNext, null);
} else { // 不是尾部的话
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head && // pred不能是头,如果是头的话node.next就是第一个有效节点了,直接唤醒了
((ws = pred.waitStatus) == Node.SIGNAL || // 如果pred已经是SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 如果不是SIGNAL但是CAS修改成SIGNAL成功了
pred.thread != null) { //pred要是个有效节点
Node next = node.next; // 断开链,也就是移除那些取消的节点
if (next != null && next.waitStatus <= 0) // 如果下一个节点是一个有效节点的话
compareAndSetNext(pred, predNext, next); // 就把链pred到next的链接上
} else {
// 不满足以上情况,比如pred是head,或者pre不是head但是pred不是SIGNAL也修改SIGNAL失败,
// 或者pred不是一个有效节点,这时间直接唤醒node后的第一个有效节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
复制代码
解析:就是处于等待队列的线程取消的一些修正逻辑,比如清除那些在它之前且相连的取消节点,然后根据情况决定是修正等待队列链或者是直接唤醒下一个线程。
4.2-M.1.1.3.4.1 AbstractQueuedSynchronizer.unparkSuccessor(Node node)
源码:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; // 查看当前节点状态
if (ws < 0) // 非取消状态的话就改成0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; // 查看下一个节点
if (s == null || s.waitStatus > 0) { // 如果下一个节点是无效的(为空或者已取消)
s = null; // 先设置成null,免得找不到有效节点,导致s还是原值
// 从尾部(tail)往前找,找到node之后的第一个有效的节点。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
/**
如果s.thread处于阻塞状态则进行唤醒,如果s.thread处于非阻塞状态,则赋予s.thread一个许可.
关于许可,见4.2-M.1.1.3.3 AbstractQueuedSynchronizer.parkAndCheckInterrupt()
*/
// LockSupport.unpark内部有判断,只有入参不为null才真正执行相关动作
LockSupport.unpark(s.thread);
}
复制代码
解析:首先入参节点node的状态如果是非取消状态的话会被设置为0。接下来进入唤醒流程:查看node的下一个节点node.next。如果node.next存在且有效的就唤醒node.next。如果node就是tail的话,说明没有线程需要唤醒了,因此设置了s=null
,for循环也不会进入循环内逻辑,此时就不会唤醒任意一个线程。如果node不是tail但是属于取消状态的话就需要找node后的第一个有效节点。但是从代码中我们可看到是从尾部tail开始往前找的。为什么不是从node开始往后找呢?这是因为node是无效节点,那么就存在已被后续其他节点断链移除的可能(见4.2-M.1.1.3.2 AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(Node pred, Node node)
相关逻辑),此时node所在的链已经不在等待链上了,从node往后找就肯定找不到有效节点。而tail是原子性修改,会一直在有效的等待链上。因此我们可以看到for循环的判定条件有两个t != null && t != node
, 其中t != node
保证node未被移除时到node就结束取node后的第一个有效节点。但是如果node已被移除,t != null
来保证从tail遍历到head就结束,以免出现空指针。但是在核心属性篇以及4.2-M.1.1.3.1 AbstractQueuedSynchronizer.setHead(Node node)
我们也能看到head其实并不是一个有效节点,且head肯定会满足t.waitStatus <= 0
的,那么s就是head,head.thread又是null,LockSupport.unpark(s.thread)就唤醒了个寂寞,谁都没唤醒。但是这样的话休眠线程一个没唤醒难道不会死锁吗?并不会的,我们要记得进行无效节点清除逻辑的前提是纳入等待队列且获取锁失败,获取锁失败意味着当前有线程持有锁(可能是闯入线程),这里不唤醒,持有锁的线程释放锁时也会执行唤醒指令,不会造成死锁。
4.2-M.1.1.4 AbstractQueuedSynchronizer.selfInterrupt()
源码:
AbstractQueuedSynchronizer:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
复制代码
解析:执行线程中断,复原中断标识。一定要注意:一个线程不应该由其他线程来强制中断或停止,而是应该由线程自己自行停止。因此所谓的线程中断仅仅是设置线程的中断标识,至于是否响应这个中断,由线程自己判定中断标识来自己决定!
4.3 lockInterruptibly()
源码:
ReentrantLock:
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
复制代码
解析:一直等待锁,考虑中断,获取锁过程中如果线程遭遇中断,则抛出InterruptedException,因此要求必须需要捕获InterruptedException。ReentrantLock 的lockInterruptibly()直接调用sync的acquireInterruptibly()。
4.3-M.1 AbstractQueuedSynchronizer.acquireInterruptibly(int arg)
源码:
AbstractQueuedSynchronizer:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 因为lockInterruptibly()考虑中断,因此先判断一下线程是否中断的,是的话就抛出InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg)) // 参数获取锁,见4.2-M.1.1.1 NonfairSync.tryAcquire(int acquires)
// 失败的话调用doAcquireInterruptibly
// 见4.3-M.1.1 AbstractQueuedSynchronizer.doAcquireInterruptibly
doAcquireInterruptibly(arg);
}
复制代码
解析:因为lockInterruptibly()考虑中断,因此先判断一下线程是否中断,是的话就抛出InterruptedException。又因为是非公平锁,所以先用4.2-M.1.1.1 NonfairSync.tryAcquire(int acquires)
尝试一下获取锁,失败的话就以中断模式获取锁。
4.3-M.1.1 AbstractQueuedSynchronizer.doAcquireInterruptibly(int arg)
源码:
AbstractQueuedSynchronizer:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
// addWaiter见4.2-M.1.1.2 AbstractQueuedSynchronizer.addWaiter(Node mode);
final Node node = addWaiter(Node.EXCLUSIVE); // 创建一个节点加入队列,Node.EXCLUSIVE表示这是一个独占模式的节点
boolean failed = true; // 失败标识
try {
for (;;) {
// 获得node的上一节点
final Node p = node.predecessor();
// 如果上一节点是head,说明当前线程就是等待队列的首位有效节点(原因见关键属性head)
// tryAcquire见4.2-M.1.1.1 NonfairSync.tryAcquire(int acquires)
if (p == head && tryAcquire(arg)) {
setHead(node); // 加锁成功的话,那么node就成为新的head,下一个有效节点就是node.next;
p.next = null; // help GC // 断开链接帮助GC,原理是什么呢?
failed = false; // 失败标识设置为false
return;
}
// 可阻塞状态检查,同时也会进行取消节点清除
// 见4.2-M.1.1.3.2 AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(Node pred, Node node)
if (shouldParkAfterFailedAcquire(p, node) &&
// 阻塞与中断检查,见4.2-M.1.1.3.3 AbstractQueuedSynchronizer.parkAndCheckInterrupt()
parkAndCheckInterrupt())
throw new InterruptedException(); // 抛出异常
}
} finally {
//如果failed是true,那么就是线程中断了,没获取到锁,而是抛出InterruptedException()不再继续获取锁
if (failed)
// 执行取消流程,见4.2-M.1.1.3.4 AbstractQueuedSynchronizer.cancelAcquire(Node node)
cancelAcquire(node);
}
}
复制代码
解析:整个逻辑和4.2-M.1.1.3 AbstractQueuedSynchronizer.acquireQueued(final Node node, int arg)
是如此相似。最主要的区别是此方法需要关注异常,在4.2-M.1.1.3.3 AbstractQueuedSynchronizer.parkAndCheckInterrupt()
检查到线程在阻塞期间被执行过中断,就直接抛出InterruptedException。而4.2-M.1.1.3 AbstractQueuedSynchronizer.acquireQueued(final Node node, int arg)
则是设置中断标识,并在获取到锁后返回这个标识。
4.4 tryLock(long timeout, TimeUnit unit)
源码:
ReentrantLock:
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
复制代码
解析:指定时间等待锁,指定时间获取锁成功返回true,获取失败返回false。等待时间内考虑中断,等待过程中如果遭遇中断会抛出InterruptedException,因此要求必须需要捕获InterruptedException。直接调用了sync的4.3-M.1 AbstractQueuedSynchronizer.tryAcquireNanos(int arg, long nanosTimeout)
。
4.3-M.1 AbstractQueuedSynchronizer.tryAcquireNanos(int arg, long nanosTimeout)
源码:
AbstractQueuedSynchronizer:
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 因为tryLock(long timeout, TimeUnit unit)考虑中断,因此先判断一下线程是否中断的,是的话就抛出InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) || // 非公平锁,先尝试获取锁,常规操作,见4.2-M.1.1.1 NonfairSync.tryAcquire(int acquires)
doAcquireNanos(arg, nanosTimeout); // tryAcquire(int acquires)获取失败的话走定时模式获取锁。
}
复制代码
解析:因为tryLock(long timeout, TimeUnit unit)考虑中断,因此先判断一下线程是否中断,是的话就抛出InterruptedException。又因为是非公平锁,所以先用4.2-M.1.1.1 NonfairSync.tryAcquire(int acquires)
尝试一下获取锁,失败的话就以定时模式获取锁。
4.3-M.1.1 AbstractQueuedSynchronizer.doAcquireNanos(int arg, long nanosTimeout)
源码:
AbstractQueuedSynchronizer:
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L) // 如果等待时间小于等于0就直接返回失败
return false;
final long deadline = System.nanoTime() + nanosTimeout; // 算出最终时间
// 创建一个节点加入队列,Node.EXCLUSIVE表示这是一个独占模式的节点
// addWaiter见4.2-M.1.1.2 AbstractQueuedSynchronizer.addWaiter(Node mode);
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
// 获得node的上一节点
final Node p = node.predecessor();
// 如果上一节点是head,说明当前线程就是等待队列的首位有效节点(原因见关键属性head)
// tryAcquire见4.2-M.1.1.1 NonfairSync.tryAcquire(int acquires)
if (p == head && tryAcquire(arg)) {
setHead(node); // 加锁成功的话,那么node就成为新的head,下一个有效节点就是node.next;
p.next = null; // help GC // 断开链接帮助GC,原理是什么呢?
failed = false; // 失败标识设置为false
return true; // 发布加锁时间
}
nanosTimeout = deadline - System.nanoTime(); // 算出剩余时间
if (nanosTimeout <= 0L) // 如果剩余时间小于等于0
return false; // 返回加锁失败
// 可阻塞状态检查,同时也会进行取消节点清除
// 见4.2-M.1.1.3.2 AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(Node pred, Node node)
if (shouldParkAfterFailedAcquire(p, node) &&
/*
剩余时间是否大于自旋优化阈值,spinForTimeoutThreshold是1000L,
也就是1000纳秒,这个判断是因为阻塞和唤醒都需要线程状态改变,
很耗时,如果剩余时间过短,就没必要阻塞了,直接自旋就可以了
*/
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout); // 阻塞指定的时间
if (Thread.interrupted()) // 获得并重置线程的中断标识
throw new InterruptedException(); // 抛出异常
}
} finally {
//如果failed是true,那么就是线程中断了,没获取到锁,而是抛出InterruptedException()不再继续获取锁
if (failed)
// 执行取消流程,见4.2-M.1.1.3.4 AbstractQueuedSynchronizer.cancelAcquire(Node node)
cancelAcquire(node);
}
}
复制代码
解析:整个逻辑和4.2-M.1.1.3 AbstractQueuedSynchronizer.acquireQueued(final Node node, int arg)
还是有相似之处。整个过程就是获取锁,如果获取失败就阻塞,被唤醒后继续获取锁直到满足条件跳出自旋。不过此方法的阻塞不再是无限制的等待,而是添加了阻塞时间以达到定时功能,同时也设置了自旋阈值来进行相应的性能优化。因为tryLock(long timeout, TimeUnit unit)关注中断,因此在发现线程中断标识为true时直接抛出InterruptedException。
4.5 unlock()
源码:
ReentrantLock:
public void unlock() {
sync.release(1);
}
复制代码
解析:释放一次锁。调用sync的release(int arg)实现。
4.4-M.1 AbstractQueuedSynchronizer.release(int arg)
源码:
AbstractQueuedSynchronizer:
public final boolean release(int arg) {
// 释放锁
if (tryRelease(arg)) {
Node h = head;
// waitStatus 不等于0表示需要唤醒后续
if (h != null && h.waitStatus != 0)
// 唤醒下一个有效线程,见4.2-M.1.1.3.4.1 AbstractQueuedSynchronizer.unparkSuccessor(Node node)
unparkSuccessor(h);
// 锁完全释放
return true;
}
return false; // 锁未完全释放
}
复制代码
解析:因为ReentrantLock是可重入锁,tryRelease在锁完全释放时才会返回true,然后进入4.2-M.1.1.3.4.1 AbstractQueuedSynchronizer.unparkSuccessor(Node node)
进入唤醒逻辑。
4.4-M.1.1 Sync.tryRelease(int releases)
tryRelease(int releases)定义在AbstractQueuedSynchronizer,实现在Sync。
源码:
protected final boolean tryRelease(int releases) {
// 减去要释放的算出新的state
int c = getState() - releases;
// 持有锁的线程才能解锁,否则就抛出IllegalMonitorStateException
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否是完全解锁
boolean free = false;
// c等于0时说明完全解锁了,因为有锁重入,所以减到0才算完全解锁
if (c == 0) {
free = true;
// 清空持有线程
setExclusiveOwnerThread(null);
}
// 设置状态
setState(c);
// 返回是否完全解锁
return free;
}
复制代码
解析:校验是否为当前线程持有锁否则抛出IllegalMonitorStateException。根据锁新的状态判断锁是否完全释放。如果锁完全释放则清空ExclusiveOwnerThread。最后更新state并返回锁是否完全释放。
5 从源码看公平锁FairSync的实现
从3.1 类定义章节NonfairSync和FairSync的类定义对比可以看到,NonfairSync和FairSync仅有lock()和tryAcquire(int acquires)两个方法实现不同,其余无论是加锁还是解锁实现上均相同。因此上面对非公平锁的解析大部分都适合于公平锁,只要在查阅时用5.2-M.1 FairSync.lock()
替换4.2-M.1 NonfairSync.lock()
,5.2-M.1.1 FairSync.tryAcquire(int acquires)
替换4.2-M.1.1 AbstractQueuedSynchronizer.acquire(int arg)
就好,因此下面仅仅列出这两个不同的方法的源码。
5.1 tryLock()
源码:
ReentrantLock:
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
复制代码
解析:公平锁的tryLock()在实现上和非公平锁完全一样,这意味着即使设置ReentrantLock为公平模式,使用tryLock()依然是非公平模式。如果确实需要公平模型下这种尝试加锁的方式,可以使用功能相似的tryLock(0,TimeUnit.SECONDS),不过他会响应中断。至于tryLock()的实现,和4.1 tryLock() 一样,不再多说。
5.2 lock()
源码:
ReentrantLock:
public void lock() {
sync.lock();
}
复制代码
解析:一直等待锁,无视中断,在获取锁过程中即使线程遭遇中断也不会管,而是继续等下去,获得锁之后线程继续执行而不是中断。ReentrantLock 的lock()直接调用sync的lock()。
5.2-M.1 FairSync.lock()
源码:
FairSync:
final void lock() {
// 直接使用acquire加锁,acquire见4.2-M.1.1 AbstractQueuedSynchronizer.acquire(int arg)
acquire(1);
}
复制代码
解析:直接走4.2-M.1.1 AbstractQueuedSynchronizer.acquire(int arg)
获取锁。acquire内部会调用tryAcquire(int acquires),但是对于公平锁来说调用的不再是4.2-M.1.1.1 NonfairSync.tryAcquire(int acquires)
,而是FairSync的实现5.2-M.1.1 FairSync.tryAcquire(int acquires)
。除此之外在其它实现上FairSync与非公平锁NonfairSync相同。
5.2-M.1.1 FairSync.tryAcquire(int acquires)
源码:
FairSync:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 当前状态
if (c == 0) { // 无锁状态
if (!hasQueuedPredecessors() &&
//CAS加锁,见4.1-M.1.1 AbstractQueuedSynchronizer.compareAndSetState(int expect, int update)
compareAndSetState(0, acquires))
// CAS成功则继续将持有锁的线程维护成当前线程,exclusiveOwnerThread简单的set方法,不再深入
setExclusiveOwnerThread(current);
return true; // 返回获取锁成功
}
}
// 如果当前不是无锁状态则判断是不是一个重入锁,exclusiveOwnerThread简单的get方法,不再深入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; // 是重入锁的话就维护重入次数。因为重入意味着当前线程持有锁,所以这个地方不会出现加算的并发问题的
if (nextc < 0) // state是int的这是一个越界检查
throw new Error("Maximum lock count exceeded");
setState(nextc);// 设置重入次数,state简单的set方法,不再深入
return true;
}
return false;
}
复制代码
解析:和4.1-M.1 Sync.nonfairTryAcquire(int acquires)
的实现类似,但是无锁状态获取锁时增加了5.2-M.1.1.1 AbstractQueuedSynchronizer.hasQueuedPredecessors()
的判断,这是公平锁实现的关键,判断当前线程是否为等待线程中最先申请锁的线程,实现公平获取。而且次方法被定义为final
,不允许子类进行修改来保护公平实现。
5.2-M.1.1.1 AbstractQueuedSynchronizer.hasQueuedPredecessors()
源码:
AbstractQueuedSynchronizer:
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
/**
* 三个条件判断有没有比当前线程更早的线程获取锁:
* 1:h != t标识队列存在有效节点,进入下一步判断,如果h==t则意味着现在等待队列
* 为空,没有等待中的线程,那当前线程就是最优先的线程
* 2:h.next==null,如果h != t但是h.next==null,说明有线程比当前线程更早,这
* 是因为在4.2-M.1.1.2.1 AbstractQueuedSynchronizer.enq(final Node node)
* 中我们可以看到是先用CAS设置tail,然后才设置t.next,h != t但是h.next==null
* 说明有线程就走到这一步了,而h.next又是第一个有效节点,因此当前线程只会更
* 晚,所以也会返回true标识有更早的线程
* 3:s.thread != Thread.currentThread():很这个就很好理解了,s是h.next,即
* 队列中的第一个有效节点,s.thread就是当前最早的线程,判断一下当前线程是不是s.thread
*/
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
复制代码
解析:如果当前线程前面有排队线程,则为true;如果当前线程位于队列的头部或队列为空,则为false。
5.3 lockInterruptibly()与tryLock(long timeout, TimeUnit unit)
实现流程与非公平锁NonfairSync的4.3 lockInterruptibly() ,4.4 tryLock(long timeout, TimeUnit unit) 一样,区别仅仅在于tryAcquire(int acquires)使用的是5.2-M.1.1 FairSync.tryAcquire(int acquires)
而不是4.2-M.1.1.1 NonfairSync.tryAcquire(int acquires)
,相关逻辑替换即可。
开发成长之旅 [持续更新中...]
关联导航:30:ReentrantLock的简单使用 - 掘金 (juejin.cn)
欢迎关注…