《Java并发编程的艺术》4.AQS下

1、重入锁

1.1概述

重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁支持一个线程对资源的重复加锁。除此之外,该锁还支持获取锁时的公平和非公平性选择。

synchronized关键字隐式的支持重进入,比如一个synchronized修饰的递归方法,在方法执行时,执行线程在获取了锁之后仍能连续多次地获取该锁。

ReentrantLock虽然没能像synchronized关键字一样支持隐式的重入,但是在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法而不被阻塞。

这里提到一个锁获取的公平性问题,如果在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁就是公平的,反之就是不公平的。公平的获取锁,也就是等待时间最长的线程最优先获得锁,也就是说锁获取时顺序的,ReentrantLock提供了一个构造函数,能够控制锁是否是公平的。(注:ReentrantLock底层使用的是AQS,公平锁在获取锁(CAS修改state)时,先判断队列里有没有节点,如果有节点就直接去队列排队。而非公平锁上来先抢锁,没抢到才去去队列中排队。)

1.2实现重进入

重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决一下两个问题。

  • 线程再次获取锁。所需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取
  • 锁的最终释放,线程重复n次获取了锁,随后在第n次释放了锁,其他线程能够获取到该锁,锁的最终释放要求对于获取进行计数自增,计数表示当前锁被重复获取的次数,而被锁释放时,计数自减,当计数等于0时表示锁已经成功释放。

