聊聊高并发(二十二)解析java.util.concurrent各个组件(四) 深入理解AQS(二)

上一篇介绍了AQS的基本设计思路以及两个内部类Node和ConditionObject的实现 聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一) 这篇说一说AQS的主要方法的实现。AQS和CLHLock的最大区别是,CLHLock是自旋锁,而AQS使用Unsafe的park操作让线程进入等待(阻塞)。

线程加入同步队列,和CLHLock一样,从队尾入队列,使用CAS+轮询的方式实现无锁化。入队列后设置节点的prev和next引用,形成双向链表的结构

 
  1. private Node enq(final Node node) {

  2. for (;;) {

  3. Node t = tail;

  4. if (t == null) { // Must initialize

  5. if (compareAndSetHead(new Node()))

  6. tail = head;

  7. } else {

  8. node.prev = t;

  9. if (compareAndSetTail(t, node)) {

  10. t.next = node;

  11. return t;

  12. }

  13. }

  14. }

  15. }


线程指定独享还是共享方式加入队列,先尝试加入一次,如果失败再用enq()轮询地尝试,比如addWaiter(Node.EXCLUSIVE), addWaiter(Node.SHARED)

 
  1. private Node addWaiter(Node mode) {

  2. Node node = new Node(Thread.currentThread(), mode);

  3. // Try the fast path of enq; backup to full enq on failure

  4. Node pred = tail;

  5. if (pred != null) {

  6. node.prev = pred;

  7. if (compareAndSetTail(pred, node)) {

  8. pred.next = node;

  9. return node;

  10. }

  11. }

  12. enq(node);

  13. return node;

  14. }

唤醒后继节点,最典型的情况就是在线程释放锁后,会唤醒后继节点。会从节点的next开始,找到一个后继节点,如果next是null,就从队尾开始往head找,直到找到最靠近当前节点的后续节点。 waitStatus <= 0的隐含意思是线程没有被取消。 然后用LockSupport唤醒这个找到的后继节点的线程。

这个方法类似于CLHLock里面释放锁时,通知后续节点来获取锁。AQS使用了阻塞的方式,所以这个方法的后续方法是acquireXXX方法,它负责将后续节点唤醒,后续节点再根据状态去判断是否获得锁

 
  1. private void unparkSuccessor(Node node) {

  2. /*

  3. * If status is negative (i.e., possibly needing signal) try

  4. * to clear in anticipation of signalling. It is OK if this

  5. * fails or if status is changed by waiting thread.

  6. */

  7. int ws = node.waitStatus;

  8. if (ws < 0)

  9. compareAndSetWaitStatus(node, ws, 0);

  10.  
  11. /*

  12. * Thread to unpark is held in successor, which is normally

  13. * just the next node. But if cancelled or apparently null,

  14. * traverse backwards from tail to find the actual

  15. * non-cancelled successor.

  16. */

  17. Node s = node.next;

  18. if (s == null || s.waitStatus > 0) {

  19. s = null;

  20. for (Node t = tail; t != null && t != node; t = t.prev)

  21. if (t.waitStatus <= 0)

  22. s = t;

  23. }

  24. if (s != null)

  25. LockSupport.unpark(s.thread);

  26. }


