Java并发 -AQS
AQS 是啥?
AQS的全称为(AbstractQueuedSynchronizer) 即队列同步器。它是构建锁及其他同步组件的基础框架(如 ReetrantLock, ReetrantReadWriteLock,Semaphore等)
AQS 提供了哪些方法?
getState():返回同步状态的当前值;
setState(int newState):设置当前同步状态;
compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;
tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态;
tryRelease(int arg):独占式释放同步状态;
tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;
tryReleaseShared(int arg):共享式释放同步状态;
isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;
acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
releaseShared(int arg):共享式释放同步状态;
AQS 内部结构是啥? 怎么控制同步的?
ASQ 内部包含一个 FIFO同步队列来完成 资源获取线程 的排队工作。 如果当前线程获取同步锁失败时,AQS则将会将当前线程以及等待状态等信息构成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程。
结构如下图所示:
AQS采用一个int类型变量state 表示同步状态,当state>0时表示已经获得锁,当state=0时表示释放了锁。
getState() setState(int newState) compareAndSetState(int expect,int update), 三个方法对state进行操作。
CLH 同步队列
CLH是一个FIFO双向队列,当线程获取同步状态失败时,AQS则会将当前线程与状态信息构成一个节点。并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁前提下),使其再次尝试获取同步状态:
先看源码 Node
static final class Node {
/** 共享 */
static final Node SHARED = new Node();
/** 独占 */
static final Node EXCLUSIVE = null;
/**
* 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
*/
static final int CANCELLED = 1;
/**
* 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
*/
static final int SIGNAL = -1;
/**
* 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
*/
static final int CONDITION = -2;
/**
* 表示下一次共享式同步状态获取将会无条件地传播下去
*/
static final int PROPAGATE = -3;
/** 等待状态 */
volatile int waitStatus;
/** 前驱节点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 获取同步状态的线程 */
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
CLH 入队:
入队列是载简单不过了,无非是tail指向新节点,新节点pre指向尾节点,尾节点的next指向新节点。
//原子性的更新tail
private Node addWaiter(Node mode) {
//新建Node
Node node = new Node(Thread.currentThread(), mode);
//快速尝试添加尾节点
Node pred = tail;
if (pred != null) {
// 新节点的prev指向尾节点
node.prev = pred;
//CAS设置尾节点
if (compareAndSetTail(pred, node)) {
// 尾节点的next指向新节点
pred.next = node;
return node;
}
}
//多次尝试
enq(node);
return node;
}
enq是设法将新节点添加到尾节点
private Node enq(final Node node) {
//多次尝试,直到成功为止
for (;;) {
Node t = tail;
//tail不存在,设置为首节点
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
//设置为尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
都通过了CAS方法compareAndSetTail来设置尾节点的,该方法确保节点线程是安全添加的。
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
这个入队的过程 增加一个自旋锁的概念
自旋锁(spinlock):是指当一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,直到获取到锁才会退出循环。
AQS 不关注怎么获得锁和释放锁,这部分由原子类完成
//获取锁的逻辑,成功返回true
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//释放锁,完全释放(考虑重入)返回true
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//共享模式获取锁
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//共享模式释放锁
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
上面的addWaiter 是在acquire()方法时调用的
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
//但是在自旋的过程中则需要判断当前线程是否需要阻塞
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
线程会先去获得锁,获得不成功,就加入到同步器的队列中。 一般在 锁调用lock时使用acquire(),例如ReentrantLock
例如ThreadPoolExecutor里面
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
addWaiter返回的是插入的节点
入队阻塞?
入队的时候新加节点是否需要阻塞 通过该acquireQueued方法进行判断
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋过程,其实就是一个死循环而已
for (;;) {
// node的前置节点
final Node p = node.predecessor();
// 前置节点是头节点并且获取锁成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire判断当前线程是否应该被阻塞
//获取失败,当前线程等待--具体后面介绍
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
判断是否需要阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驱节点
int ws = pred.waitStatus;
//状态为signal,表示当前线程处于等待状态,直接放回true
if (ws == Node.SIGNAL)
return true;
//前驱节点状态 > 0 ,则为Cancelled,表明该节点已经超时或者被中断了,需要从同步队列中取消
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
//前驱节点状态为Condition、propagate
else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
查看下Node节点类型 >0 的是需要取消的 SIGNAL =-1的是说明阻塞的
Node源码
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
翻译:
CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
0状态:值为0,代表初始化状态。
队列阻塞
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
CLH 出队
出队列的主要工作则是唤醒其后继节点(一般来说就是head节点),让所有线程有序地进行下去:
1.入队:原子性的更新tail
2.出队:更新head
public final boolean release(int arg) {
if (tryRelease(arg)) {
//拿到头节点
Node h = head;
if (h != null && h.waitStatus != 0)、
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
线程释放锁的时候一般调用unlock()方法,触发release之后,线程出队,释放锁。
但是AQS并没有采用该模式,而是通过LockSupport.park() 和 LockSupport.unpark() 的本地方法来实现线程的阻塞和唤醒
出队唤醒?
//唤醒后继节点
private void unparkSuccessor(Node node) {
//当前节点状态
int ws = node.waitStatus;
//当前状态 < 0 则设置为 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//当前节点的后继节点
Node s = node.next;
//后继节点为null或者其状态 > 0 (超时或者被中断了)
if (s == null || s.waitStatus > 0) {
s = null;
//从tail节点来找可用节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒后继节点
if (s != null)
LockSupport.unpark(s.thread);
}
LockSupport包含的本地方法如下:
入队出队过程中包含了线程的阻塞和唤醒过程,这个也是AQS 同步队列的主要部分
AQS还包含两种 资源共享模式,独占锁 和共享锁,
详见 AQS 提供了哪些方法 ? 这一章节