上篇主要讲解的AQS框架的基本原理以及设计思想,我们知道了AQS自定义同步组件大概需要三种技术:自旋、CAS、LockSupport.park();
那么本篇我们着重分析一下并发大佬 Doug Lea 设计的ReentrantLock,它是如何在保证线程同步的情况下进行加锁的。
这里贴一张ReentrantLock的类图
Sync是ReentrantLock的内部抽象类,继承自AbstractQueuedSynchronizer,实现了简单的获取锁和释放锁。NonfairSync和FairSync分别表示“非公平锁”和“公平锁”,都继承于Sync,并且都是ReentrantLock的内部类。
ReentrantLock加锁源码解析
我们写一个应用,从lock()方法一步一步进行底层源码分析。
0.先写一个Demo
public static void main(String[] args) {
final ReentrantLock reentrantLock = new ReentrantLock(true);
Thread t1 = new Thread(() -> {
reentrantLock.lock();
sleep(2000);
reentrantLock.unlock();
});
t1.start();
}
1.首先调用成员变量sync.lock()方法
public void lock() {
sync.lock();
}
2.在NonfairSync和FairSync的实现类中重写了lock()方法
公平锁lock()源码
final void lock() {
acquire(1);
}
非公平锁lock()源码
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
下面给出代码逻辑执行区别图:
公平锁在加锁之前需要判断是否需要排序;非公平锁会先CAS获取锁,如果没有获取到锁,就会进入到队列去排队。
3.调用模板方法 acquire(1) 方法
这里的参数 arg = 1
public final void acquire(int arg) {
// tryAcquire(arg)尝试加锁
// 如果加锁失败则会调用acquireQueued方法加入队列去排队
// 如果加锁成功则不会调用
// acquireQueued、addWaiter、selfInterrupt 下文解析
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先会调用tryAcquire(arg)尝试获取锁,此处为短路运算,如果获得锁取反那么就不会执行下面的操作。
4.尝试获取锁 tryAcquire(1) 方法
公平锁 tryAcquire(1) 代码:
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取lock对象锁状态,如果锁状态是自由状态为0,上锁为1,大于1表示重入
int c = getState();
// 锁为自由状态,那么就可以上锁
// 在公平锁中需要hasQueuedPredecessors()方法判断
// 非公平锁则会直接进行CAS,可以查看下面非公平锁代码块
if (c == 0) {
// hasQueuedPredecessors()判断当前线程是否需要排队 下文解析
if (!hasQueuedPredecessors() &&
// 如果不需要排队则进行cas尝试加锁
compareAndSetState(0, acquires)) {
// 如果加锁成功则把当前线程设置为拥有锁的线程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果c 不等于 0,而且当前线程不是持有锁的线程,直接返回false,尝试加锁失败
// 如果c 不等于 0,但是当前线程为持有锁的线程,则表示为重入,state+1
// 下面代码也证实了ReentrantLock为重入锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非公平锁代码:
ReentrantLock默认为非公平锁,所以在NonfairSync中重写 tryAcquire() 调用Sync中的 nonfairTryAcquire() 方法。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
对比公平锁和非公平锁的 tryAcquire() 方法的实现代码,其实差别就在于非公平锁获取锁时比公平锁中少了一个判断 !hasQueuedPredecessors()
,hasQueuedPredecessors() 中判断了是否需要排队,导致公平锁和非公平锁的差异如下:
- 公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;
- 非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程在unpark(),之后还是需要竞争锁(存在线程竞争的情况下)。
4-1.hasQueuedPredecessors() 判断是否需要排队
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires))
从上面代码可以看出这里又是一个短路运算,如果需要排队则不会下面的CAS加锁,所以需要方法返回false。
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
下面流程图为判断是否需要排队的各种情况:
值的一提:在s.thread != Thread.currentThread() 的判断,可能会有疑问一个线程park了怎么还能为同一个线程呢?
其实仔细想想,如果线程 t1 在第一次调用tryAcquire()方法尝试加锁失败之后,就会调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,而在acquireQueued()还会又一次tryAcquire()机会,这次该线程别未park(),这样也是可以减少锁竞争带来的开销。
到此我们遍分析完了 tryAcquire() 方法,那么我们继续接着 acquire步骤继续分析,为了方便贴一下acquire() 方法的代码
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
5.acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 方法分析
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 我们分为两部分分析
5-1. addWaiter(Node.EXCLUSIVE) 方法分析
private Node addWaiter(Node mode) {
// 将当前线程封装为Node对象 mode表示独占还是共享
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 判断队尾是否为空
// 如果不为空,表示队列已经初始化,那么就将node加到队尾
if (pred != null) {
// 前驱为原来的队尾
node.prev = pred;
// 这里需要CAS,防止多个线程加锁失败,入队时就会出现同步问题
// 确保入队操作为一个原子操作
if (compareAndSetTail(pred, node)) {
// pred后继为node,node成为队尾结点
pred.next = node;
// 返回队尾用作acquireQueued()方法参数 下文解析
return node;
}
}
// if未执行来到这步,表示队列没有初始化
enq(node);
return node;
}
队列未初始化来执行 enq(node) 方法
// 采用死循环的方式初始化队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 第一次循环 t = null
if (t == null) {
// 可以发现设置队头重新实例化了一个Node对象
// 这个新实例化的Node对象里面的变量都为null ,就起名null结点
if (compareAndSetHead(new Node()))
// 队头队尾都指向这个null结点
tail = head;
}
// 第二次循环
else {
// 前驱为null结点
node.prev = t;
// 将node结点设为队尾
if (compareAndSetTail(t, node)) {
// null结点的后继为node
t.next = node;
// 退出死循环
return t;
}
}
}
}
AQS队列在初始化为什么要先实例化一个变量为null的结点?个人理解每个队头都是一个变量为null的结点,而这个结点表示当前正在持有锁的线程,在队头t1释放锁之后,又会把队列第二个(因为第一个head结点,永远是正在持有锁的线程)t2线程设为head,并将Node中的thread变为null,这样的做法就是t2已经拿到锁了,那就不需要排队,那么Node的thread也就没有意义。
5-2.acquireQueued() 方法分析
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 取到node的前驱结点
final Node p = node.predecessor();
// 判断p是否为head,如果是head,那么node就是队列中第二个
// 也就是第一个排队结点,设计者会让第一个排队结点自旋2次
// 这样做的目的也是为了减少多线程竞争产生的开销,万一node
// 在自旋过程中,head释放了锁,那么node就会加锁,从而不会被
// park()
if (p == head && tryAcquire(arg)) {
// 加锁成功,那么node 就为队头元素,将node的thread置为null
setHead(node);
// p释放锁之后清除引用
p.next = null; // help GC
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire()判断前一个结点的waitStatue
// 用来表示当前结点的状态,如果为0置为-1,这也是第一个结点
// 能够自旋2次的关键
// ws=0 - 初始状态
// ws=-1 - 后继结点处于等待状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
5-3.shouldParkAfterFailedAcquire(p, node) 方法分析
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// ws = -1 ,表示后继结点的线程已经处于等待状态
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 由于在同步队列中等待的线程等待超时或者被中断
// 需要从同步队列中取消等待
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// CAS将pred的ws改为-1,表示后继结点的线程将要park()
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
可能会有疑问:为什么当前结点线程的状态需要保存到前驱结点中?如果该结点保存本线程的状态,那么可能存在以下情况,在调用park()方法之前将ws置为-1,但有可能下一步并未调用park(),这样不就没有意义;跟不可能在调用park()方法之后将ws置为-1,因为当前线程已经处于睡眠状态。所以需要通过前驱结点来保存后继线程的状态。
lock() 流程图
参考:
1、《Java并发编程的艺术》
2、JUC AQS ReentrantLock源码分析(一)
3、ReentrantLock实现原理分析