JUC源码解析一:lock()方法的层层剥析

版权声明:欢迎转载,转载请标明来源 https://blog.csdn.net/weixin_41973131/article/details/88867393

一、前言

本次我们以ReentrantLock类来讲解一下lock()方法的调用。

首先我们要清楚RentrantLock是一个独占可重入锁,并且内部实现了公平锁与非公平锁,那么下面就让我们一步步往下看吧。

二、正文

打开源码我们可以看到,RentrantLock类中,只是实现了Serializable接口,声明了这个类是可序列化的。

而内部类静态抽象类Sync继承了AbstractQueuedSynchronizer,继承了AQS队列的实现

abstract static class Sync extends AbstractQueuedSynchronizer

公平锁与非公平锁则分开继承了Sync

static final class NonfairSync extends Sync
static final class FairSync extends Sync

这里提一下final的作用:在java5.0以后,fianl 可以保证正在创建中的对象不能被其他线程访问到。

想详细了解的同学可以跳转:final关键词在多线程环境中的使用

    final void lock() {
        // 直接获取锁
        acquire(1);
    }

lock()方法中只调用了acquire(1),由于ReentrantLock是可重入锁,这里的参数代表需要重入多少次锁,一般为1次。

进入acquire(int arg)方法

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

大概的意思是先尝试获取锁tryAcquire(arg),若获取失败则调用addWaiter(Node.EXCLUSIVE)将该线程封装成一个Node结点并将其放置到AQS队列的尾部,之后调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg))开始自旋请求锁,如果可能的话挂起线程,直到得到锁,返回当前线程是否中断过。若中断过则调用selfInterrupt()中断该线程。

下面让我们一个一个看下去。

2.1 tryAcquire(int)

首先是公平锁的tryAcquire()

    // 尝试获取锁
    protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                // getState()返回的是锁重入的次数
                int c = getState();
                // state为0时,说明锁已经被释放,可以尝试获取锁
                if (c == 0) {
                    // 如果当前线程在AQS头部,则尝试将AQS状态state设置为acquires
                    // 若成功则将当前线程设置为AQS独占线程
                    // 相对于非公平锁多个一个判断当前线程是否在队列头的操作,保证了按照请求锁的顺序来决定获取锁的顺序,但同个线程多次获取除外
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 如果当前锁有其他线程持有并且AQS的独占线程为当前线程
                else if (current == getExclusiveOwnerThread()) {
                    // 将当前状态+1(可重入锁的体现)
                    int nextc = c + acquires;
                    if (nextc < 0)  // 多次重入锁导致溢出
                        throw new Error("Maximum lock count exceeded");
                    // 修改状态位并返回成功
                    setState(nextc);
                    return true;
                }
                // 当前锁有其他线程所有并且独占线程不是当前线程,尝试获取锁失败
                return false;
            }
        }

大概流程:

  1. 如果当前锁有其它线程持有,即c!=0,进行操作2。否则,如果当前线程在AQS队列头部,则尝试将AQS状态state设为acquires,成功后将AQS独占线程设为当前线程返回true,否则进行2。
  2. 判断当前线程与AQS的独占线程是否相同(这里是可重入锁),如果相同,那么就将当前状态位加1,返回true,否则进行3。
  3. 返回false。

由于线程的调度,公平锁在判断的过程中可能出现:
线程A调用tryAcquire失败后,并在调用addWaiter之前,线程B释放了锁,且线程C判断到锁空闲,进入hasQueuedPredecessors返回false(等待队列为空),最终C比A先获取到锁。

hasQueuedPredecessors()方法:

	// 查询是否有任何线程等待获取锁的时间超过当前线程的时间。如果队列不为空,则tryAcquire返回false,线程将进入等待队列(后面的流程和非公平锁一致)
    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

