简述
AQS全称AbstractQueueSynchronizer,在java.util.concurrent.locks包下,是构建锁和其他同步器的框架,它的定位是能够成为实现大部分同步需求的基础。Java中基于AQS的有ReentrantLock、Semaphore、读写锁、CountDownLatch等。
同步器内部维护一个volatile的int类型的成员变量表示同步状态,主要使用方式是通过子类继承同步器,实现它的抽象方法(try开头的方法)来管理同步状态。在同步器的内部提供了一个FIFO双向队列完成资源获取线程的排队工作,当线程获取同步状态失败时,同步器会将当前线程以及等待线程等信息构造一个Node,放入同步队列的尾部,同时阻塞当前线程。当同步状态释放时,会把首节点中的线程唤醒,再去尝试获取同步状态。 它使用模板方法设计模式,定义了getState、setState和compareAndSetState方法保证状态的改变是安全的,并且提供了线程入队和出队的队列,我们只需要实现try开头的方法即可。
AQS提供了独占和共享两种资源获取方式。
独占是只有一个线程能执行,又根据是否按队列的顺序分公平锁和非公平锁,比如ReentrantLock。
共享是多个线程能同时执行,例如Semaphore、CountDownLatch等。
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
自定义抽象Lock
public interface Lock {
void lock();
boolean tryLock();
void unlock();
}
自旋锁(不可重入)
import java.util.concurrent.atomic.AtomicBoolean;
public class SimpleMutexLock implements Lock {
private AtomicBoolean lock = new AtomicBoolean(false);
@Override
public void lock() {
for (; ; ) {
if (lock.compareAndSet(false, true)) {
return;
}
}
}
@Override
public boolean tryLock() {
return lock.compareAndSet(false, true);
}
@Override
public void unlock() {
lock.compareAndSet(true, false);
}
}
AQS独占锁
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class MutexLock implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
protected boolean isHeldExclusively() {
return getState() == 1;
}
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
sync.release(1);
}
}
AQS共享锁
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class SharedLock implements Lock {
public SharedLock(int count) {
sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
protected Sync(int count) {
if (count < 1) {
throw new IllegalArgumentException("count must > 0");
}
setState(count);
}
@Override
protected int tryAcquireShared(int arg) {
for (; ; ) {
int count = getState();
if (count == 0) {
return 0;
}
int c = count - 1;
if (compareAndSetState(count, c)) {
return c;
}
}
}
@Override
protected boolean tryReleaseShared(int arg) {
for (; ; ) {
int count = getState();
int c = count + 1;
if (compareAndSetState(count, c)) {
return true;
}
}
}
}
private Sync sync;
public void lock() {
sync.acquireShared(1);
}
public boolean tryLock() {
return sync.tryAcquireShared(1) > 0;
}
public void unlock() {
sync.tryReleaseShared(1);
}
}
测试类
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author Yangzhen
* @Description 1.单元测试类继承此类;
* 2.重写concurrentCode方法,定义自己的并发代码;
* 3.重写encapsulatingData方法,并在concurrentCode方法中调用,定义数据的拼装;
* 4.重写blockingMainThread方法,定义主线程阻塞策略
* @date 2018-12-26 10:06
**/
public abstract class AbstractConcurrentControl {
public AtomicLong longCounter = new AtomicLong(0);
public AtomicInteger intCounter = new AtomicInteger(0);
/**
* 默认并发线程数为2000个,子类可重写
*/
private static final int DEFAULT_CONCURRENT_CONTROL = 2000;
private CountDownLatch blockLatch;
private CountDownLatch completeLatch;
/**
* 并发线程数量,默认2000
*/
private int concurrentThreadNum;
private ThreadPoolExecutor threadPool;
public AbstractConcurrentControl() {
this(DEFAULT_CONCURRENT_CONTROL);
}
public AbstractConcurrentControl(int concurrentThreadNum) {
this.concurrentThreadNum = concurrentThreadNum;
blockLatch = new CountDownLatch(concurrentThreadNum);
completeLatch = new CountDownLatch(concurrentThreadNum);
threadPool = new ThreadPoolExecutor(concurrentThreadNum, concurrentThreadNum, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
/**
* 并发执行线程
*
* @Title: process
* @date 2018年12月26日 上午11:19:20
* @author yz
*/
public final void process() {
for (int i = 0; i < concurrentThreadNum; i++) {
if (i == 0) {
concurrentCode();
} else if (i == 1) {
useThread(5, TimeUnit.SECONDS);
} else if (i == 2) {
useThread(10, TimeUnit.SECONDS);
} else {
useThread(15, TimeUnit.SECONDS);
}
// 最后一个线程时可以打个断点
if (i == concurrentThreadNum - 1) {
blockLatch.countDown();
} else {
blockLatch.countDown();
}
}
blockingMainThread();
}
private void useThread() {
useThread(0, null);
}
private void useThread(final int sleepTime, final TimeUnit timeUnit) {
threadPool.submit(new Runnable() {
@Override
public void run() {
try {
blockLatch.await();
if (sleepTime != 0 && timeUnit != null) {
timeUnit.sleep(sleepTime);
}
concurrentCode();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
completeLatch.countDown();
}
}
});
}
/**
* 并发代码
*
* @Title: concurrentCode
* @date 2018年12月26日 下午2:05:25
* @author yz
*/
protected abstract void concurrentCode();
/**
* 并发数据
*
* @return
* @Title: encapsulatingData
* @date 2018年12月26日 下午2:06:14
* @author yz
*/
protected <T> T encapsulatingData() {
return null;
}
/**
* 阻塞主线程,防止JVM关闭,不建议使用Xxx.class.wait,可以使用TimeUnit.SECONDS.sleep(200);
* 如果使用TimeUnit.SECONDS.sleep(200),要保证休眠时长足够跑完你的程序,否则会出现异常,因为JVM已经关闭,而测试的线程可能没有执行完成
*
* @Title: blockingMainThread
* @date 2018年12月26日 下午6:55:03
* @author yz
*/
protected void blockingMainThread() {
if (this.threadPool == null) {
return;
}
try {
this.completeLatch.await();
// 这里是为了再次确保等所有任务完成后再关闭线程池;没有使用线程池的方法判断是因为测试过程中真的发现了缺少最后一次入库的记录
while (true) {
if (this.blockLatch.getCount() == 0 && this.completeLatch.getCount() == 0) {
break;
}
}
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("关闭线程池");
this.threadPool.shutdown();
}
}
}
package com.clife.iot.api.web.flow.p;
import java.util.concurrent.TimeUnit;
public class Test extends AbstractConcurrentControl {
private Lock lock;
private static int count = 0;
public Test(Lock lock) {
super(4);
this.lock = lock;
}
@Override
protected void concurrentCode() {
testLock();
}
public void testLock() {
lock.lock();
try {
count++;
System.out.println("current thread name: " + Thread.currentThread().getName() + " get lock" + ", count: " + count);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("current thread name: " + Thread.currentThread().getName() + " release lock");
try {
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
}
}
public void testTryLock() {
if (lock.tryLock()) {
try {
count++;
System.out.println("current thread name: " + Thread.currentThread().getName() + " get lock" + ", count: " + count);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("current thread name: " + Thread.currentThread().getName() + " release lock");
lock.unlock();
}
} else {
System.out.println("current thread name: " + Thread.currentThread().getName() + " not get lock");
}
}
@Override
protected <T> T encapsulatingData() {
return null;
}
public static void main(String[] args) {
Test test = new Test(new MutexLock());
test.process();
}
}