AQS的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法会调用使用者重写的方法。AQS提供的模板方法基本上分为三类:独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况,自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。下面我们主要分析独占式同步状态的获取与释放操作。
独占式
独占式,即同一时刻仅有一个线程持有同步状态。
独占式同步状态获取
通过调用同步器的acquire(int)方法可以获取同步状态,但是该方法对中断不敏感,也就是说由于线程获取同步状态失败加入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除。acquire(int)方法源码如下:
/** * 以独占模式获取同步状态,且忽略中断 * 通常至少调用一次tryAcquire操作来实现此方法 * 获取同步状态失败后线程入队,然后是阻塞和非阻塞状态之间的连续变换,直到调用tryAcquire(int)方法成功为止 * 该方法可用于实现方法Lock.lock() */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire(int)方法是需要子类实现的一个方法,如果该方法返回的是true,则表示当前线程成功获得了锁,否则,当前线程没有获得锁,然后执行后续代码,后续代码由两步构成:
- addWaiter(Node),将当前线程以独占模式封装成一个节点,添加到同步队列中;
- acquireQueued(Node, int),当前线程所在节点目前处于同步中,但此时线程还没有阻塞,当前线程会继续尝试获取锁。
acquireQueued(Node, int)方法源码如下:
// 用于独占模式下,线程已经在队列中的acquire操作 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 线程中断标志 boolean interrupted = false; for (;;) { // 获取node节点的前驱节点 final Node p = node.predecessor(); // 如果node的前驱节点是头结点,且同步状态获取成功 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 同步状态获取失败 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued(Node, int)方法的核心代码都在一个for(;;)死循环中,这其实是一个自旋的过程,每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从自旋过程退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程)。
从上述代码中看到,只有前驱节点是头结点才能够尝试获取同步状态,这时为什么呢?原因有两个:
- 头结点是成功获取到同步状态的节点,而头结点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己的前驱节点是否为头结点;
- 维护同步队列的FIFO原则。
若同步状态获取失败,acquireQueued(Node, int)方法会调用shouldParkAfterFailedAcquire(Node, Node)方法,该方法源代码如下:
/** * 检查并更新获取同步状态失败的节点状态值 * 如果线程应该被阻塞则返回true * 该方法要求参数pred是node的前驱节点 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前驱节点的状态值 int ws = pred.waitStatus; // 如果前驱节点状态值为SIGNAL,那当前线程应该被阻塞 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { // 前驱节点状态值为CANCELLED,则跳过前驱节点,且重复查向前查找未被撤销的节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 找到距离最近的非撤销前驱节点,设置该节点的后继为当前节点 pred.next = node; } else { // waitStatus值必为0或者PROPAGATE,但当前节点需要一个SIGNAL信号,此时还不能阻塞线程 // 将前驱节点的状态值通过CAS操作更新为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
若shouldParkAfterFailedAcquire(Node, Node)方法返回值为false,但是外部的acquireQueued(Node, int)方法为死循环,所以acquireQueued(Node, int)方法会一直检查更新节点的状态值,直到当前节点的前驱节点状态值为SIGNAL,这是AQS约定的,只有前继节点的waitStatus是SIGNAL,当前节点才可以安心的去阻塞。因为前继节点的waitStatus是SIGNAL,就相当于当前节点告诉了它的前继节点,我将要去阻塞了,到时候请唤醒我。此时,shouldParkAfterFailedAcquire(Node, Node)方法返回值为true,然后接着调用parkAndCheckInterrupt()方法,该方法很简单:
// 阻塞线程,检查线程的中断状态并返回检查结果 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
至此,acquire(int)方法的整体调用过程都已经介绍完毕,在acquire(int)方法中,节点自旋获取同步状态的行为如下图所示:
节点之间在循环检查的过程中基本不通信,而是简单地判断自己的前驱结点是否为头节点,这样就使得节点的释放符合FIFO,并且也便于对过早通知的处理(指前驱节点不是头节点的线程由于中断而被唤醒)。
独占式同步状态获取流程,即acquire(int)方法的调用流程如下图所示:
独占式同步状态获取响应中断
前面介绍的acquire(int)方法,以独占不响应中断的方式获取同步状态,对线程进行中断操作后,该线程会依然位于同步队列中等待着获取同步状态。为了响应中断,AQS提供了acquireInterruptibly(int)方法,该方法在等待获取同步状态时,如果当前线程被中断了,会立刻响应中断抛出异常InterruptedException。该方法源码如下:
/** * 独占式同步状态获取,响应中断 * 首先检查中断状态值,然后至少调用一次tryAcquire(int)方法,若同步状态获取成功,则直接返回 * 否则,线程入队,然后是阻塞和非阻塞的状态之间的连续变换,直到tryAcquire(int)方法调用成功为止 * 该方法可用于实现方法Lock.lockInterruptibly() */ public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
首先检查该线程是否已经中断了,如果是则抛出InterruptedException异常,否则执行tryAcquire(int)方法获取同步状态,如果获取成功,则直接返回,否则执行doAcquireInterruptibly(int)方法:
// 独占可中断模式下的同步状态获取操作 private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
doAcquireInterruptibly(int)方法与acquire(int)方法核心业务代码相同,只有如下两个差别:
- 方法声明抛出InterruptedException异常;
- 在中断方法处不再是使用interrupted标志,而是直接抛出InterruptedException异常。
独占式同步状态超时获取
通过调用同步器的tryAcquireNanos(int, long)可以超时获取同步状态,该方法为acquireInterruptibly(int)方法的增强版,它既能响应中断,也能超时控制。若在指定的时间段内获取到同步状态则返回true,否则返回false。该方法源码如下:
/** * 独占式同步状态获取,可响应中断,并具有超时控制 * 首先检查中断状态,然后至少调用一次tryAcquire(int)方法,若同步状态获取成功,则返回true * 否则,线程入队,然后是阻塞和非阻塞的状态之间的连续变换,这种状态交替结束于:同步状态获取成功、被中断、超时这三种情况 * 该方法可用于实现方法Lock.tryLock(long, TimeUnit) */ public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
tryAcquireNanos(int, long)方法超时获取同步状态最终是在doAcquireNanos(int, long)中实现的,代码如下:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 如果超时时间<=0,则同步状态获取失败 if (nanosTimeout <= 0L) return false; // 获取同步状态的截止时间 final long deadline = System.nanoTime() + nanosTimeout; // 生成当前线程的Node节点,并添加到同步队列中 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { // 自旋获取同步状态 for (;;) { final Node p = node.predecessor(); // 获取同步状态成功 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 重新计算超时时间间隔 nanosTimeout = deadline - System.nanoTime(); // 已经超时,返回false if (nanosTimeout <= 0L) return false; // 继续等待 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 检查中断标志 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
针对超时控制,程序首先计算超时的截止时间点deadline,deadline = System.nanoTime() + nanosTimeout。如果获取同步状态失败,则需要重新计算休眠的时间间隔nanosTimeout = deadline - System.nanoTime(),如果nanosTimeout <= 0 表示已经超时了,返回false,如果nanosTimeout > spinForTimeoutThreshold(1000纳秒),使当前线程等待nanosTimeout纳秒,否则,如果nanosTimeout <= spinForTimeoutThreshold,那该线程将不会超时等待,而是进入快速的自旋过程。原因在于:非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让nanosTimeout的超时从整体上表现得反而不精确。因此,在超时非常短的情况下,同步器会进入无条件的快速自旋。
独占式同步状态超时获取的流程图如下所示:
独占式同步状态释放
当前线程获取同步状态并执行了相应的逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用同步器的release(int)方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点,进而使其后继节点重新尝试获取同步状态,该方法代码如下所示:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
该方法执行时,若tryRelease(int)方法返回true,则会通过unparkSuccessor(Node)方法唤醒头节点的后继节点线程,该方法最终调用了LockSupport.unpark(Thread)方法来唤醒处于等待状态线程,这一部分后面再详细介绍。
总结
在获取同步状态时,同步器维护一个同步队列,获取同步状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头结点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int)方法释放同步状态,然后唤醒头结点的后继节点。
相关博客
AbstractQueuedSynchronizer同步队列详解
AbstractQueuedSynchronizer共享式同步状态获取与释放
参考资料
方腾飞:《Java并发编程的艺术》