如果tail和head不是指向同一节点,并且head的next为空或者head的next的线程不是当前线程,则表示队列不为空。有两种情况会导致h的next为空:
​ 1)当前线程进入hasQueuedPredecessors的同时,另一个线程已经更改了tail(在enq中),但还没有将head的next指向自己,这中情况表明队列不为空;
​ 2)当前线程将head赋予h后,head被另一个线程移出队列,导致h的next为空,这种情况说明锁已经被占用。

使用compareAndSetState(0, acquires)设置状态位

    protected final boolean compareAndSetState(int expect, int update) {
            // 这里调用了Unsafe类的native方法,使用CAS更新状态位
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

所有的CAS更新最终都是调用Unfase类中的native方法,所以之后遇到此类方法就不再详细说了。

2.2 addWaiter(Node)

在上一次尝试获取竞争锁失败后,则应该将包含当前线程的结点加入到AQS等待队列中。

	// 为当前线程和给定模式创建和排队节点
    // mode为结点模式,Node.EXCLUSIVE为独占锁模式,NODE.SHARED为共享锁模式
    // 进行一次CAS更新,失败后再进行循环CAS更新,是一个乐观锁?
    private Node addWaiter(Node mode) {
        // 构建包含当前线程与锁模式的结点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 得到AQS队列的尾部结点
        Node pred = tail;
        // 如果AQS队列存在
        if (pred != null) {
            // 使用CAS将新构建的结点连接到AQS队列的尾部
            // 此处由于node.prev在CAS更新前赋值,因此可靠
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                // 此处的pred.next在CAS更新后赋值,在并发情况下不能保证其指向(hasQueuedPredecessors()处有体现)
                // 因此在操作的时候一般是使用prev进行操作
                pred.next = node;
                return node;
            }
        }
        // 在AQS队列为空或者结点更新失败时调用enq进行循环CAS更新
        enq(node);
        return node;
    }

进入ena(Node)

	// 将节点入队,必要时进行初始化。
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 需要初始化AQS队列
            if (t == null) { // Must initialize
                // 进行初始化并将结点设置为头结点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // CAS更新,跟上一级的CAS更新的区别是外围多了loop
                // 同样,保证了prev结点是可靠的
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

经历这个操作时候便实现了将竞争锁失败的结点入队。

2.3 acquireQueued(Node, Int)

在上面的操作中,本线程尝试竞争锁失败,并且包含本线程的结点被加入到AQS队列中。

在下面的acquireQueued(final Node node, int arg)操作中将会自旋请求锁:

	// 自旋请求锁,如果可能的话挂起线程,直到得到锁,返回当前线程是否中断过(如果park()过并且中断过的话有一个interrupted中断位)。
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 得到node结点的上一个结点
                final Node p = node.predecessor();
                // 如果node结点的上一个结点为头结点,则可以尝试获取锁
                // 这里不会和等待队列中其它线程发生竞争,但会和尝试获取锁且尚未进入等待队列的线程发生竞争。
                // 判断p == head是为了防止当前线程是因为“线程被中断”而唤醒
                if (p == head && tryAcquire(arg)) {
                    // 成功获取锁后则将头结点设置为当前结点,返回中断位
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果不是head直接后继或获取锁失败,则检查是否要阻塞当前线程,是则阻塞当前线程
                // 直到该线程的前继结点锁对应的线程唤醒该线程才会继续执行
                // shouldParkAfterFailedAcquire:判断“当前线程”是否需要阻塞
                // parkAndCheckInterrupt:阻塞当前线程
                // 如果线程曾经中断过(或者阻塞过)(比如手动interrupt()或者超时等等,那么就再中断一次,中断两次的意思就是清除中断位)。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 竞争锁失败,取消竞争锁,使node结点出队,一般是抛出异常导致循环终止
            if (failed)
                cancelAcquire(node);
        }
    }

node.predecessor():

		// 返回前面的结点,若前一个结点不存在则抛出NPE异常
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }	

setHead(Node node)

    // 将当前结点置为头结点并将前继结点跟对应线程置null,方便GC
	private void setHead(Node node) {
            head = node;
            node.thread = null;
            node.prev = null;
        }

