AQS=Abstract=Queue=Synchronizer
源码阅读,什么是AQS
下面这张图要会画!
NonfairSync -> Sync -> AQS
- 设计模式:模版方法-父类定义流程模版,流出钩子方法,交给子类实现
ReentrantLock 内部有个成员变量 Sync,它是 NofiarSync。Sync 本身继承自 AQS。
// ReentrantLock.class
private final Sync sync;
AQS (父类)内部有个独占线程,表示当前获得这把锁的线程。
// AQS的父类 AbstractOwnableSynchronizer.class
private transient Thread exclusiveOwnerThread;
到了nonfairAcquire()这里就必须要懂什么是AQS,否则是看不懂源码的。
这里衔接实现什么公平fairA什么是非公平nonfair。
- 公平:现在有一把锁,有一个等待队列,当一个新线程来了以后,先去等待队列里看,如果没有其他线程,就获得这把锁,这就叫公平。上来先排队,公平。
- 非公平:上来二话不说,我才不管队列是否有线程在等着,这叫非公平。上来就抢,抢着就算我的,非公平。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
获得锁
tryAcquire()
AQS 的 acquire() --》 Sync 的 tryAcquire() --》 NofairSync 的 nofairTryAcquire() ,如下:
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
// 首先获得当前线程
final Thread current = Thread.currentThread();
// 其次获得锁的状态
int c = getState();
// 可独占性判断
if (c == 0) {
// 如果等于0,通过CAS的操作(期望值是0)设定state
if (compareAndSetState(0, acquires)) {
// 把当前线程设置为AQS独占线程。
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入判断
else if (current == getExclusiveOwnerThread()) {
// 如果当前线程已经是独占线程。nextc+1,表示可重入。
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 否则枪锁失败
return false;
}
state
这段代码里有个 getState()方法,很关键,它实际上是获取AbstractQueueSynchronizer的一个属性 state 。这个state表示什么呢,有你定,有子类来定。
- 在ReentrantLock中state表示重入的次数,每重入一次state+1,当state=0表示释放这把锁。
- 在CountDownLatch中,state表示要需要count down多少次,这个门闩才能解开。比如state=5,表示要count down 5次。
/**
* The synchronization state.
*/
private volatile int state;
队列
在AQS内部,state的基础之上,在它的下面跟着一个队列。这个队列里面每一个都是一个node节点。
Node
Node里面有几项比较重要。最重要的一项,是它里面保留了一个Thread。所以这个队列是个什么队列呢,线程队列。而且还有 pred 和 next 指针。所以它还是一个双向链表。除此之外呢,它还有一个头 head,一个尾 tail。
为什么是双向链表?
因为加进来的这个节点需要知道前一个节点的状态。如果前面一个节点正在持有线程的过程中,那后面一个线程就在那儿等着。如果前面一个节点已经取消掉了,就应该越过这个节点不去考虑它的状态。
AQS最核心的点
AQS最核心的点在怎么入队,怎么出队。原来入队出队是要加锁的,这里都是CAS,用CAS的方式往尾巴上加。
百度面试题:为什么AQS的底层是cas+volatile
往队列里面加thread的时候,是通过CAS的方法 compareAndSetTail() 往尾巴tail上加。而tail是volatile的。还有比如修改state的值,也是通过CAS的方法 compareAndSetState() 来修改,而state也是volatile的。
addWaiter()
如果tryAcquire()失败,就调用acquireQueued() 加入等待队列,怎么加入呢?通过addWaiter(), 加入一个等待者,以什么方式呢?Node.EXCLUSIVE 独占的方式。也就是把当前线程以独占的形式扔到等待队列里头。
- addWaiter() 方法也是经过了几个版本的迭代。
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(mode);
// 死循环,不干成这件事誓不罢休
for (;;) {
// 记录当前tail作为期望值oldTail
Node oldTail= tail;
if (oldTail!= null) {
node.setPrevRelaxed(oldTail);
// cas操作setTail,如果成功就返回,如果失败就继续循环
// cas操作的底层是Unsafe这个类的单例在做具体操作
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
}
}
}
加到尾巴上这个操作,为什么要用CAS呢?这是AQS的核心点,理解了这一点就理解了AQS。思考这样一个问题:网一个链表的尾巴上加一个节点,好多个线程一起在加,这个时候要用到一把锁,这个锁怎么加?一般的方法是锁定整个链表,但是这么做的话,锁的粒度太大了。AQS是通过对Tail进行CAS操作,只观测这一个节点就可以了。
acquireQueued()
下面的代码解释为什么是双向链表,因为后面的节点需要不断查看前一个节点的状态。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for(;;) {
// 先拿到node的前置节点,predecessor就是prev
final Node p = node.predecessor();
// 如果前置节点是head,说明我是第二个,快要轮到我了,那么就在死循环里,通过tryAcquire和head节点竞争一下锁,如果竞争到了,说明头节点释放了锁,那么就释放head,并把我设置为新的head。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;// help GC 帮助垃圾回收
failed = false;
return interrupted;
}
// 否则说明我前面还有很多线程,那么就park阻塞,暂停死循环,老老实实等着,等待前置节点叫醒我。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
VarHandler
补充一个知识点:
在addWaiter里面,将 oldTail 设置为当前节点的前置节点时候,用到了PREV,他是一个 VarHandler。
private static final VarHanlder NEXT;
private static final VarHanlder PREV;
private static final VarHanlder THREAD;
private static final VarHanlder WAITSTATUS;
该怎么来解释这个VarHandler呢?可以这么来解释。结论是:指向某个变量的引用。那么这个引用有什么作用呢?看以下小程序:
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
public class T01_HelloVarHandle {
int x = 8;
private static VarHandle handle;
static {
try {
// 从 T01_HelloVarHandle 这个class中名字为x的类型为int的那么一个变量,请你指向它。
// 所以通过x能找到这个x,通过handle也能找到这个x,里面装着一个8.
handle = MethodHandles.lookup().findVarHandle(T01_HelloVarHandle.class, "x", int.class);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
T01_HelloVarHandle t = new T01_HelloVarHandle();
//plain read / write
System.out.println((int)handle.get(t));
handle.set(t,9);
System.out.println(t.x);
// VarHandle的意义在这里,可以通过它进行compareAndSet,进行原子性操作。进行原子性的修改值。
handle.compareAndSet(t, 9, 10);
System.out.println(t.x);
handle.getAndAdd(t, 10);
System.out.println(t.x);
}
}
- VarHandler指向某个变量的引用。
- VarHandler可以对变量进行CAS,保证原子性的修改值。有了CAS,好处当然是不需要进行加锁。
- compareAndSet()。比如 long x = 100;对long型变量赋值,这都不是原子性的操作(int是)。
- getAndAdd()。比如原来x = x +10;这是原子性的吗?不是,如果是这样赋值,是要加锁的。
所以为什么要有 VarHandler:除了可以进行普通属性的操作之外,还可以完成原子性的线程安全操作。
VarHandler vs 反射
在JDK9之前,要操作成员变量,只能用反射,但显然 VarHandler的效率要比反射高得多,反射每次要操纵之前都要做检查,VarHandler不需要,直接操纵。VarHandler可以理解为直接操纵二进制码。
特点总结
- 对普通属性也能进行原子性操作
- 速度比反射快:直接操作二进制码
作业:
- 怎么释放锁的,怎么通知后面的节点的
释放锁
// AbstractQueueSynchronizer
public final boolean release(int arg) {
// 如何tryRelease()由子类决定
if (tryRelease(arg)) {
Node h = head;
// 如果head存在,且没有阻塞,就unparkSuccessor成功释放
if (h != null && h.waitStatus != 0)
unparkSuccesssor(h);
return true;
}
return false;
}
tryRelease()
与 tryAcquire() 相对应的,有一个 tryRelease(),也是需要子类来提供实现,提供一个 ReentrantLock 的 tryRelease() 版本:
protected final boolean tryRelease(int releases) {
// 对state做减法操作,什么时候减到等于0,释放锁
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// 独占设为空,释放锁
setExclusiveOwnerThread(null);
}
// 更新state
setState(c);
return free;
}
unparkSuccessor
来看看 unparkSuccessor() 操作:
这个函数并不复杂。一句话概括:用unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用s来表示吧。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
//这里,node一般为当前线程所在的结点。
if (ws < 0)
//置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一个需要唤醒的结点s
// 如果为空或已取消
if (s == null || s.waitStatus > 0) {//如果为空或已取消(1表示已取消)
s = null;
// 从后向前找。
for (Node t = tail; t != null && t != node; t = t.prev)
//从这里可以看出,<=0的结点,都是还有效的结点
if (t.waitStatus <= 0)
s = t;
}
// 如果s不为空,唤醒s
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}
总结
实际上AQS的内部实际上是park()和unpark(),也不是在全部的情况下都是CAS,它还是有一个锁升级的概念,只不过做的比较隐蔽,你在等待队列的时候,如果拿不到,还是会进入阻塞状态,只不过它前面呢会有一个CAS的状态,它不像原来呢,就直接就进入阻塞态了。所以它内有一个阻塞的概念,它用的是什么呢,是LockSupport.park() 和 LockSupport.unpark()。这个后续再深入。