ReentrantLock是通过组合自定义同步器来实现锁的获取释放,以非公平锁(默认)的实现为例,获取同步状态的代码如下所示。

  final boolean nonfairTryAcquire(int acquires) {
    
    
            final Thread current = Thread.currentThread();
            int c = getState();
      		//CAS抢占锁
            if (c == 0) {
    
    
                if (compareAndSetState(0, acquires)) {
    
    
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
       		//重入锁逻辑 判断当前线程是否为获取锁的线程,进行重入
            else if (current == getExclusiveOwnerThread()) {
    
    
                int nextc = c + acquires; 
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

else if的逻辑就是重入锁的逻辑,成功获取锁的线程再次获取锁,只是增加了同步状态值这也就要求ReentrantLock在释放同步状态时减少同步状态值

再来看一下ReentrantLock的tryRelease方法 即释放锁的方法

protected final boolean tryRelease(int releases) {
    
    
            int c = getState() - releases; //当前状态减去重入次数
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
    
     //c==0 说明可以释放锁
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free; // c != 0 返回false 表示没有释放成功
        }

如果该锁被获取了n次,那么前(n - 1)次tryRelease(int releases)方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否是0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。

2、LockSupport工具

在AQS的队列中,当需要阻塞或唤醒一个线程的时候,都会使用LockSupport工具类来完成相应工作,LockSupport定义了一组公共静态方法,这些方法提供了最基本的线程阻塞唤醒功能,而LockSupport也称为构建同步组件的基础工具。

LockSupport定义了一组以park开头的方法用来阻塞当前线程,以及unpark(Thread thread)方法来唤醒一个被阻塞的线程。

扫描二维码关注公众号,回复: 13358371 查看本文章

3、Condition接口

3.1概述

任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要把包括wait、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

Object的监视器方法与Condition接口的对比

对比项 Object Monitor Methods Condition
前置条件 获取对象的锁 1.调用lock.lock()获得锁 2.调用lock.newCondition()获得Condition对象
调用方式 直接调用 如object.wait() 直接调用 如 condition.await()
等待队列个数 一个 多个
当前线程释放锁并进入等待状态 支持 支持
当先线程释放锁并进入等待状态,在等待状态不响应中断 不支持 支持(超时特性)
当前线程释放锁并进入超时等待状态 支持 支持
当前线程释放锁并进入等待状态到将来的某个时间 不支持 支持
唤醒等待队列中的一个线程 支持 支持
唤醒等待队列的所有线程 支持 支持

3.2Condition接口与示例

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁,Condition对象是由Lock对象(调用Lock对象的new Condition())方法创建出来的,换句话说,Condition是依赖Lock对象的

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

public void wait() throws Exception{
    
    
       lock.lock(); //必须先加锁
       try{
    
    
           condition.await(); //释放锁 阻塞
       }finally{
    
    
           lock.unlock(); //finally中释放锁
       }
}

public void signal() throws Exception{
    
    
       lock.lock(); //必须现加锁 
       try{
    
    
           condition.signal(); //唤醒一个线程 
       }finally{
    
    
           lock.unlock(); 
       }
}

如实例所示,一般都会将Condition对象作为成员变量,当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。

使用Lock+Condition实现的简单阻塞队列

3.3Condition的实现分析

3.3.1概述

Condition是同步器AQS的一个内部类,因为Condition的操作需要相关的锁,所以作为同步器的内部类也比较合理。每个Condition对象都包含着一个队列(称为等待队列),该队列是实现等待/通知功能的关键。

3.3.2等待队列

等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、被构造成节点加入等待队列并进入等待状态。这里的节点定义复用了AQS中的节点的,也就是说,同步队列和等待队列中节点的类型都是AQS中的静态内部类AbstractQueuedSynchronizer.Node

一个Condition包含一个等待队列,Condition拥有头节点和尾结点,当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列,等待队列的基本结构如图5-9所示。

在这里插入图片描述

如图所示,Condition拥有首尾节点的引用,而新增节点只需要将原有的尾结点nextWaiter指向它,并且更新尾结点即可。上述节点的引用更新过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。

在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切的说是同步器)拥有一个同步队列和多个等待队列,其对应关系如下图所示

在这里插入图片描述

如图所示,Condition的实现是同步器的内部类,因此每个Condition实例都能访问到同步器提供的方法,相当于每个Condition都拥有所属同步器的引用。

3.3.3等待

调用Condition的await()方法(或者以await()开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态,当从await()方法返回时,当前线程一定获得了Condition相关联的锁

public final void await() throws InterruptedException {
    
    
    if (Thread.interrupted())
        throw new InterruptedException();
    //当前线程加入等待队列
    Node node = addConditionWaiter(); 
    //释放同步状态,也就是释放锁
    long savedState = fullyRelease(node);
    int interruptMode = 0;
    //循环判断当前节点是否已经在同步队列了
    while (!isOnSyncQueue(node)) {
    
    
        //挂起
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

调用该方法的线程成功获取了锁的进程,也就是同步队列中的头节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程将会进入等待状态。

当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态,如果不是通过其他线程调用Condition.signal()方法唤醒的,而是对等待线程进行中断,则会抛出InterruptedException。

如果从队列的角度去看,当前线程加入Condition的等待队列中,该过程如下

如图所示,同步队列的头结点不会直接加入等待队列,而是通过addConditionWaiter()方法把当前线程构造成一个新的节点并将其加入等待队列中。
在这里插入图片描述

3.3.4通知

调用Condition的singnal()方法,将会唤醒在等待队列中等待时间最长的节点(头结点),在唤醒节点之前,会将节点移到同步队列中

public final void signal(){
    
    
     if(!isHeldExclusively()){
    
     //当前线程必须是获得锁的线程 否则不允许被唤醒
          throw new IllegalMonitorStateException();
     }
    //获取等待队列的头节点
    Node first = firstWaiter;
    if(first != null){
    
    
        //唤醒头结点进入同步队列去抢锁
        doSignal(first);
    }
}

调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程,接着获取等待队列的头结点,将其移动到同步队列并使用LockSupport唤醒节点中的线程

节点从等待队列移动到同步队列的过程如下图所示

在这里插入图片描述

通过调用AQS的enq(Node node)方法,等待队列的头结点线程安全的移动到同步队列,当节点移动到同步队列后,当前线程再使用LockSupport唤醒该节点的线程

被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node node))方法返回true,节点已经在同步队列中),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中

成功获取到同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了锁

Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signalAll()方法,效果就是将等待队列中的所有节点全部移动到同步队列中,并唤醒每个节点的线程。

猜你喜欢

转载自blog.csdn.net/qq_46312987/article/details/121428711