源码详解
本篇了解一下AQS的源码实现。依照acquire-release、acquireShared-releaseShared的次序来。
1、acquire(int)
独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取资源为止,且整个过程忽略中断的影响。
下面是acquire()的源码:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
执行流程如下:
1、tryAcquire尝试直接去获取共享资源,如果成功则直接返回
2、addWaiter将该线程加入等待的队列尾部,并标记为独占模式
3、acQuireQueued使线程在等待队列中获取资源,如果获取到资源就返回。如果在整个过程中被中断过,则返回true,否则返回false
4、如果在等待队列中被中断是不响应的,只是在获取资源后进行自我中断。
1.1 tryAcquire(int)
此方法尝试去获取资源,如果获取成功则返回true,否则返回false.
下面是AQS中tryAcquire的源码:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
AQS给出了方法定义,没有给出具体的实现。
具体的实现由自定义的同步器去实现。
通过对共享资源的get/set/cas来实现。
1.2 addWaiter(Node)
顾名思义,这个方法用来把当前线程加入到等待线程的结尾,并返回该节点。
源码:
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
//以给定模式创建Node节点。mode由两种 独占EXCLUSIVE 共享 SHARED
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速放入队尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 上一次失败,通过enq入队
enq(node);
return node;
}
这里我们说下Node。Node结点是对每一个访问同步代码的线程的封装,其包含了需要同步的线程本身以及线程的状态,如是否被阻塞,是否等待唤醒,是否已经被取消等。变量waitStatus则表示当前被封装成Node结点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。
1.3 enq(Node)
此方法用于将node加入队尾。
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
//cas自旋 直到成功加入队尾
for (;;) {
Node t = tail;
//如果队列为空,创建一个空的标志节点作为head节点,并将tail指向它
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
1.4 acquireQueued(Node,int)
通过tryAcquire和addWaiter,线程获取共享资源失败,进入等待队列的尾部。进入等待休息状态,等待其他线程放弃资源后唤醒自己。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//拿到前驱节点,如果前驱已经是头节点,那该节点就是老二节点,由资格去获取资源
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//获取成功就把自己设置为头结点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果自己可以休息了,就park进入等待状态,直到unpark或者打断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
下面来看看shouldParkAfterFailedAcquire和parkAndCheckInterrupt做了什么?
1.4.1 shouldParkAfterFailedAcquire(Node, Node)
此方法用于检查状态,看看自己是否真的可以去休息了。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
* 如果已经告诉前驱释放资源后通知他就可以放心的去休息了
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
如果前驱已经放弃就一直往前找,直到找到一个没放弃的排到他后面
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
* 把前驱状态设置为SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。
1.4.2 parkAndCheckInterrupt()
如果线程已经找到安全休息点了,就可以安心休息了,进入等待状态。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//调用park进入休息状态
return Thread.interrupted();//返回线程是否被中断过,执行这个方法后,中断状态清除
}
1.4.3 1.4小结
看了shouldParkAfterFailedAcquire和parkAndCheckInterrupt,再回到acquireQueued。总结下这个方法的执行流程:
1、节点进入队尾后,检查状态,找到安全休息点
2、找到安全休息点后,调用park方法进入等待状态,直到unpark和中断
3、被唤醒后,判断自己是否有资格获取资源,如果拿到,head指向自己的节点,并返回是否被中断过,如果被中断过;如果没有拿到,那就从1开始。
1.5 1小结
再回到acquire,总结下它的流程。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1、调用自定义同步器中的tryAcquire去尝试获取资源,如果获取到直接返回
2、如果获取不到就加入等待队列,标记为独占模式,加入队尾
3、acquireQueued使线程在等待队列中休息,unpark时会去尝试获取资源,获取到资源会返回,拿不到就重新进入等待队列。如果在整个等待过程被中断过,会返回是否中断,如果有,会自我中断。
下面是流程图:
2、release(int)
上一节详细剖析了acquire的细节。这一节来看下他的反操作。
此方法是独占模式下释放共享资源的顶层入口,他会释放定量的资源,当state为0了,会唤醒等待队列中的其他线程来获取资源。
源码:
public final boolean release(int arg) {
if (tryRelease(arg)) {
//取到头结点
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒等待队列中的下一个线程
unparkSuccessor(h);
return true;
}
return false;
}
这里要注意tryRelease这个方法,自定义同步器在实现这个方法时在完全释放资源时返回true,否则false
2.1、tryRelease(int)
此方法同tryAcquire一样,在AQS中不提供实现,只提供签名,在独占的同步器中覆盖实现。需要注意的是,自定义的同步器需要在返回true时保证确实完全释放了资源。
2.2、unparkSuccessor(Node)
用于唤醒等待队列中的下一个线程。
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
//cas设置自身的状态为0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
//取到最近一个有效的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒这个有效的节点
LockSupport.unpark(s.thread);
}
一句话概括:用unpark()唤醒等待队列中最前边的那个未放弃线程,
3、acquireShared(int)
共享模式下获取共享资源的顶层入口。
源码如下:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared依然需要自定义同步器去实现,负数代表获取失败,0代表获取成功,并且已经没有可用资源,正数代表获取成功,并且还有可用资源。
所以这里的流程是:
1、尝试获取资源,获取成功直接返回
2、获取失败进入等待队列,直到获取资源为止。
3.1、doAcquireShared(int)
将当期线程加入等待队列尾部休息,直到其他线程释放资源并唤醒该线程,直到获取到资源返回。
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
// 如果获取到资源 就将head指向自己,并且唤醒后继的节点
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.2、setHeadAndPropagate(Node, int)
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); //将头结点指向自己
//如果还有剩余量,唤醒下一个
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
4、releaseShared()
共享模式下,线程释放共享资源的方法。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待线程中的其他线程来获取资源。
public final boolean releaseShared(int arg) {
//尝试获取资源
if (tryReleaseShared(arg)) {
//唤醒后继
doReleaseShared();
return true;
}
return false;
}
此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。
4.1、 doReleaseShared()
此方法用来唤醒后继。
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);//唤醒后继。
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
主要是通过其他大牛的技术博客,梳理源码,看懂了走通了在写下来,有很多地方,感觉大牛们已经说得很到位就直接COPY下来了!!