AQS简介
AQS是其他同步控件的基础,例如ReentrantLock,CountDownLatch,CylicBarrier,这几个类都是继承AQS然后重写AQS的模板方法实现同步的
AQS维护了一个state控制加锁解锁的状态变量,一个双向链表表示等待获取锁的线程
成员属性
内部类Node,封装了线程
static final class Node {
//表示当前节点是共享状态
static final Node SHARED = new Node();
//表示当前节点是独占状态
static final Node EXCLUSIVE = null;
// 标识线程已取消,不排队了
static final int CANCELLED = 1;
// 标识后继节点需要需要被挂起,然后当前节点释放锁之后就唤醒后继节点
static final int SIGNAL = -1;
// 标识线程在condition队列
static final int CONDITION = -2;
// 标识后面的共享锁需要无条件的传播(共享锁需要连续唤醒读的线程)
static final int PROPAGATE = -3;
/**
当前节点保存的线程对应的等待状态
0 默认状态 >0 取消状态
-1 表示当前node节点需要挂起后继节点,然后释放锁时唤醒后继节点
*/
volatile int waitStatus;
/**
阻塞队列前驱节点
*/
volatile Node prev;
/**
阻塞队列后继节点
*/
volatile Node next;
/**
*当前node节点封装的线程
*/
volatile Thread thread; // 当前node节点封装的线程
/**
condition队列的下一个等待线程
*/
Node nextWaiter;
}
其他属性
/**
* 头结点的线程总是为 null。这是因为,头结点要么是刚初始化的空节点,要么是抢到锁的线程出队了。因 此,我们 也常常把头结点叫做虚拟节点(不存储任何线程)
* head节点是不参与排队的
*/
private transient volatile Node head;
/**
* 队列的尾节点:(头结点head是持有锁的 阻塞队列不包含头结点head,是从head.next 开始,到 tail 结束,这个区间是阻塞队列~)
*/
private transient volatile Node tail;
/**
* 控制加锁解锁的状态变量 独占模式下:0 表示未加锁状态, >0 表示锁已经被头结点head的线程获取了 已加锁状态
*/
private volatile int state;
独占锁加锁流程(非公平)
用ReentrantLock看下加锁的整个流程是怎样的
线程1首先调用ReentrantLock的lock()方法
public void TestLock() {
ReentrantLock lock = new ReentrantLock();
new Thread(() -> {
lock.lock();
System.out.println("线程1获取锁成功");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
lock.unlock();
}).start();
new Thread(()->{
lock.lock();
System.out.println("线程2获取锁成功");
lock.unlock();
}).start();
}
public void lock() {
sync.lock();
}
然后lock()方法调用ReentrantLock的静态内部类NonfairSync的lock()方法,
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
如果此时锁被其他线程持有了就调用AQS的acquire()方法,锁还没其他线程持有就调用setExclusiveOwnerThread()把当前线程设为持有锁的线程
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先调用ReentrantLock的静态内部类NonfairSync的tryAcquire()方法抢锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
然后调用ReentrantLock的静态内部类Sync的nonfairTryAcquire()方法,这个方法是tryAcquire()的返回值,由于只有一个线程所以tryAcquire()会返回false
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread(); //获取执行这个方法的当前线程
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//当前线程已经获取锁了
int nextc = c + acquires; //重入锁
if (nextc < 0) // overflow 溢出
throw new Error("Maximum lock count exceeded");
setState(nextc); //这里不用cas是因为已经获取锁了
return true;
}
return false;
}
如果第一个线程调用lock()之后,还没调用unlock,然后第二个线程调用了lock(),抢锁失败tryAcquire()会返回false,acquire()方法里,tryAcquire()返回false就执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter把节点加入到队列尾部,如果加入失败表示其他线程竞争入队,尾节点被其他线程先一步更新了,那就调用enq方法把节点自旋入队
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //封装node节点
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; //旧的尾节点
if (pred != null) {
//链表不为空
node.prev = pred; //新尾节点前驱指向旧的尾节点
if (compareAndSetTail(pred, node)) {
//cas更新尾节点
pred.next = node; //更新成功就把旧的尾节点后继指向新尾节点
return node;
}
}
enq(node); //链表为空或上一步cas更新尾结点失败会来到这里,即有其他线程竞争入队 尾节点被其他线程先一步更新了
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//双向链表是延迟初始化,第一个线程获取锁之后不会初始化链表,要由第二个竞争锁的线程初始化
//链表为空 此时锁被其他线程持有了,但是还没有设置head head节点的作用相当于一个标记即哨兵节点,因为没有记录任何线程 标记有线程占有锁了 不过需要当前线程帮忙设置
if (t == null) {
if (compareAndSetHead(new Node())) //cas更新头结点
tail = head;
} else {
//如果链表不为空,并且之前作为标记的head节点会出队
node.prev = t; //新尾节点前驱指向旧的尾节点
if (compareAndSetTail(t, node)) {
//cas更新尾结点
t.next = node; //旧的尾节点后继指向新尾节点
return t;
}
}
}
}
//这个方法的作用使阻塞队列里的节点去抢锁,返回true表示抢锁前被中断了,不过只是返回是否被中断了,不会去处理中断状态
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //前驱结点
if (p == head && tryAcquire(arg)) {
//如果前驱节点是头结点并且当前节点抢锁成功了,tryAcquire方法会把当前节点的线程设为持有锁的线程
setHead(node); //就把当前节点设为head setHead()这个会把thread清空,因为head不用参与排队,所以不用设置thread
p.next = null; // help GC
failed = false;
return interrupted;
}
//抢锁失败就来到这里,shouldParkAfterFailedAcquire()返回true的话就会在parkAndCheckInterrupt()把当前线程挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//这个方法的作用是判断当前节点抢锁失败后是否需要被挂起,这个第一次执行时ws为0,所以会返回false,不过执行完之后ws被设为-1,然后第二次执行这个方法就会返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //前驱结点的waitStatus
if (ws == Node.SIGNAL) //如果前驱结点的waitStatus是SIGNAL就返回true
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//ws大于0表示前驱节点取消了排队,就继续往前找在排队的(小于等于0),因为要靠它来帮忙唤醒
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//把前驱节点的ws更新为SIGNAL,表示当前节点需要先被挂起,然后前驱节点释放锁时再唤醒当前节点
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//前驱节点的ws为-1时就会执行这个方法把当前节点挂起
private final boolean parkAndCheckInterrupt() {
//如果线程被中断了或前驱节点释放锁了会被唤醒
LockSupport.park(this);
return Thread.interrupted();
}
以上就是独占锁加锁过程,不过此时线程2被挂起来,需要线程1解锁后唤醒它后才能继续去抢锁,下面会分析解锁流程,不过由于是非公平锁,线程2被唤醒后去抢锁的时候,刚好线程3来抢锁就不用排队直接和线程2竞争锁
整体调用流程:
ReentrantLock.lock() -> NonfairSync.lock() -> AQS.acquire() -> NonfairSync.tryAcquire() -> Sync.nonfairTryAcquire() -> AQS.acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) -> AQS.shouldParkAfterFailedAcquire() -> AQS.parkAndCheckInterrupt()
独占锁解锁流程
线程1首先调用ReentrantLock的unlock()方法
public void unlock() {
sync.release(1);
}
然后调用AQS的release()方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
然后调用Sync的tryRelease()方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //状态数减1
if (Thread.currentThread() != getExclusiveOwnerThread()) //当前线程要是持有锁的线程
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//表示锁已经完全释放
free = true;
setExclusiveOwnerThread(null); //清空表示没有线程持有锁
}
setState(c); //更新状态
return free;
}
tryRelease()返回true表示锁已经完全释放,然后可以执行if里面的代码块唤醒后驱节点,线程1的后继节点是线程2
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head; //头结点
//头结点不为空表示链表已经初始化了,有线程在排队,ws不为0表示有后驱节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //唤醒后驱节点
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; //获取状态
if (ws < 0) //小于0表示后继节点需要被唤醒,设为0表示已经已经唤醒了后继节点
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; //后继节点
//链表只有一个节点或当前节点为尾结点,后继节点为空
//后继节点取消了排队
if (s == null || s.waitStatus > 0) {
s = null;
//从队尾往前找一个需要被唤醒的节点,找排在最前面的
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) //需要被唤醒的节点不为空
LockSupport.unpark(s.thread); //唤醒节点
}
之前线程2在acquireQueued()方法被挂起了,然后被线程1唤醒后继续自旋调用tryAcquire()方法抢锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //前驱结点
if (p == head && tryAcquire(arg)) {
//如果前驱节点是头结点并且当前节点抢锁成功了,tryAcquire方法会把当前节点的线程设为持有锁的线程
setHead(node); //就把当前节点设为head setHead()这个会把thread清空,因为head不用参与排队,所以不用设置thread
p.next = null; // help GC
failed = false;
return interrupted;
}
//抢锁失败就来到这里,shouldParkAfterFailedAcquire()返回true的话就会在parkAndCheckInterrupt()把当前线程挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //之前在这里被挂起了,被线程1唤醒后可以继续自旋调用tryAcquire()方法抢锁,如果抢失败了就继续挂起
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
以上就是解锁流程
整体调用流程:
ReentrantLock.unlock() -> AQS.release() -> Sync.tryRelease() -> AQS.unparkSuccessor()