part10-AbstractQueuedSynchronizer

目录页:https://blog.csdn.net/u011294519/article/details/88367808

1.大声哔哔

    前面在学习并发工具类的时候总有一个绕不开的类AbstractQueuedSynchronizer,怀着崇敬的心情,我决定开始爬这座山峰,首先看到的是@author Doug Lea,向大师致敬。

    说到AbstractQueuedSynchronizer(AQS),首先需要知道的是AQS中大量运用了模板方法这种设计模式。简单说下模板方法这种设计模式,就是在父类的一个方法中定义了执行的步骤,但是每个步骤具体的执行内容和实现交由子类完成,调用的时候直接调用父类定义好的方法,按照步骤执行子类的实现内容就行。

    AQS中用到模板方法设计模式的方法如下:

独占式获取锁

acquire

acquireInterruptibly

tryAcquireNanos

共享式获取锁

acquireShared

acquireSharedInterruptibly

tryAcquireSharedNanos

独占式释放锁

release

共享式释放锁

releaseShared

    需要子类覆盖的流程方法

    需要子类覆盖的流程方法

        1.独占式获取  tryAcquire

        2.独占式释放  tryRelease

        3.共享式获取 tryAcquireShared

        4.共享式释放  tryReleaseShared

        5.这个同步器是否处于独占模式  isHeldExclusively

2.上代码

2.1. 非共享锁

    下面使用AbstractQueuedSynchronizer实现一个自己的锁。

package com.concurrent.aqslock.part10.aqs;

 

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

 

/**

 * 类说明:实现一个自己的锁

 */

public class SelfLock implements Lock {

 

    /**

     * state 表示获取到锁 state=1 获取到了锁,state=0,表示这个锁当前没有线程拿到

     */

    private static class Sync extends AbstractQueuedSynchronizer {

 

        /**

         * 当前锁是否被占用

         */

        @Override

        protected boolean isHeldExclusively() {

            return getState() == 1;

        }

 

        /**

         * 独占式获取锁

         */

        @Override

        protected boolean tryAcquire(int arg) {

            // 使用CAS设置state设置当前拥有锁的线程

            if (compareAndSetState(0, 1)) {

                setExclusiveOwnerThread(Thread.currentThread());

                return true;

            }

            return false;

        }

 

        @Override

        protected boolean tryRelease(int arg) {

            // 如果state已经为0则不需释放

            if (getState() == 0) {

                throw new UnsupportedOperationException();

            }

            setExclusiveOwnerThread(null);

            // 这里不使用CAS的原因是这个是独占锁,释放锁的时候不会出现并发问题

            setState(0);

            return true;

        }

 

        Condition newCondition() {

            return new ConditionObject();

        }

    }

 

    private final Sync sycn = new Sync();

 

    @Override

    public void lock() {

        sycn.acquire(1);

 

    }

 

    @Override

    public void lockInterruptibly() throws InterruptedException {

        sycn.acquireInterruptibly(1);

    }

 

    @Override

    public boolean tryLock() {

        return sycn.tryAcquire(1);

    }

 

    @Override

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

        return sycn.tryAcquireNanos(1, unit.toNanos(time));

    }

 

    @Override

    public void unlock() {

        sycn.release(1);

 

    }

 

    @Override

    public Condition newCondition() {

        return sycn.newCondition();

    }

 

    /**

     * 简单的测试线程,打印出获取到锁的线程名称,并sleep2秒

     */

    private static class TestMyLock implements Runnable {

        Lock lock;

 

        public TestMyLock(Lock selfLock) {

            lock = selfLock;

        }

 

        @Override

        public void run() {

            lock.lock();

            try {

                System.out.println(Thread.currentThread().getName() + " get the lock and will sleep 2 second");

                Thread.sleep(2000);

            } catch (InterruptedException e) {

                e.printStackTrace();

            } finally {

                lock.unlock();

            }

        }

    }

 

    public static void main(String[] arg0) {

        Lock selfLock = new SelfLock();

        for (int i = 0; i < 3; ++i) {

            new Thread(new TestMyLock(selfLock), "Test self Lock Thread: " + i).start();

        }

 

        Lock reentrantLock = new ReentrantLock();

        for (int i = 0; i < 3; ++i) {

            new Thread(new TestMyLock(reentrantLock), "ReentrantLock Thread: " + i).start();

        }

    }

 

}

代码位置:aqs-lock的part10

运行结果:

2.2. 共享锁

package com.concurrent.aqslock.part10.aqs;

 

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

 

/**

 *类说明:共享同步工具类

 */

public class ShareLock implements Lock {

 

    //为3表示允许三个线程同时获得锁

    private final Sync sync = new Sync(3);

 

    private static final class Sync extends AbstractQueuedSynchronizer {

        Sync(int count) {

            if (count <= 0) {

                throw new IllegalArgumentException("count must large than zero.");

            }

            setState(count);

        }

 

        @Override

        public int tryAcquireShared(int reduceCount) {

            for (;;) {

                int current = getState();

                int newCount = current - reduceCount;

                if (newCount < 0 || compareAndSetState(current, newCount)) {

                    return newCount;

                }

            }

        }

 

        @Override

        public boolean tryReleaseShared(int returnCount) {

            for (;;) {

                int current = getState();

                int newCount = current + returnCount;

                if (compareAndSetState(current, newCount)) {

                    return true;

                }

            }

        }

 

