AQS介绍
AQS全称是AbstractQueuedSynchronizer,它是一个抽象类,内部实现了一个FIFO双向链表,链表的每一个节点都有一个指向前节点和一个指向后节点的指针,所以AQS可以从任意一个节点很快访问前驱和后继,每个节点绑定一个线程,当线程竞争锁失败时,会添加到队列的尾部等待被释放,当锁被释放时,队列头部节点的线程会被释放去竞争锁
ReentrantLock是Lock接口的实现类,它是常用的对象同步锁,并且它是可重入锁,重入锁指的是线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次数,具体的可以参考这篇文章
ReentrantLock封装了一个内部类Sync,并继承了AbstractQueuedSynchronizer抽象类,ReentrantLock加锁原理则是基于Sync实现,来看看源码
// 加锁
public void lock() {
sync.lock();
}
// 解锁
public void unlock() {
sync.release(1);
}
本篇文章就从ReentrantLock源码分析AQS实现原理
ReentrantLock源码
ReentrantLock另外还封装了两个Sync的子类的静态内部类,分别是NonfairSync和FairSync,NonfairSync字面意思上可以了解它是非公平锁,FairSync则是公平锁,本文主要以NonfairSync做分析
- NonfairSync.lock
final void lock() {
//通过cas操作来修改state状态,表示争抢锁的操作
if (compareAndSetState(0, 1))
// 设置当前获取到锁的线程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // 未获取到锁的线程再次尝试获取锁
}
这段代码简单解释一下
- 先去通过CAS去抢占锁,如果抢占锁成功
- 保存获得锁成功的当前线程
- 抢占锁失败,调用acquire来走锁竞争逻辑
再看看compareAndSetState
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
compareAndSetState是使用CAS的方式将state设置为1,CAS方式就是先比较需要修改的内容是否和预期值一样(预期值是修改前的值,在修改前可能会被别的线程修改所以先判断是否和预期值一样),如果相同修改修成返回true,不一样则修改失败返回false,这一系列操作是一个原子性的操作。CAS原理具体可以参考这里
AbstractQueuedSynchronizer封装了一些基于CAS方式设置变量值的方法,来看看源码:
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
} catch (Exception ex) {
throw new Error(ex); }
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final boolean compareAndSetWaitStatus(Node node, int expect,int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
Unsafe的CAS方法
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
这一部分的核心是调用Unsafe类的方法,关于Unsafe类我上一篇CAS原理分析的文章介绍过,它提供了基于cpu指令集操作内存的(native)本地方法,每个方法有四个入参,第二个入参是AbstractQueuedSynchronizer的几个成员变量的在内存中地址,这些方法大致的实现流程是先通过变量地址获取到变量的内存值,再与expect值比较,相同则内存值更新为update值返回true,不相同则不做任何操作并返回false。
再回到lock方法,获取锁的过程可以理解为:
- 当state=0时,表示无锁状态
- 当state>0时,表示已经有线程获得了锁,任意一个线程通过CAS将它修改为1成功则获取到锁,但是因为ReentrantLock允许重入,所以同一个线程多次获得同步锁的时候,state会递增,比如重入5次,那么state=5。 而在释放锁的时候,同样需要释放5次直到state=0其他线程才有资格获得锁
acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个方法的主要逻辑是
- 通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false
- 如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加到AQS队列尾部
- acquireQueued,将Node作为参数,通过自旋去尝试获取锁。
Node
在AQS队列内部封装了一个Node内部类用来保存线程,Node是一个FIFO的双向链表的数据结构,这种结构的特点是每个数据结构都有两个指针,分别指向该节点的后继节点和前驱节点。每个Node其实是由线程封装,当线程争抢锁失败后会封装成Node加入到ASQ队列中去。
Node源码:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus; // 等待状态标识
volatile Node prev; // 前节点
volatile Node next; // 后节点
volatile Thread thread; // 竞争锁的线程
// 存储在condition队列中的后继节点
Node nextWaiter;
// 是否为共享锁
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
// Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) {
// Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
// Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
thread,prev,next分别是当前线程,前节点,后节点,因此一个Node队列可以从任意节点开始从前后一直遍历到队列头尾
NonfairSync.tryAcquire方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取state值,即获取锁状态
int c = getState();
// state等于0表示无锁状态直接获取锁并返回
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果是已拿到锁的线程再次获取锁则state加上1,代表该锁被重入一次
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; 锁状态 + 1 表示重入次数
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 未获取到锁返回true
return false;
}
tryAcquire方法是尝试获取独占锁,在第一步CAS获取锁失败后添加到AQS队列前再次获取锁,因为是非公平锁,在添加到AQS队列前可能拿到锁的线程此时释放了锁,此时后面的竞争锁的线程可以碰巧在添加AQS队列前抢占到锁,从而实现有时排在后面竞争锁的线程更快抢占到锁。
公平锁锁下则严格按照每一个CAS获取锁失败的线程会被添加到AQS队列中,队列按照先进先出的原则,使得竞争锁的线程严格按照先后顺序抢占到锁
AbstractQueuedSynchronizer.addWaiter方法
private Node addWaiter(Node mode) {
// 将线程封装到Node中,mode为EXCLUSIVE即为独占锁
Node node = new Node(Thread.currentThread(), mode);
// tail是AQS的一个属性,代表队列尾节点
Node pred = tail;
if (pred != null) {
// tail不为空的情况,说明队列已经被初始化即有节点数据
node.prev = pred; // 当前线程Node的前节点指向AQS队列尾节点
if (compareAndSetTail(pred, node)) {
// 通过CAS方式将Node添加到AQS队列尾部
pred.next = node; // CAS成功则原来的AQS尾节点的后节点指向当前线程Node
return node;
}
}
enq(node); // CAS失败或AQS队列没有节点数据则进入eq方法
return node;
}
addWaiter方法整个过程就是将线程封装成Node,并通过CAS方式添加到AQS队列尾部
AbstractQueuedSynchronizer.enq方法
private Node enq(final Node node) {
// 进入无限for循环,即自旋
for (;;) {
Node t = tail; // 获取AQS队列尾节点
if (t == null) {
// tail为空表示AQS队列没有数据需要进行初始化
// 通过CAS方式初始化AQS队列即创建一个空Node作为队列头部
if (compareAndSetHead(new Node()))
tail = head; // CAS成功此时AQS队列只有一个节点,因此队列头尾都是该节点
} else {
// 如果此时AQS队列已被初始化则将Node添加到队列尾部
node.prev = t; // Node节点指向AQS队列尾节点
if (compareAndSetTail(t, node)) {
// 通过CAS方式将Node添加到AQS队列尾部
t.next = node; // CAS成功则原来的AQS尾节点的后节点指向当前线程Node
return t;
}
}
}
}
eq的方法整个过程就是对AQS队列进行初始化,先确定好队列没有节点数据后将当前线程Node通过CAS方式添加到队列头部head,此时队列只有一个节点,因此把Node的前节点指向本身,并通过CAS方式将队列尾部tail设为Node,并把尾部的后节点指向Node。
此时如果CAS失败即有其他线程已经完成了初始化,则通过CAS方式将Node添加到队列尾部,再把队列尾节点的后节点指向该Node,此时如果CAS再次失败,再此进行这一系列操作即通过自旋直到添加队列尾部成功则结束自旋。
AbstractQueuedSynchronizer.acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 失败标识
try {
boolean interrupted = false; // 线程终端标识
for (;;) {
final Node p = node.predecessor(); // 获取当前线程Node的前节点
// 如果Node的前节点为AQS头部head即Node处于队列最前端,每次只有队列最前端的Node才能后去抢占锁,直到抢占成功
if (p == head && tryAcquire(arg)) {
setHead(node); // 线程抢占锁成功则将将该线程Node从队列中移除
p.next = null; // 线程Node被设为head,原来的head后节点设为null使其能被GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
整个acquireQueued方法的过程大致如下:
- 获取当前线程Node的prev,如果prev为head节点,那么它就有会去争抢锁,调用tryAcquire方法抢占锁
- 如果该Node抢占锁成功以后,把该Node设置为head,并且移除原来的初始化head节点
- 如果获得锁失败,则根据waitStatus决定是否需要挂起线程,线程挂起后,通过cancelAcquire取消获得锁的操作