1 同步队列(CLH)
1.1 简介
CLH队列是Craig, Landin, and Hagersten三人发明的一种基于双向链表数据结构的队列。是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。
JAVA中的CLH队列是原CLH队列的一个变种,线程由原来的自旋机制改为阻塞机制。
1.2 Node结构
static final class Node {
/**################节点模式##################*/
/** 共享*/
static final Node SHARED = new Node();
/** 独占 */
static final Node EXCLUSIVE = null;
/**
* CANCELLED:等待线程超时或者被中断、需要从同步队列中取消等待(也就是放弃资源的竞争)
* SIGNAL:后继节点会处于等待状态,当前节点线程如果释放同步状态或者被取消则会通知后继节点线程,使后继节点线程的得以运行
* CONDITION:节点在等待队列中,线程在等待在Condition 上,其他线程对Condition调用singnal()方法后,该节点加入到同步队列中。
* PROPAGATE:表示下一次共享式获取同步状态的时会被无条件的传播下去。
*/
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/**等待状态*/
volatile int waitStatus;
/**前指针*/
volatile Node prev;
/**后指针*/
volatile Node next;
/**线程*/
volatile Thread thread;
/**
* AQS中条件队列是使用单向列表保存的,用nextWaiter来连接。
*/
Node nextWaiter;
/** 判断nextWaiter是否为共享*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**获取当前节点的前驱节点 */
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
/**
* ######################################
* ##############构造方法################
* ######################################
*/
Node() {
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
2 state 同步状态
state 的作用,它为 0 的时候代表没有线程占有锁,可以去争抢这个锁,用 CAS 将 state 设为 1,如果 CAS 成功,说明抢到了锁,这样其他线程就抢不到了,如果锁重入的话,state进行 +1 就可以,解锁就是减 1,直到 state 又变为 0,代表释放锁,所以 lock() 和 unlock() 必须要配对啊。然后唤醒等待队列中的第一个线程,让其来占有锁。
/**
* 得到同步状态
*/
protected final int getState() {
return state;
}
/**
* 设置同步状态
*/
protected final void setState(int newState) {
state = newState;
}
/**
* 利用unsafe的cas设置同步状态(原子操作)
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
3 资源
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
(1) tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
(2) tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
(3) tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
(4) tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
4 addWaiter
该方法用于将当前线程根据不同的模式加入到等待队列的队尾,并返回当前线程所在的结点。
4.1 addWaiter
private Node addWaiter(Node mode) {
//1 创建一个节点
Node node = new Node(Thread.currentThread(), mode);
//2 获取队尾,队尾不为空
Node pred = tail;
if (pred != null) {
//2.1 设置新节点的前驱是队尾
node.prev = pred;
//2.2 cas快速设置入队尾(expect=pred,update=tail)
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//3 自旋设置队尾
enq(node);
return node;
}
4.2 enq
private Node enq(final Node node) {
//自旋
for (;;) {
//获取队尾
Node t = tail;
//如果队尾为空
if (t == null) {
//创建队头、队尾
if (compareAndSetHead(new Node()))
tail = head;
} else {
//队尾的前驱节点是队尾
node.prev = t;
//cas 设置队尾(expect=tail,update=node)
if (compareAndSetTail(t, node)) {
//把原来队尾的后驱节点设置为更新后的队尾节点
t.next = node;
return t;
}
}
}
}
5 acquire
/**
* 以独占、忽略打断的方式获取资源
*/
public final void acquire(int arg) {
// 尝试获取资源,入队尾,获取资源或进入等待
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
5.1 acquireQueued
/*
* 获取资源或进入等待
*/
final boolean acquireQueued(final Node node, int arg) {
//创建失败状态
boolean failed = true;
try {
//打断状态
boolean interrupted = false;
//自旋
for (;;) {
//获取当前节点的前驱节点
final Node p = node.predecessor();
//如果当前节点是队列第二个节点并且拿到了资源
if (p == head && tryAcquire(arg)) {
//设置队头为当前节点
setHead(node);
//把队头的后指针设置为null,help GC
p.next = null;
failed = false;
return interrupted;
}
//判断是否进入等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//如果没有抢到资源,取消当前节点获取资源
if (failed)
cancelAcquire(node);
}
}
5.2 shouldParkAfterFailedAcquire
/**
* 是否能够进入等待状态
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//pred 节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* SIGNAL状态,当前节点直接进入等待状态,返回结果为true
*/
return true;
if (ws > 0) {
/*
* 只有CANCELLED状态大于0
* pred为CANCELLED状态,及无效状态,为当前node找到有效的前驱pred
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//更新前驱节点的后指针
pred.next = node;
} else {
/*
* cas设置前驱节点为SIGNAL状态
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
/**
* 如果前驱节点不为SIGNAL,返回false,再次进入acquireQueued的自旋
*/
return false;
}
5.3 parkAndCheckInterrupt
/**
* 使用LockSupport进入等待,LockSupport调用unsafe方法,unsafe为java调用C
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
6 release
6.1 release
/**
* 独占式释放资源
*/
public final boolean release(int arg) {
//释放资源
if (tryRelease(arg)) {
//获取队头
Node h = head;
//队头不为空,队头不是初始化,唤醒等待节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
6.2 unparkSuccessor
/*
* 唤醒等待节点
*/
private void unparkSuccessor(Node node) {
//获取当前节点状态
int ws = node.waitStatus;
//如果状态小于0,初始化状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//获取当前节点的后继节点
Node s = node.next;
//后继节点为空或后继节点的状态为CANCELLED状态
if (s == null || s.waitStatus > 0) {
s = null;
//从队尾向前找有效的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒有效的节点
if (s != null)
LockSupport.unpark(s.thread);
}