共享模式下的释放操作,从队首开始向队尾扩散,如果节点的waitStatu是SIGNAL,就唤醒后继节点,如果waitStatus是0,就设置标记成PROPAGATE

 
  1. private void doReleaseShared() {

  2. for (;;) {

  3. Node h = head;

  4. if (h != null && h != tail) {

  5. int ws = h.waitStatus;

  6. if (ws == Node.SIGNAL) {

  7. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

  8. continue; // loop to recheck cases

  9. unparkSuccessor(h);

  10. }

  11. else if (ws == 0 &&

  12. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

  13. continue; // loop on failed CAS

  14. }

  15. if (h == head) // loop if head changed

  16. break;

  17. }

  18. }


取消获取操作,要把节点从同步队列中去除,通过链表操作将它的前置节点的next指向它的后继节点集合。如果该节点是在队尾,直接删除即可,否则要通知后继节点去获取锁

 
  1. private void cancelAcquire(Node node) {

  2. // Ignore if node doesn't exist

  3. if (node == null)

  4. return;

  5.  
  6. node.thread = null;

  7.  
  8. // Skip cancelled predecessors

  9. Node pred = node.prev;

  10. while (pred.waitStatus > 0)

  11. node.prev = pred = pred.prev;

  12.  
  13. // predNext is the apparent node to unsplice. CASes below will

  14. // fail if not, in which case, we lost race vs another cancel

  15. // or signal, so no further action is necessary.

  16. Node predNext = pred.next;

  17.  
  18. // Can use unconditional write instead of CAS here.

  19. // After this atomic step, other Nodes can skip past us.

  20. // Before, we are free of interference from other threads.

  21. node.waitStatus = Node.CANCELLED;

  22.  
  23. // If we are the tail, remove ourselves.

  24. if (node == tail && compareAndSetTail(node, pred)) {

  25. compareAndSetNext(pred, predNext, null);

  26. } else {

  27. // If successor needs signal, try to set pred's next-link

  28. // so it will get one. Otherwise wake it up to propagate.

  29. int ws;

  30. if (pred != head &&

  31. ((ws = pred.waitStatus) == Node.SIGNAL ||

  32. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&

  33. pred.thread != null) {

  34. Node next = node.next;

  35. if (next != null && next.waitStatus <= 0)

  36. compareAndSetNext(pred, predNext, next);

  37. } else {

  38. unparkSuccessor(node);

  39. }

  40.  
  41. node.next = node; // help GC

  42. }

  43. }

独占模式并且不可中断地获取队列锁的操作,这个方法在ConditionObject.await()中被使用,当线程被Unsafe.unpark唤醒后,需要调用acquireQueued来获取锁,从而结束await(). accquireQueued()方法要么获得锁,要么被tryAcquire方法抛出的异常打断,如果抛出异常,最后在finally里面取消获取

值得注意的是只有节点的前驱节点是head的时候,才能获得锁。这里隐含了一个意思,就是head指向当前获得锁的节点。当程序进入if(p == head and tryAcquire(arg))这个分支时,表示线程获得了锁或者被中断,将自己设置为head,将next设置为null.

shouldParkAfterFailedAcquired()方法的目的是将节点的前驱节点的waitStatus设置为SIGNAL,表示会通知后续节点,这样后续节点才能放心去park,而不用担心被丢失唤醒的通知。

parkAndCheckInterupt()方法会真正执行阻塞,并返回中断状态,这个方法有两种情况返回,一种是park被unpark唤醒,这时候中断状态为false。另一种情况是park被中断了,由于这个accquireQueued方法是不可中断的版本,所以即使线程被中断了,也只是设置了中断标志为true,没有跑出中断异常。在支持中断的获取版本里,这时会抛出中断异常。

这个方法可以理解为Lock的lock里没有获取锁的分支,在CLHLock自旋锁的实现里,是对前驱节点的状态自旋,而AQS是阻塞,所以这里是在同步队列里面进入了阻塞状态,等待被前驱节点释放锁时唤醒。

释放锁时会根据状态调用unparkSuccessor()方法来唤醒后续节点,这样就会在这个方法里面把阻塞的线程唤醒并获得锁。

队列锁的好处是线程都在多个共享状态上自旋或阻塞,所以unparkSuccessor()方法只会唤醒它后继没有取消的节点。

而取消只有两种情况,中断或者超时

 
  1. final boolean acquireQueued(final Node node, int arg) {

  2. boolean failed = true;

  3. try {

  4. boolean interrupted = false;

  5. for (;;) {

  6. final Node p = node.predecessor();

  7. if (p == head && tryAcquire(arg)) {

  8. setHead(node);

  9. p.next = null; // help GC

  10. failed = false;

  11. return interrupted;

  12. }

  13. if (shouldParkAfterFailedAcquire(p, node) &&

  14. parkAndCheckInterrupt())

  15. interrupted = true;

  16. }

  17. } finally {

  18. if (failed)

  19. cancelAcquire(node);

  20. }

  21. }


独占模式支持中断的获取队列锁操作,可以看到和不支持中断版本的区别,这里如果parkAndCheckInterrupt()方法返回时显示被中断了,就抛出中断异常

 
  1. private void doAcquireInterruptibly(int arg)

  2. throws InterruptedException {

  3. final Node node = addWaiter(Node.EXCLUSIVE);

  4. boolean failed = true;

  5. try {

  6. for (;;) {

  7. final Node p = node.predecessor();

  8. if (p == head && tryAcquire(arg)) {

  9. setHead(node);

  10. p.next = null; // help GC

  11. failed = false;

  12. return;

  13. }

  14. if (shouldParkAfterFailedAcquire(p, node) &&

  15. parkAndCheckInterrupt())

  16. throw new InterruptedException();

  17. }

  18. } finally {

  19. if (failed)

  20. cancelAcquire(node);

  21. }

  22. }

独占模式限时获取队列锁操作, 这个获取的整体逻辑和前面的类似,区别是它支持限时操作,如果等待时间大于spinForTimeoutThreshold,就使用阻塞的方式等待,否则用自旋等待。使用了LockSupport.parkNanos()方法来实现限时地等待,并支持中断

这里隐含的一个含义是parkNanos方法退出有3种方式,

1. 限时到了自动退出,这时候会超时

2. 没有到限时被唤醒了,这时候是不超时的

3. 被中断

 
  1. private boolean doAcquireNanos(int arg, long nanosTimeout)

  2. throws InterruptedException {

  3. long lastTime = System.nanoTime();

  4. final Node node = addWaiter(Node.EXCLUSIVE);

  5. boolean failed = true;

  6. try {

  7. for (;;) {

  8. final Node p = node.predecessor();

  9. if (p == head && tryAcquire(arg)) {

  10. setHead(node);

  11. p.next = null; // help GC

  12. failed = false;

  13. return true;

  14. }

  15. if (nanosTimeout <= 0)

  16. return false;

  17. if (shouldParkAfterFailedAcquire(p, node) &&

  18. nanosTimeout > spinForTimeoutThreshold)

  19. LockSupport.parkNanos(this, nanosTimeout);

  20. long now = System.nanoTime();

  21. nanosTimeout -= now - lastTime;

  22. lastTime = now;

  23. if (Thread.interrupted())

  24. throw new InterruptedException();

  25. }

  26. } finally {

  27. if (failed)

  28. cancelAcquire(node);

  29. }

  30. }

共享模式获得队列锁操作,获得操作也是从head的下一个节点开始,和独占模式只unparkSuccessor一个节点不同,共享模式下,等head的后续节点被唤醒了,它要扩散这种共享的获取,使用setHeadAndPropagate操作,把自己设置为head,并且把释放的状态往下传递,这里采用了链式唤醒的方法,1个节点负责唤醒1个后续节点,直到不能唤醒。当后继节点是共享模式isShared,就调用doReleaseShared来唤醒后继节点

doReleaseShared会从head开始往后检查状态,如果节点是SIGNAL状态,就唤醒它的后继节点。如果是0就标记为PROPAGATE, 等它释放锁的时候会再次唤醒后继节点。

这里有个隐含的意思:

1. 加入同步队列并阻塞的节点,它的前驱节点只会是SIGNAL,表示前驱节点释放锁时,后继节点会被唤醒。shouldParkAfterFailedAcquire()方法保证了这点,如果前驱节点不是SIGNAL,它会把它修改成SIGNAL。这里不是SIGNAL就有可能是PROPAGATE

2. 造成前驱节点是PROPAGATE的情况是前驱节点获得锁时,会唤醒一次后继节点,但这时候后继节点还没有加入到同步队列,所以暂时把节点状态设置为PROPAGATE,当后继节点加入同步队列后,会把PROPAGATE设置为SIGNAL,这样前驱节点释放锁时会再次doReleaseShared,这时候它的状态已经是SIGNAL了,就可以唤醒后续节点了

 
  1. private void doAcquireShared(int arg) {

  2. final Node node = addWaiter(Node.SHARED);

  3. boolean failed = true;

  4. try {

  5. boolean interrupted = false;

  6. for (;;) {

  7. final Node p = node.predecessor();

  8. if (p == head) {

  9. int r = tryAcquireShared(arg);

  10. if (r >= 0) {

  11. setHeadAndPropagate(node, r);

  12. p.next = null; // help GC

  13. if (interrupted)

  14. selfInterrupt();

  15. failed = false;

  16. return;

  17. }

  18. }

  19. if (shouldParkAfterFailedAcquire(p, node) &&

  20. parkAndCheckInterrupt())

  21. interrupted = true;

  22. }

  23. } finally {

  24. if (failed)

  25. cancelAcquire(node);

  26. }

  27. }

  28.  
  29. private void setHeadAndPropagate(Node node, int propagate) {

  30.         Node h = head; // Record old head for check below

  31.         setHead(node);

  32.         /*

  33.          * Try to signal next queued node if:

  34.          *   Propagation was indicated by caller,

  35.          *     or was recorded (as h.waitStatus) by a previous operation

  36.          *     (note: this uses sign-check of waitStatus because

  37.          *      PROPAGATE status may transition to SIGNAL.)

  38.          * and

  39.          *   The next node is waiting in shared mode,

  40.          *     or we don't know, because it appears null

  41.          *

  42.          * The conservatism in both of these checks may cause

  43.          * unnecessary wake-ups, but only when there are multiple

  44.          * racing acquires/releases, so most need signals now or soon

  45.          * anyway.

  46.          */

  47.         if (propagate > 0 || h == null || h.waitStatus < 0) {

  48.             Node s = node.next;

  49.             if (s == null || s.isShared())

  50.                 doReleaseShared();

  51.         }

  52.     }

  53.  
  54. private void doReleaseShared() {

  55.         for (;;) {

  56.             Node h = head;

  57.             if (h != null && h != tail) {

  58.                 int ws = h.waitStatus;

  59.                 if (ws == Node.SIGNAL) {

  60.                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

  61.                         continue;            // loop to recheck cases

  62.                     unparkSuccessor(h);

  63.                 }

  64.                 else if (ws == 0 &&

  65.                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

  66.                     continue;                // loop on failed CAS

  67.             }

  68.             if (h == head)                   // loop if head changed

  69.                 break;

  70.         }

  71.     }


tryXXXX 方法,这几个方法是给子类重写的,用来扩展响应的同步器操作

 
  1. protected boolean tryAcquire(int arg) {

  2. throw new UnsupportedOperationException();

  3. }

  4.  
  5. protected boolean tryRelease(int arg) {

  6.         throw new UnsupportedOperationException();

  7.     }

  8.  
  9. protected int tryAcquireShared(int arg) {

  10.         throw new UnsupportedOperationException();

  11.     }

  12.  
  13. protected boolean tryReleaseShared(int arg) {

  14.         throw new UnsupportedOperationException();

  15.     }


独占模式获取操作的顶层方法,如果没有tryAcquired,或者没有获得队列锁,就中断

 
  1. public final void acquire(int arg) {

  2. if (!tryAcquire(arg) &&

  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

  4. selfInterrupt();

  5. }


独占模式释放操作的顶层方法,如果tryRelease()成功,那么就唤醒后继节点去获取锁

 
  1. public final boolean release(int arg) {

  2. if (tryRelease(arg)) {

  3. Node h = head;

  4. if (h != null && h.waitStatus != 0)

  5. unparkSuccessor(h);

  6. return true;

  7. }

  8. return false;

  9. }

猜你喜欢

转载自blog.csdn.net/hellozhxy/article/details/82772081