shouldParkAfterFailedAcquire(Node pred, Node node)

	// 检查并更新获取锁失败的结点的状态
    // 若该线程应该堵塞则返回true
	// CANCELLED = 1:线程已被取消;
	// SIGNAL = -1:当前线程的后继线程需要被unpark(唤醒);
	// CONDITION = -2 :线程(处在Condition休眠状态)在等待Condition唤醒;
	// PROPAGATE = –3 :其它线程获取到“共享锁”.
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 得到前一个结点的等待状态waitStatus
        int ws = pred.waitStatus;

        // 该线程在争夺锁失败后应该堵塞
        if (ws == Node.SIGNAL)
            return true;
        // 前一个结点的等待状态waitStatus>0,即是前一个结点被CANCELLED了,那么就将前一个结点去掉
        // 递归该操作直到前一个结点的waitStatus<=0
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 前一个结点的等待状态waitStatus <= 0, 修改前一个结点状态位为SINGAL,表示后面有结点等待处理
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // 表示线程不应该park()
        return false;
    }

parkAndCheckInterrupt()

	// park()该线程并且返回其是否被中断过
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

cancelAcquire(Node)

    // 使node结点出队
    private void cancelAcquire(Node node) {
        // 如果node不存在则无视
        if (node == null)
            return;

        // 将node的线程置为null,方便GC
        node.thread = null;

        // 跳过被cancel的前继node,找到一个有效的前继节点pred
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;

        // 将该结点的等待状态设置为CANCELLED
        node.waitStatus = Node.CANCELLED;

        // 如果node是tail,更新tail为pred,并使pred.next指向null
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            // 如果node既不是tail,又不是head的后继节点
            // 若node的前继节点的waitStatus不是SIGNAL并且<=0,则将其置为SIGNAL,意思是node结点的前继结点的后继结点(不是node了)需要被唤醒
            // 并使node的前继节点指向node的后继节点
            // pred.thread != null为什么要判断这个。。还不大清楚
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // 如果node是head的后继节点,则直接唤醒node的后继节点
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }

细心的同学可能会发现当node既不是tail,又不是head的后继节点时,仅仅将node结点的前继结点的next指向了node结点的后继结点,而没有讲node结点的后继结点的pred指向node结点的前继结点。

其实在后面unparkSuccessor(node)唤醒node的后继结点时。唤醒了之前被阻塞的结点(在之前我们有说过阻塞结点的操作)。当别的线程在调用cancelAcquire()或者shouldParkAfterFailedAcquire()时,会根据prev指针跳过被cancel掉的前继节点,同时,会调整其遍历过的prev指针。代码类似这样:

    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

想要详细了解的同学可以看下Java AbstractQueuedSynchronizer源码阅读3-cancelAcquire()

三 与非公平锁的区别

我们前面讲的是公平锁加锁的操作,下面讲讲在加锁操作中非公平锁与公平锁的区别。

在一开始的lock()方法中:

		final void lock() {
            // 尝试直接争夺锁,若失败则正常获取
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

这里跟公平锁的区别就是尝试直接争夺锁,只有失败的时候才会正常获取。

acquire(1)

这里的acquire(Int)跟公平锁一样是都调用父类AbstractQueuedSynchronizer的方法,但是两者都覆盖了其中的tryAcquire(Int)方法。下面让我们看一看非公平锁的tryAcquire(Int)方法。

3.1 tryAcquire(Int)

		protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
		final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

看上去好像跟公平锁的操作一模一样,但是仔细看还是有一点区别的,就是当c == 0时,这里并没有判断当前线程是否在AQS头部,这也说明了非公平锁是不分申请锁的前后顺序的。

这也也导致了非公平锁比较简单而且性能比公平锁高很多。

在不要求线程先后执行顺序的情况下,首选非公平锁。

四 最后

如果有误的地方请大家指出,一起学习。

猜你喜欢

转载自blog.csdn.net/weixin_41973131/article/details/88867393