AbstractQueuedSynchronizer深入分析
AQS理解起来不难,繁复的概念却让人望而生怯,这里将花几天时间对AQS进行一个详细剖析。
什么是AQS?
AQS(AbstractQueuedSynchronizer
),AQS是JDK下提供的一套用于实现基于FIFO等待队列的阻塞锁和相关的同步器的一个同步框架。这个抽象类被设计为作为一些可用原子int值来表示状态的同步器的基类。如果你有看过类似 CountDownLatch
类的源码实现,会发现其内部有一个继承了 AbstractQueuedSynchronizer
的内部类 Sync
。可见 CountDownLatch
是基于AQS框架来实现的一个同步器.类似的同步器在JUC下还有不少。(eg. Semaphore
)
AQS的核心思想是基于volatile int state这样的volatile变量,配合Unsafe工具对其原子性的操作来实现对当前锁状态进行修改。同步器内部依赖一个FIFO的双向队列来完成资源获取线程的排队工作。
下面先看看AQS数据结构和加入节点,移除节点的流程图
AQS中的数据结构 -- 节点和同步队列
节点加入到同步队列
首节点变化
上面三个图介绍了AQS的数据结构,节点加入,首节点变化(当首节点完成后,后续节点竞争或同步设置为首节点)的流程图介绍。
AQS数据结构介绍
由上图可以看出,AQS的数据结构是一个双向链表结构。在同步队列中,head指向的是同步队列中的首节点(头结点),tail指向的是同步队列中的尾节点。队列中的线程都以链表形式连接。
当竞争失败的线程会被打包成Node放到同步队列中,Node可能的状态有:
CANCELLED:线程等待超时或者被中断了,需要从队列中移走
SIGNAL:后续的节点等待状态,当前节点,通知后面的节点去运行
CONDITION:当前节点处于等待队列
PROPAGATE:共享,表示状态要往后面的节点传播
0:表示初始状态
节点在同步队列中增加和移出。当同步队列中的线程获取到锁之后,执行对应的业务操作,完成业务操作后,释放锁,并在同步队列总移出。当有新的线程要加入到同步队列中,则会在同步队列的节点末尾增加新的线程。
首先,来看一下独占锁同步状态获取与释放:
流程讲解:
1、当线程获取同步状态state时,如果获取成功,则操作完业务后退出并返回。
2、当线程获取同步状态state时,如果获取失败:
①、则会生成一个同步队列的节点,并加入到同步队列的尾部
②、会通过CAS自旋设置:
A、如果判定前驱为头节点,则去获取同步状态state
I、如果获取到了同步状态,则当前节点被设置为头结点,并执行逻辑业务退出返回。
II、如果获取同步状态失败,则继续进入等待状态,等待线程被中断或者前驱节点被释放后,继续通过CAS自旋操作获取同步状态,直到成功。
B、如果判定前驱不为头结点,则进入等待状态,等待线程被中断或者前驱节点被释放后,继续通过CAS自旋操作获取同步状态,直到成功。
这里就不讲共享锁的流程图了。毕竟,共享锁的话,可以想象ReetrantLock提供的读写锁中的读锁,读锁之间的线程共享,不互斥,可以同时进入访问。
我们先来看看AQS(AbstractQueuedSynchronizer)的JDK的结构
了解AQS的使用方式和其中的设计模式。
AQS的使用方式:
1、AQS(AbstractQueuedSynchronizer)是一个抽象类,继承AbstractOwnableSynchronizer抽象类。
2、因此,只有通过继承(extends)方式来使用AQS
AQS的设计模式:
AQS采用的设计模式为:模板方法设计模式。
模板方法设计模式的概念:定义一个操作中算法的骨架,而将一些步骤延迟到子类中,模板方法使得子类可以不改变算法的结构即可重定义该算法的某些特定步骤。通俗点的理解就是:完成一件事情,有固定的数个步骤,但是每个步骤根据对象的不同,而实现细节不同;就可以在父类中定义一个完成该事情的总方法,按照完成事件需要的步骤去调用其每个步骤的实现方法。每个步骤的具体实现,由子类完成。
模板方法设计模式应用举例
发送消息/邮件场景:我们所知道,当公司里面制定了一套邮件或者消息机制,那么这套邮件或者消息机制都是有一个通用模板的,只是里面的内容或者方式可能需要再去个性化的制定。
定义一个邮件/消息的父类(抽象类)SendToCustomer,里面的构成代码如下:
import java.util.Date;
/**
*
* @FileName SendCustom.java
* @Author xiaogaoxiang
* @At 2018年11月14日 下午11:32:17
* @Desc 消息模板方法
*/
public abstract class SendCustom {
// 消息/邮件要发送给哪个用户
public abstract void to();
// 消息/邮件来由哪个用户发送
public abstract void from();
// 消息/邮件内容
public abstract void content();
// 日期,因为是统一的取当前日期,因此不需要子类再去实现
public void date() {
System.out.println(new Date());
}
// 发送
public abstract void send();
// 框架方法-模板方法
public void sendMessage() {
to();
from();
content();
date();
send();
}
}
下面创建一个模板方法的派生类SendSms消息发送类,继承了SendToCustomer抽象类
/**
*
* @FileName SendSms.java
* @Author xiaogaoxiang
* @At 2018年11月14日 下午11:33:07
* @Desc 模板方法的派生类
*/
public class SendSms extends SendCustom {
@Override
public void to() {
// 自定义输出
System.out.println("zhangsan");
}
@Override
public void from() {
// 自定义输出
System.out.println("xiaogaoxiang");
}
@Override
public void content() {
// 自定义输出
System.out.println("Hello world");
}
@Override
public void send() {
// 自定义输出
System.out.println("Send sms");
}
public static void main(String[] args) {
SendCustom sendC = new SendSms();
sendC.sendMessage();
}
}
模板方法设计模式很简单,通过定义统一行为的抽象类,再通过其派生类去做个性化的制定实现某种业务即可。
通过以上了解了AQS采用的是模板方法设计模式的思想之后,我们来了解一下,AQS里面制定了那些模板方法:
独享式获取:accquire、acquireInterruptibly、 tryAcquireNanos
共享式获取:acquireShared、acquireSharedInterruptibly、tryAcquireSharedNanos
独占式释放锁:release
共享式释放锁:releaseShared
需要子类覆盖的流程方法:
独占式获取:tryAcquire
独占式释放:tryRelease
共享式获取:tryAcquireShared
共享式释放:tryReleaseShared
同步器是否处于独占模式:isHeldExclusively
同步状态state:
getState:获取当前的同步状态
setState:设置当前同步状态
compareAndSetState:使用CAS设置状态,保证状态设置的原子性
整体对AQS(AbstractQueuedSynchronizer)抽象类里面定义的内部类,方法进行理解。
1、AQS实现了自身的构造方法。里面是默认的构造方法
2、定义了一个静态的final Node类,该类里面有:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
static final Node SHARED = new Node(); // 定义了共享节点的对象,用来标记哪些线程属于共享节点
static final Node EXCLUSIVE = null; // 定义了独享节点的对象,用来标记哪些线程属于独享节点
static final int CANCELLED = 1; // 表示线程已取消,用数字1进行标志
static final int SIGNAL = -1; // 表示线程成功获取到锁,需要进行唤醒,用数字-1进行标志
static final int CONDITION = -2; // 表示线程需要继续等待,用数字-2进行标志
static final int PROPAGATE = -3; // 表示线程无条件获取到锁,用数字-3进行标志(这个需要再考证一下)
volatile int waitStatus; // 定义了一个volatile类型的整型数waitStatus,表示线程当前所处的状态,有5个状态,分别为:SIGNAL、CANCELLED、CONDITION、PROPAGATE、0;其中0表示没有一种是处于上面4中状态的情况。
volatile Node prev; // 定义了volatile类型的节点对象,表示前驱节点
volatile Node next; // 定义了volatile类型的节点对象,表示下一个节点
volatile Thread thread; // 定义了volatile类型的Thread类对象,对线程进行同步控制
Node nextWaiter; // 定义了Node对象nextWaiter,表示下一个等待节点
// 定义了一个返回boolean值的isShared方法,判断节点是否是共享线程
final boolean isShared() {
return nextWaiter == SHARED;
}
// 定义了返回前驱节点的对象predecessor方法,并且对空指针进行了判断,这里空指针可以忽略不写,但是加上可以减轻对虚拟机VM的压力。
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 初始化操作
Node() {}
// 增加新节点
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// 增加等待节点
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
AQS中变量和方法的定义和理解:
// 等待队列的头结点,懒加载初始化。除了初始化,只能通过setHead进行修改。备注:如果头结点存在,那么它的状态一定能确保不是CANCELLED状态的。
private transient volatile Node head;
// 等待队列的尾节点,懒加载初始化。只能通过enq(进入队列)来增加一个新的节点进行修改。
private transient volatile Node tail;
// 同步线程的状态
private volatile int state;
// 返回当前同步线程的状态
protected final int getState() {
return state;
}
// 设置同步线程的状态
protected final void setState(int newState) {
state = newState;
}
// 通过原子操作设置如果当前状态,等于预期状态,则将同步状态的值设定为预期目标的值,这里利用了CAS原子操作
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// 利用CAS原子操作(自旋操作)的超时时间
static final long spinForTimeoutThreshold = 1000L;
// 往同步队列插入节点,利用CAS原子操作(自旋)
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
今天将AQS(AbstractQueuedSynchronizer)里面相关的构造方法,内部类,变量,方法等进行了注释介绍。2018-11-18 23:22