        final ConditionObject newCondition() {

            return new ConditionObject();

        }

    }

 

    @Override

    public void lock() {

        sync.acquireShared(1);

    }

 

    @Override

    public void unlock() {

        sync.releaseShared(1);

    }

 

    @Override

    public void lockInterruptibly() throws InterruptedException {

        sync.acquireSharedInterruptibly(1);

    }

 

    @Override

    public boolean tryLock() {

        return sync.tryAcquireShared(1) >= 0;

    }

 

    @Override

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));

    }

 

    @Override

    public Condition newCondition() {

        return sync.newCondition();

    }

 

    /**

     * 简单的测试线程,打印出获取到锁的线程名称,并sleep2秒

     */

    private static class TestMyLock implements Runnable {

        Lock lock;

 

        public TestMyLock(Lock selfLock) {

            lock = selfLock;

        }

 

        @Override

        public void run() {

            lock.lock();

            try {

                System.out.println(Thread.currentThread().getName() + " get the lock and will sleep 2 second");

                Thread.sleep(2000);

            } catch (InterruptedException e) {

                e.printStackTrace();

            } finally {

                lock.unlock();

            }

        }

    }

 

    public static void main(String[] arg0) {

        Lock selfLock = new ShareLock();

        for (int i = 0; i < 6; ++i) {

            new Thread(new TestMyLock(selfLock), "Test share Lock Thread: " + i).start();

        }

    }

}

    代码位置:aqs-lock的part10

    运行结果:

    最好自己运行下,是先打出了前三个,等待两秒后在打出后3个

3. 源码时间

    AbstractQueuedSynchronizer的源码相对比较复杂,我这里尝试理解Doug Lea大师的设计思路。有一些相对底层CAS的调用我在前面的博客中有简单的说明,不在这里做赘述。

3.1. 参数

    需要注意的参数有以下几个:

    Node head:等待队列的头结点,后面会说他是在什么地方被设置的

    Node tail:等待队列的尾节点

    int state:我的理解是用于标识锁的状态

3.2. 内部类

3.2.1. Node

    等待队列使用的类,数据结构类似于一个双向链表,在获取不到锁的线程会被封装成这个node放入队列中。

    Node有以下属性:

属性名称

描述

int waitStatus

表示节点的状态。其中包含的状态有:

  1. CANCELLED,值为1,表示当前的线程被取消;
  2. SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
  3. CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
  4. PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
  5. 值为0,表示当前节点在sync队列中,等待着获取锁。

Node prev

节点元素的前驱节点

Node next

节点元素的后置节点

Node nextWaiter

存储condition队列中的后继节点

Thread thread

入队列时的当前线程。

3.2.2. ConditionObject

    实现了Condition接口,主要用于线程的等待和唤醒。

3.3. 主要方法

3.3.1. void acquire(int arg)

    acquire是进行独占式获取锁

    前面说过tryAcquire是模板方法,由子类自行实现

    若线程没有获取到锁,则执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

    addwaiter中,首先将当前线程封装成一个node对象,若tail尾结点不为空,则说明等待队列也不为空,则将当前封装好的节点设置为尾结点,注意,这里设置尾结点的时候有可能会有并发问题,所以大神使用了CAS来玩。若尾结点不为空或设置尾结点失败,则进入enq()方法。

    这里大师使用自旋,首先判断尾节点为空,则说明等待队列为空,则设置当前封装好的节点为头节点,并设置尾节点也是当前封装好的节点。

    若尾结点不为空,则使用CAS设置当前封装好的节点为尾节点,并将之前的尾节点的后置节点指向当前封装好的节点。

    再看acquireQueued方法。

    看到这里也是使用了自旋,node.predecessor()是获取传入节点的前置节点,若传入节点的前置节点为头节点,且再次尝试获取锁成功,则将传入节点设置为头结点,并将后置节点置空。

    若当前节点的前置节点不是头结点或者获取锁失败了,则调用shouldParkAfterFailedAcquire(Node pred, Node node)方法进行信号量判断并设置等待信号量。parkAndCheckInterrupt()则是调用LockSupport.park让当前线程进行等待。

3.3.2. release(int arg)

    独占式释放锁

    release方法相对简单许多tryRelease是取决于子类的实现,若头结点不为空且状态不为0,则进入unparkSuccessor方法

    注意,这个方法传入node是头节点,若头结点的状态负责要求,则进行将头结点状态设置为等待获取锁(即为0)。若头结点的后置节点为空或状态为线程被取消,则进入下一层判断,尾结点不为空,尾结点不是头节点,尾结点的前置节点为各种待唤醒状态,则调用LockSupport的unpark方法进行唤醒等待,若以上判断不成立,则是将头节点的后置节点唤醒等待。(有点绕,而且我表达的也很复杂,还是自己看源码自己理解的比较好)。并且由此看到每次的唤醒都是由头节点开始,算是一个FIFO的公平等待队列。

    AQS的主要方法我大致都看过,尤其以以上两个方法最为复杂,其他的主要方法如共享获取锁,共享释放等大家自行理解也能看懂,不在这里做赘述,毕竟我们看源码不是为了记住每一行代码,而是为了更好的了解大师的设计思想和考虑问题解决问题的方式(我个人看源码也是为了装逼)。

猜你喜欢

转载自blog.csdn.net/u011294519/article/details/88938013