目录页:https://blog.csdn.net/u011294519/article/details/88367808
1.大声哔哔
前面在学习并发工具类的时候总有一个绕不开的类AbstractQueuedSynchronizer,怀着崇敬的心情,我决定开始爬这座山峰,首先看到的是@author Doug Lea,向大师致敬。
说到AbstractQueuedSynchronizer(AQS),首先需要知道的是AQS中大量运用了模板方法这种设计模式。简单说下模板方法这种设计模式,就是在父类的一个方法中定义了执行的步骤,但是每个步骤具体的执行内容和实现交由子类完成,调用的时候直接调用父类定义好的方法,按照步骤执行子类的实现内容就行。
AQS中用到模板方法设计模式的方法如下:
独占式获取锁 |
acquire acquireInterruptibly tryAcquireNanos |
共享式获取锁 |
acquireShared acquireSharedInterruptibly tryAcquireSharedNanos |
独占式释放锁 |
release |
共享式释放锁 |
releaseShared |
需要子类覆盖的流程方法
需要子类覆盖的流程方法
1.独占式获取 tryAcquire
2.独占式释放 tryRelease
3.共享式获取 tryAcquireShared
4.共享式释放 tryReleaseShared
5.这个同步器是否处于独占模式 isHeldExclusively
2.上代码
2.1. 非共享锁
下面使用AbstractQueuedSynchronizer实现一个自己的锁。
package com.concurrent.aqslock.part10.aqs;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
/** * 类说明:实现一个自己的锁 */ public class SelfLock implements Lock {
/** * state 表示获取到锁 state=1 获取到了锁,state=0,表示这个锁当前没有线程拿到 */ private static class Sync extends AbstractQueuedSynchronizer {
/** * 当前锁是否被占用 */ @Override protected boolean isHeldExclusively() { return getState() == 1; }
/** * 独占式获取锁 */ @Override protected boolean tryAcquire(int arg) { // 使用CAS设置state设置当前拥有锁的线程 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
@Override protected boolean tryRelease(int arg) { // 如果state已经为0则不需释放 if (getState() == 0) { throw new UnsupportedOperationException(); } setExclusiveOwnerThread(null); // 这里不使用CAS的原因是这个是独占锁,释放锁的时候不会出现并发问题 setState(0); return true; }
Condition newCondition() { return new ConditionObject(); } }
private final Sync sycn = new Sync();
@Override public void lock() { sycn.acquire(1);
}
@Override public void lockInterruptibly() throws InterruptedException { sycn.acquireInterruptibly(1); }
@Override public boolean tryLock() { return sycn.tryAcquire(1); }
@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sycn.tryAcquireNanos(1, unit.toNanos(time)); }
@Override public void unlock() { sycn.release(1);
}
@Override public Condition newCondition() { return sycn.newCondition(); }
/** * 简单的测试线程,打印出获取到锁的线程名称,并sleep2秒 */ private static class TestMyLock implements Runnable { Lock lock;
public TestMyLock(Lock selfLock) { lock = selfLock; }
@Override public void run() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " get the lock and will sleep 2 second"); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
public static void main(String[] arg0) { Lock selfLock = new SelfLock(); for (int i = 0; i < 3; ++i) { new Thread(new TestMyLock(selfLock), "Test self Lock Thread: " + i).start(); }
Lock reentrantLock = new ReentrantLock(); for (int i = 0; i < 3; ++i) { new Thread(new TestMyLock(reentrantLock), "ReentrantLock Thread: " + i).start(); } }
} |
代码位置:aqs-lock的part10
运行结果:
2.2. 共享锁
package com.concurrent.aqslock.part10.aqs;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock;
/** *类说明:共享同步工具类 */ public class ShareLock implements Lock {
//为3表示允许三个线程同时获得锁 private final Sync sync = new Sync(3);
private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { if (count <= 0) { throw new IllegalArgumentException("count must large than zero."); } setState(count); }
@Override public int tryAcquireShared(int reduceCount) { for (;;) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } }
@Override public boolean tryReleaseShared(int returnCount) { for (;;) { int current = getState(); int newCount = current + returnCount; if (compareAndSetState(current, newCount)) { return true; } } }
final ConditionObject newCondition() { return new ConditionObject(); } }
@Override public void lock() { sync.acquireShared(1); }
@Override public void unlock() { sync.releaseShared(1); }
@Override public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
@Override public boolean tryLock() { return sync.tryAcquireShared(1) >= 0; }
@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(time)); }
@Override public Condition newCondition() { return sync.newCondition(); }
/** * 简单的测试线程,打印出获取到锁的线程名称,并sleep2秒 */ private static class TestMyLock implements Runnable { Lock lock;
public TestMyLock(Lock selfLock) { lock = selfLock; }
@Override public void run() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " get the lock and will sleep 2 second"); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
public static void main(String[] arg0) { Lock selfLock = new ShareLock(); for (int i = 0; i < 6; ++i) { new Thread(new TestMyLock(selfLock), "Test share Lock Thread: " + i).start(); } } } |
代码位置:aqs-lock的part10
运行结果:
最好自己运行下,是先打出了前三个,等待两秒后在打出后3个
3. 源码时间
AbstractQueuedSynchronizer的源码相对比较复杂,我这里尝试理解Doug Lea大师的设计思路。有一些相对底层CAS的调用我在前面的博客中有简单的说明,不在这里做赘述。
3.1. 参数
需要注意的参数有以下几个:
Node head:等待队列的头结点,后面会说他是在什么地方被设置的
Node tail:等待队列的尾节点
int state:我的理解是用于标识锁的状态
3.2. 内部类
3.2.1. Node
等待队列使用的类,数据结构类似于一个双向链表,在获取不到锁的线程会被封装成这个node放入队列中。
Node有以下属性:
属性名称 |
描述 |
int waitStatus |
表示节点的状态。其中包含的状态有:
|
Node prev |
节点元素的前驱节点 |
Node next |
节点元素的后置节点 |
Node nextWaiter |
存储condition队列中的后继节点 |
Thread thread |
入队列时的当前线程。 |
3.2.2. ConditionObject
实现了Condition接口,主要用于线程的等待和唤醒。
3.3. 主要方法
3.3.1. void acquire(int arg)
acquire是进行独占式获取锁
前面说过tryAcquire是模板方法,由子类自行实现
若线程没有获取到锁,则执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
addwaiter中,首先将当前线程封装成一个node对象,若tail尾结点不为空,则说明等待队列也不为空,则将当前封装好的节点设置为尾结点,注意,这里设置尾结点的时候有可能会有并发问题,所以大神使用了CAS来玩。若尾结点不为空或设置尾结点失败,则进入enq()方法。
这里大师使用自旋,首先判断尾节点为空,则说明等待队列为空,则设置当前封装好的节点为头节点,并设置尾节点也是当前封装好的节点。
若尾结点不为空,则使用CAS设置当前封装好的节点为尾节点,并将之前的尾节点的后置节点指向当前封装好的节点。
再看acquireQueued方法。
看到这里也是使用了自旋,node.predecessor()是获取传入节点的前置节点,若传入节点的前置节点为头节点,且再次尝试获取锁成功,则将传入节点设置为头结点,并将后置节点置空。
若当前节点的前置节点不是头结点或者获取锁失败了,则调用shouldParkAfterFailedAcquire(Node pred, Node node)方法进行信号量判断并设置等待信号量。parkAndCheckInterrupt()则是调用LockSupport.park让当前线程进行等待。
3.3.2. release(int arg)
独占式释放锁
release方法相对简单许多tryRelease是取决于子类的实现,若头结点不为空且状态不为0,则进入unparkSuccessor方法
注意,这个方法传入node是头节点,若头结点的状态负责要求,则进行将头结点状态设置为等待获取锁(即为0)。若头结点的后置节点为空或状态为线程被取消,则进入下一层判断,尾结点不为空,尾结点不是头节点,尾结点的前置节点为各种待唤醒状态,则调用LockSupport的unpark方法进行唤醒等待,若以上判断不成立,则是将头节点的后置节点唤醒等待。(有点绕,而且我表达的也很复杂,还是自己看源码自己理解的比较好)。并且由此看到每次的唤醒都是由头节点开始,算是一个FIFO的公平等待队列。
AQS的主要方法我大致都看过,尤其以以上两个方法最为复杂,其他的主要方法如共享获取锁,共享释放等大家自行理解也能看懂,不在这里做赘述,毕竟我们看源码不是为了记住每一行代码,而是为了更好的了解大师的设计思想和考虑问题解决问题的方式(我个人看源码也是为了装逼)。