JUC源码解析三:await()方法的层层剥析

版权声明:欢迎转载,转载请标明来源 https://blog.csdn.net/weixin_41973131/article/details/88939362

一、前言

Condition是一个接口类,是一个在JUC里面提供于控制线程释放锁, 然后进行等待其他获取锁的线程发送 signal 信号来进行唤醒的工具类。

在JDK中一共有两个类实现了该接口,分别是AbstractQueuedSynchronizerAbstractQueuedLongSynchronizer

下面我们主要要讲的是AbstractQueuedSynchronizer类中对Condition的实现——ConditionObject----其中的await()方法

二、基础构成

Condition内部维护着一个装载线程结点的Condition队列,只有当当前线程获取了独占锁时才能够对Condition的方法进行调用。相对于AQS的Sync队列,Condition队列是一条不支持并发安全的单向 queue

// Condition队列的头结点
private transient Node firstWaiter;

// Condition队列的尾结点
private transient Node lastWaiter;

对于可中断的等待结点,有下面两种模式:

        // 该模式意味着在退出等待时重新中断
        private static final int REINTERRUPT = 1;

        // 该模式意味着在退出等待时抛出InterruptedException
        private static final int THROW_IE = -1;

锁有可以分为独占锁跟共享锁,两种模式各指向两种不同的Node:

         // 标记表示节点正在共享模式中等待
        static final Node SHARED = new Node();
       	
       	// 标记表示节点正在独占模式中等待
        static final Node EXCLUSIVE = null;

结点有三种等待状态:

		// 标志该线程已被取消
        static final int CANCELLED = 1;
        
		// 标志该线程的后继结点需要被唤醒
        static final int SIGNAL = -1;
        
		// 标志该线程正在Condition等待队列
        static final int CONDITION = -2;

		// 标志下一个acquireShared应无条件传播
        static final int PROPAGATE = -3;

三、await()

首先我们先看一下释放锁并将线程挂起的操作:await()

		// 释放锁,然后挂起线程,一旦条件满足就被唤醒,再次获取锁!
        public final void await() throws InterruptedException {
            // 若线程已经被中断则抛出异常
            if (Thread.interrupted())
                throw new InterruptedException();

            // 将当前线程加入Condition锁队列
            Node node = addConditionWaiter();
            // 释放锁并返回状态位
            int savedState = fullyRelease(node);
            int interruptMode = 0;

            // 自旋挂起,判断当前的线程是否在Syn队列中
            // 直到被唤醒或者超时或者CACELLED等
            // 因为上一步已经释放了锁,也就是说此时可能有线程已经获取锁同时可能已经调用了singal方法,如果已经唤醒,那么就不应该park了,而是退出while方法,从而继续争抢锁
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 获得锁,并将自己从Condition队列中释放(此处修改了node结点的状态位,具体的清除由其他线程完成),表明自己不再需要锁。
            // 这里获得锁后的状态位不是THROW_IE就是REINTERRUPT,那么在后面就一定会被抛出异常或者中断线程
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // node结点不是最后一个结点,这个时候就要清除该node结点了,顺手把其他不是Condition状态位的结点也清了
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            // 抛出InterruptedException,重新中断当前线程,或不执行任何操作,具体取决于模式。
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

操作流程:

  1. 判断线程是否中断,未中断则将当前线程加入Condition队列,并且释放锁将状态位保存起来
  2. 自旋挂起判断当前线程是否在AQS的Syn队列上,若不在则继续挂起,直到被CACELLED或者超时。
  3. AQS获得锁并且将自己从Condition队列中释放
  4. 清空当前结点后面被Cancel的结点,根据中断模式决定如何处理当前线程

3.1 addConditionWaiter()

先从我们第一个看到的addConditionWaiter()开始解析:

		// 在等待队列中添加新的等待对象
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // 如果lastWaiter不为空,则检查该队列是否有被Cancel的结点
            if (t != null && t.waitStatus != Node.CONDITION) {
                // 遍历Condition队列的结点,移除已被取消的结点
                unlinkCancelledWaiters();
                t = lastWaiter;
            }

            // 利用当前线程构建一个代表当前线程的结点
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            // 将该结点插入到Condition条件队列的尾部并返回该结点
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }	

