本文章主要说明队列同步器AbstractQueuSynchronized的使用,以及对其主要方法的简单分析。
队列同步器(以下简称AQS)
队列同步器,是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量来表示同步状态,通过内置的FIFO队列来完成状态抢占线程的排队工作。
同步状态的表示和操作
很明显,表示同步状态的这个量为int类型,且该变量是volatile修饰的,保证可见性!
那既然涉及到操作内部的int成员变量,那一定有相应的操作方法:我们来看看源码:
使用CAS设置当前状态,该方法能够保证状态设置的原子性
compareAndSetState(expect, update)
看到这里,你应该知道,同步队列内部维护着一个int变量,且有几个方法可以操作这个变量。
后续你会理解,这个int变量,所起到的作用,也就是所谓的状态。
AQS的具体使用
关于其使用,我们先来思考这样一个问题,如果我们现在要实现一个独占锁,也就是自定义一个类,该类具有独占锁的功能。所谓的独占锁,就是同一时刻,只能有最多一个线程获取到该锁。我们的基本思路是,该自定义类维护一个标记变量,如果有线程获取到这个标记,我们就改变标记位为不可获取状态,保证其他线程无法再获取,当该线程使用完锁后,释放锁,也就是把标记位再改为可获取状态。
对的,你没有想错,AQS就是干这个事情的。他已经实现了上述的逻辑,所以,我们只需要在我们自定义同步器中使用到AQS就可以了,只不过他其中有几个方法需要我们重写,来满足我们想要具体实现的逻辑。这很好理解,如果我们自定义同步器,实现的是独占锁,我们使用AQS,然后重写其中要用的方法,让其实现逻辑为独占锁就可以了。
说了这么多,相信你也没看懂,哈哈!我们上代码来消化这些理论文字!
上述代码中出现了这么几个角色:
1.LockTest:正如注释中所说的那样,我们的自定义类,实现的功能为一个独占锁的功能(这代码并不全,LockTest类还有很多方法,是实现自Lock接口的,下面会分析)。该类,实现了Lock接口。
2.MyAQS:自定义同步队列器,继承自AQS。是LockTest我们自定义类的静态内部类。这样定义的好处是,私有封装。
然后看代码所示的两个方法,就是我们MyAQS继承自AQS所重写的两个方法,其中tryAcquire()方法,就是尝试获取一下AQS内部维护的状态,而tryRelease()方法,则就代表的是释放状态,也就是释放锁。这时候,我们的自定义同步器其实就做好了。
再来看LockTest剩余的代码:
好了,我们可以在我们的自定义类LockTest中使用自定义逻辑的同步器了,正如第一行代码,内部维护这一个myAQS。好了,下面就是自定义类实现接口Lock的具体方法了,先来看一下图中的lock()方法,他调用了myAQS.acquire()方法,很显然,我们自定义AQs中并没有该方法,那显然是来自继承的AQS了,我们看jdk源码:
哎,我们发现,acquire中竟然使用到了我们重写的tryAcquire()方法,仔细想一下,立刻就明白了,也就是,我们自定义类的lock方法,其实是调用继承自底层AQS的请求方法,然后AQS再去调用我们自定义AQS中重写的tryAcquire().再回到代码里,发现如果获取状态失败,怎么办呢?也就是tryAcquire返回false,看图中if中的判断条件是!tryAcquire,也就是返回false的时候,if中的第一个判断逻辑其实是true,因为是&&,继续判断第二个逻辑,也就是第三行,而这一步,就是把请求线程放到了AQS所维护的请求等待队列中了。
那么整个使用AQS的逻辑就出来了,如下总结:
现在我们要做一个自定义类实现同步组件的功能,涉及四个角色,以上述例子中的名字做分析:
四个角色:
1.我们自定义的实现类,LockTest,其实现Lock接口,实现lock中基本的锁方法
2.Lock接口
3.我们自定义的队列同步器,作为LockTest的静态内部类,继承AQS,实现具体的锁逻辑
4.AQS,被自定义同步器继承
那么,细心的你,在理解了上述讨论后,会立马问,什么方法需要重写,什么方法不重写呢?
我们分别以独占锁和共享锁为例进行讨论:
1.独占锁,如果我们自定义同步器实现的是独占锁的逻辑,我们在MyAQS中重写tryAcquire()方法和tryRelease()方法即可,而自定义类,LockTest中的lock()方法中直接调用acquire()方法,unlock()方法中直接调用release()方法。
2.共享锁,如果我们自定义同步器实现的是共享锁的逻辑,我们在MyAQS中重写tryAcquireShared()方法和tryReleaseShared()方法即可,而自定义类LockTest中的lock()方法中直接调用acquireShared()方法,unlock()方法中直接调用releaseShared()方法。
其实这很好理解:我们自定义类实现Lock接口,本质上就是实现lock()方法和unlock()方法,而AQS的acquire(),acquireShared()已经为我们提供了实现lock()的逻辑模板,而所谓的逻辑模板就做两件事,一是去执行获取锁(管理同步状态)的逻辑,二是去执行获取不到锁的情况下维护一个请求队列,分析源码,我们已经知道了,第二件事acquire(),acquireShared()这两个方法已经为我们实现了,这也是我们用AQS的意义,所以我们要自己去做第一件事,也就是去实现tryAcquire(),tryAcquireShared()这两个方法,所以我们要重写这两个方法,达到我们所要实现的请求获取锁时候的逻辑。
看到这里,不得不提出的是,使用AQS其实是基于模板方法的设计模式,也就是AQS内部可以分成两种方法,一种是类似于模板方法,我们可以直接使用,但是必须实现模板方法中需要重写的方法。第二种就是可重写方法,也就是我们自定义组件,实现的自定义逻辑的方法。
例如:tryAcquire(),tryRelease(),tryAcquireShared(),tryReleaseShared()都是可重写方法
而,acquire(),release(),acquireShared(),releaseShared()都是已经实现的模板方法,去调用可重写方法
此时我们会发现整个AQS的作用:
AQS是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的具体逻辑。可以这样理解二者之间的关系:锁是面向使用者的,他定义了使用者与锁交互的接口,隐藏了实现细节。而同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理,线程的排队,等待与唤醒等底层的操作。所以,锁和同步器很好的隔离了使用者和实现者所需要关注的领域。
附上练习代码:
两个代码,一个是独占锁的实现,一个是共享锁的实现。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class AQSTest {
//我们来测试一下我们自定义的同步组件LockTest
public static void main(String args[])
{
//我们写两个线程来抢占该锁
ThreadTest tt = new ThreadTest();
Thread thread1 = new Thread(tt);
Thread thread2 = new Thread(tt);
thread1.start();
thread2.start();
try {
//Join方法,保证两个子线程运行结束后,继续执行主线程
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("多线程处理结果:" + tt.num);
}
}
//我们封装一个线程类
class ThreadTest implements Runnable
{
private Lock lock = new LockTest();
public int num = 0;
public void run() {
lock.lock(); //获取锁
try {
for(int i = 0; i < 100000; i++)
num++;
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();//finally中保证锁一定关闭
}
}
}
//自定义同步组件,也就是自己实现一个独占锁功能的类
class LockTest implements Lock
{
//静态内部类,自定义同步器
private static class MyAQS extends AbstractQueuedSynchronizer
{
//继承AQS,就要重写其可重写方法,实现自己同步器的逻辑,然后由模板方法调用
//这里强调一下AQS的原理,底层维护这一个标记状态,int类型,初始为0
//然后AQS本身有三个关于这个状态的操作方法
//getState().setState().compareAndSetState()
//其中第三个方法保证了原子性。
//重写方法如下:
//判断是否处于占用状态,也就是AQS内部的状态标记是否被某个线程占有
//当状态为0的时候,获取锁
protected boolean tryAcquire(int acquire)
{
if(compareAndSetState(0, 1))
{
//设置该状态被哪个线程抢占了
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//释放锁,并把状态设置为0
protected boolean tryRelease(int release)
{
if(getState() == 0) //如果状态为0,无法释放
throw new RuntimeException();
//把状态占有者设置为空
setExclusiveOwnerThread(null);
setState(0); //释放状态
return true;
}
}
//下面的方法,就是实现lock接口所需要实现的方法
//这时候就是同步队列器发挥作用的时候了,我们只需要调用其方法就可以了
private final MyAQS myAQS = new MyAQS();
public void lock() {
//调用从AQS继承的模板方法acquire
//而acquire底层去调用上面重写的tryAcquire再加上其他的逻辑
myAQS.acquire(1);
}
public boolean tryLock() {
//注意这里直接调用了我们重写的方法,因为tryLock()只会尝试一次获取锁
return myAQS.tryAcquire(1);
}
public void unlock() {
myAQS.release(1);
}
public void lockInterruptibly() throws InterruptedException {
}
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
public Condition newCondition() {
return null;
}
}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class AQSTest2 {
//我们来测试一下我们自定义的锁
public static void main(String args[])
{
ThreadTest2 tt = new ThreadTest2();
Thread thread1 = new Thread(tt);
Thread thread2 = new Thread(tt);
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("两个线程的程序运行结果为:"+tt.number);
}
}
//我们封装一个线程类
class ThreadTest2 implements Runnable
{
private Lock lock = new LockTest2();
public int number = 0;
public void run()
{
lock.lock();
try {
for(int i = 0; i < 10000; i++)
number++;
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
//实现自定义锁,实现共享锁的逻辑
class LockTest2 implements Lock
{
private class MyAQS2 extends AbstractQueuedSynchronizer
{
//构造方法,设置默认状态数
public MyAQS2(int number)
{
if(number < 0)
throw new RuntimeException();
this.setState(number);
}
//重写AQS可重写方法
//共享式状态的获取
public int tryAcquireShared(int reduceCount)
{
//死循环,配合下面if中CAS
for(;;)
{
int current = getState();
int newCount = current - reduceCount;
if(newCount < 0 || compareAndSetState(current, newCount))
{
return newCount;
}
}
}
//共享式状态的释放
public boolean tryReleaseShared(int addCount)
{
for(;;)
{
int current = getState();
int newCount = current + addCount;
if(newCount < 0 || compareAndSetState(current, newCount))
{
return true;
}
}
}
}
private MyAQS2 myAQS2 = new MyAQS2(2); //设置状态数为2,表示最多可以两个线程共享
@Override
public void lock() {
myAQS2.acquireShared(1);
}
@Override
public void unlock() {
myAQS2.releaseShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}