上面几章节介绍了读写锁和Sync抽象类,这节介绍AQS如何控制同步的
1.首先看一下类的定义
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable
官方对类的描述为:提供了一个框架实现阻塞锁,依赖于同步器(信号量、事件等)以及FIFO等待队列。该类为大多数同步器提供了一个有用的基础。
2.变量
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ private transient volatile Node head;//等待队列的头 /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail;//等待队列的尾 /** * The synchronization state. */ private volatile int state;//同步状态,锁个数
3.看一下Node节点的内容
static final class Node { /** 标记节点在共享模式中等待 */ static final Node SHARED = new Node(); /** 标记节点在独占模式中等待 */ static final Node EXCLUSIVE = null; /** 等待状态,线程被取消 */ static final int CANCELLED = 1; /** 等待状态,通知。当前线程的后继线程被阻塞或者即将被阻塞,当前线程释放锁或者取消后需要唤醒后继线程。这个状态一般都是后继线程来设置前驱节点的。 */ static final int SIGNAL = -1; /** 等待状态的枚举,表示线程正在等待 */ static final int CONDITION = -2; /** * 等待状态的枚举值,表示下一个acquireShared应该无条件的传播,用于将唤醒后继线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。在一个节点成为头节点之前,是不会跃迁为此状态的 */ static final int PROPAGATE = -3; /** * 等待状态 */ volatile int waitStatus; /** * 前一个节点 */ volatile Node prev; /** * 下一个节点 */ volatile Node next; /** * 线程 */ volatile Thread thread; /** * 下一个等待者 */ Node nextWaiter; /** * 是否处于共享模式 */ final boolean isShared() { return nextWaiter == SHARED; } /** * 返回前一个节点,如果为空,则抛空指针异常 * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
4.acquireShared,获取读锁时用到
public final void acquireShared(int arg) {//在共享模式中获取锁 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcquireShared方法见Sync的实现,如果返回<0,说明获取锁失败,会调用doAcquireShared方法
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED);//等待队列中增加一个waiter boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) {//获取锁成功 setHeadAndPropagate(node, r);//设置head节点且唤醒下一个线程或者传播状态 p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())//获取失败,则检查并更新状态 interrupted = true; } } finally { if (failed) cancelAcquire(node);//取消获取锁 } }
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node);//设置head /* * 唤醒队列中的下一个节点 */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
private void doReleaseShared() { /* * */ for (;;) { Node h = head; if (h != null && h != tail) {//存在后续线程 int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h);//唤醒 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
5.tryReadLock
final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false;//如果其他线程拥有写锁,则返回false int r = sharedCount(c); if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded");//如果读锁的个数超过最大,则报异常 if (compareAndSetState(c, c + SHARED_UNIT)) {//如果设置状态成功 if (r == 0) {//如果读锁为0,设置第一个获取到读锁的线程 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) {//同一个线程,则读锁的个数加1 firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get();//从threadLocal中获取锁个数 else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } }
总结一下AQS实现的大致思路:
AQS内部维护了一个队列管理锁。
线程会首先尝试获取锁,如果获取失败,则将当前线程以及等待状态包装成Node节点增加到等待队列中。
扫描二维码关注公众号,回复:
1559451 查看本文章
接着不断尝试获取锁,如果失败,则阻塞自己,直到被唤醒。
当持有锁的线程释放锁时,会唤醒队列中的后续线程。