1、简介
AbstractQueuedSynchronizer是java 同步器的基石,是为了更快,更省心的自定义实现同步锁的模板;这次也是介绍CountDownLatch、Semaphore,它们也实现了AbstractQueuedSynchronizer,我觉得是一种逻辑的锁,或者一种线程间条件执行同步的措施;CyclicBarrier 也是对锁的一种应用,由利于我们更好的对锁的理解和使用,构建适合自己应用的锁机制
2、Semaphore
简单来说,其是同时允许有限个线程资源,超过后则需等待唤醒;
2.1使用方式
获取资源方法
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
复制代码
释放资源方法
public void release() {
sync.releaseShared(1);
}
复制代码
2.2 源码分析:
初始化时设置state的值
Sync(int permits) {
setState(permits);
}
复制代码
释放锁,此时只是验证设置后的值是否超过限定值,如果不超过,则进行设置
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
复制代码
非公平模式获取锁:检测当前状态值如果大于等于0,则获取锁并重置state,否则进行排队(为何排队挂起,见AbstractQueuedSynchronizer)
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
复制代码
公平模式获取锁:公平就是按照先来后到,如果队列中有数据,则进行排队,否则检测当前状态值如果大于等于0,则获取锁并重置state,小于0则排队等待
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
复制代码
2.3 源码总结
利用了AQS的state,用state 值来做为阈值来控制是否可以获取锁
3 CountDownLatch
简单来说,就是当满足条件(state == 0)时,等待队列的线程或者尝试获取锁的线程 均可执行,也就是用state == 0作为阈值,来控制资源是否开放
3.1 使用方法
使用方法并不是成对使用,需要获取锁的线程,去尝试获取锁就行了,而其能执行满足的条件,在需要的逻辑进行处理;等条件满足了,锁自然就打开了
需要条件才可进行的线程,调用此方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
复制代码
其它逻辑中对state进行减1操作,
public void countDown() {
sync.releaseShared(1);
}
复制代码
3.2 同步器实现
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
复制代码
- 通过构造器,传入state初始值
- state == 0则有执行权限
- 如果state - 1 后等于0,则需要释放,否则,不进行任何处理
3.3 原理总结
- 初始state必须大于0,否则等待线程永远不会执行
- state == 0时线程才可以执行
- 资源能够执行后,再也不会上锁(state == 0 后,不会再改变)
- 排队中的线程节点,再state 重置为0时,全部被唤醒
4、CyclicBarrier
其利用条件锁特性,如果当前尝试获取条件锁和锁等待队列中的个数满足要求,则可以执行一个额外的任务,并且原有等待线程很快被释放;若额外任务为空,或许线程执行与正常锁ReentrantLock逻辑一致,否则重复过程;可以使用reset方法终止循环中的某个过程,而重新开始一个新循环
4.1 使用
初始化
等待线程个数,和满足个数之后执行的额外任务
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
复制代码
等待或者额外任务执行逻辑
调用次方进行等待排队
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
复制代码
4.2源码分析
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException,TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
复制代码
- 方法内部利用ReentrantLock进行排队
- 线程排队个数如果达到要求,也就是count--,count--达到count为0,这时执行额外任务
- 执行额外任务时,如果额外任务为空,则breakBarrier方法,也就是去除当前限制;否则nextGeneration方法,下一波线程执行重复此过程
- 未达到排队个数要求时,先挂起,等待唤醒(breakBarrier, nextGeneration, reset方法进行唤醒)
- 达到排队个数,则直接返回
- 上述几个点,只是没有超时限制的等待;若有超时限制的等待,如果超过时间,线程也会从排队中出来,继续进行
reset方法:先执行打断当前的过程,然后进行重复
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
复制代码
4.3原理总结
- 多个线程组队,则可以执行额外任务
- 额外任务为空,则只有一个循环
- 不为空,多个任务一个额外任务模式一直循环
技术变化都很快,但基础技术、理论知识永远都是那些;作者希望在余后的生活中,对常用技术点进行基础知识分享;如果你觉得文章写的不错,请给与关注和点赞;如果文章存在错误,也请多多指教!