操作步骤:

  1. 判断Condition队列的最后一个结点是否为空,若不为空且等待状态不为CONDITION,说明当前结点在Condition队列已被Cancel,此时遍历队列并移除已被取消的结点
  2. 构建当前线程的结点并将其加入到队列尾部

对于unlinkCancelledWaiters():

		// 遍历Condition的条件队列,将状态不为Condition(已被Cancel的节点)移除
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null; // 遍历过程中的最后一个Condition条件队列中的有效结点
            // 从条件队列的第一个结点开始遍历
            while (t != null) {
                Node next = t.nextWaiter;
                // 当前结点的等待状态位不是CONDITION,即该节点已被唤醒,移除该结点
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    // 说明该结点已经是Condition队列的最后一个结点,可以设置lastWaiter的指向了
                    if (next == null)
                        lastWaiter = trail;
                } else
                    trail = t;
                t = next;
            }
        }

操作步骤:

  1. 取出队列的第一个结点
  2. 若当前指向结点不为空,则判断当前结点的状态位
  3. 若当前结点的状态位不为Condition,则说明当前结点的线程已被唤醒,此时将Condition队列的尾部指向此节点的后继结点;若未被唤醒,则将trail指向当前结点的。注意:trail结点以及其之前的结点即是更新后的Condition队列
  4. 若当前结点的后继结点不为null,则将该结点指向其后继结点。跳转到步骤2。

3.2 fullyRelease()

	// 利用当前的状态位释放锁并返回保存的状态位。出错时清除结点
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // 保存下来的状态位
            int savedState = getState();
            // 释放当前的锁,s释放失败则抛出异常
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

操作步骤:

  1. 保存状态位
  2. 释放成功则返回状态位,否则抛出异常并将当前线程的等待状态置为CANCELLED(取消)

这里不大明白为什么不这样子写:

	final int fullyRelease(Node node) {
    	int savedState = getState();
        if (release(savedState)) {
        	return savedState;
        } else {
            node.waitStatus = Node.CANCELLED;
        }
    }

如果有知道的大神希望能不吝赐教。

下面继续深入看一下release(Int)操作:

	// 释放持有的锁,用于实现unlock操作
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            // 成功释放锁,即已将锁的state状态位置为0
            // 看AQS队列中的头结点是否为空并且能否被唤醒,如果可以的话就唤醒继任节点
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 唤醒头结点的后继结点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

其他操作在ReentrantLock释放锁的操作上讲过,因此就不再多说了。

3.3 isOnSyncQueue(Node)

	// 如果一个节点(始终位于条件队列中的节点)现在正在等待重新获取同步队列,则返回true。
    final boolean isOnSyncQueue(Node node) {
        // 此处判断当前Node的状态,如果是CONDITION,表明是正常的没有被唤醒的节点
        // 如果node的前继结点为null,表明是刚刚持有锁的线程
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // 如果node结点既不是在等待队列中,也不是刚刚持有锁的线程,并且有后继结点,那么他肯定在Syn队列中
        if (node.next != null) // 如果有继任者,则必须在队列中
            return true;

        // 遍历整个Node的Syn队列,如果存在返回true
        return findNodeFromTail(node);
    }
`	// 如果节点通过从尾部向后搜索而处于同步队列中,则返回true。仅在isOnSyncQueue需要时调用。
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (; ; ) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

这里可能理解得不大好。。

3.4 checkInterruptWhileWaiting(Node)

		// 检查中断,如果在signalled前被中断返回THROW_IE,如果在signalled后被中断返回REINTERRUPT
        // 若没有被中断则返回0
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
        }

若未中断则返回0,否则:

	// 如有必要,在取消等待后将节点转移到同步队列。如果线程在被signal()之前被取消,则返回true。
    final boolean transferAfterCancelledWait(Node node) {
        // 如果成功设置结点的状态位,则说明中断发生时,没有signal的调用,因为signal方法会将状态设置为0;
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            // 将当前线程添加到aqs的等待的队列中
            enq(node);
            return true;
        }
        // 上面的代码唯一失败的可能是在此期间,condition被调用了signal方法,导致上面的compareAndSetWaitStatus返回false
        // 如果不在Sync队列中,则让步给其他线程执行,直到当前的node已经被signal方法添加到Sync队列中;
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

四、最后

不知道该写什么结束语。。干脆什么都不写。。

猜你喜欢

转载自blog.csdn.net/weixin_41973131/article/details/88939362