写在前面
Semaphore源码学习AQS
demo及源码学习
Semaphore 信号量,就是限流,每次只能允许获取到信号量的线程执行
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i = 1; i <= 5; i++) {
new Thread(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+":aquire 时间:"+System.currentTimeMillis());
Thread.sleep(1000);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"线程" + i).start();
}
}
结果
线程1:aquire 时间:1612068740726
线程2:aquire 时间:1612068740726
线程3:aquire 时间:1612068741727
线程4:aquire 时间:1612068741727
线程5:aquire 时间:1612068742728
虽然我们开启了5个线程,但是每次只会有2个线程执行,因为信号量只有两个,只有获取到信号量的线程才会执行,当前两个线程获取到所有信号量后,信号量就为0,再有线程进来会阻塞住,等待其他线程释放信号量。
根据源码一点点分析下
semaphore.acquire();
获取信号量方法
public Semaphore(int permits) {
//信号量构造方法 默认创建非公平锁
sync = new NonfairSync(permits);
}
sync.acquireSharedInterruptibly(1);
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//线程是否中断 如果线程中断过抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取共享锁,此处aqs与前面有些不同,前面是独占锁
//这里记住是共享锁,因为会有多个线程来修改state值
if (tryAcquireShared(arg) < 0)
//获取共享锁,获取信号量
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared
我们先开看看非公平锁实现,最终调用以下方法
final int nonfairTryAcquireShared(int acquires) {
//死循环
for (;;) {
//获取信号量的状态值,按照demo例子,此处为2
int available = getState();
//acquires 为 1 , 可用的状态值 - 1
int remaining = available - acquires;
//如果值小于0,返回小于0的值
//如果大于0,设置状态值成功了,返回状态值
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
- 循环加cas,还是那个套路,就是用来在多线程下此段方法能正确执行
- 获取状态值,起始值为2,如果此时有4个线程(ABCD)一起进入此方法,remaining取值都是1,但执行compareAndSetState只会有一个线程执行成功,假设此A线程cas成功,返回这个1
- 其余线程继续循环,此时有3个线程(BCD)执行第二次,这时remaining都等于0,都是 1 - 1,remaining不小于0,此时继续执行compareAndSetState方法,此时只会有一个线程执行成功,假设此B线程cas成功,将state值由1设置为0,返回0
- 其余两个线程(CD)进入第3此循环,但是此时state为0,当减完1后值为 -1,直接返回 -1,两个线程都返回 -1
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
- 看上面这段代码,按照上面的例子,前面两个线程一个返回1,一个返回0,不走到if里面,直接返回,执行业务代码
- CD线程都返回-1,进入else方法
doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//将当前线程加入到CLH队列,加入到队尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取节点的前置节点是否是头节点,之后head节点的下一个节点才有机会获取到锁,其余节点都需要进行排队等待
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//获取当前节点的前置节点,将其状态设置为-1,因为前置节点为-1时才会唤醒后续节点
//阻塞住当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 主要逻辑已经在代码中进行了注释
- 跟重入锁的逻辑很像对吧,不同之处在与此处时尝试获取共享锁
- 代码就不细讲了,大致跟前面讲的重入锁一致,不一致的地方为共享锁,我们看看共享锁的子类实现
- 大致逻辑为,按照上面的例子,此时CD线程进入到此方法,先将CD线程加入到队列中,设置属性为共享
- CD线程一起进来,但是队列还是有一个顺序,假如
现在队列中情况为 head - > C -> D - 现在CD一起进入死循环,先判断CD节点的前置节点是否为head节点,很显然C是,D不是,D直接走如下面的方法进入线程阻塞住,等待被唤醒
- C线程进入if,先尝试获取下信号量,看看有没有线程释放掉信号了,如果有线程释放了信号量,直接获取信号量返回,走业务代码
- 此时还是会有公平与非公平的,如果C执行尝试获取锁代码(tryAcquireShared),此时有一个线程E刚刚进来,刚好获取到信号量不为0,此时就会有E线程与队列的C线程竞争这个信号量,谁竞争成功谁获取到信号量执行,如果是公平,E不会跟C抢占信号量,E乖乖的去队尾吧。就是这样 head -> C -> D -> E
我们主要看非公平锁,就是我们前面讲到获取共享锁代码
semaphore.release()
释放信号量方法
public void release() {
sync.releaseShared(1);
}
//释放信号量
public final boolean releaseShared(int arg) {
//尝试释放信号量
if (tryReleaseShared(arg)) {
//执行唤醒
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
- 此时按照上面的例子进行讲解,AB线程尝试执行释放锁
- 死循环加aqs还是多线程情况能正确执行代码
- AB同时进来,获取状态0,都进行加1操作,然后利用cas将state置为1,cas只有一个会成功,成功后返回true
- 另一个线程执行下一此循环,将state置为2
如果state大于0,就是有信号量的情况下,执行下面这段代码
private void doReleaseShared() {
for (;;) {
//获取头节点
Node h = head;
//说明队列不为空
if (h != null && h != tail) {
//获取状态
int ws = h.waitStatus;
//状态等于-1说明需要唤醒后续节点
if (ws == Node.SIGNAL) {
//将当前状态置为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒head的下一个节点,同重入锁代码
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果head变了,说明已经有新的线程获取到信号量了,继续循环
if (h == head) // loop if head changed
break;
}
}
- 此段代码的逻辑就是将队列的线程唤醒
- 按照例子,A线程释放了信号量,执行release方法,然后通过上面的方法将队列中的线程唤醒
- 如果head没有变化说明队列中线程还比当前方法执行晚,如果head变化了说明队列中已经有线程获取到信号量了
唤醒的代码
- 什么时候结束循环呢,当A线程释放信号量唤醒队列中的C线程,C线程阻塞被唤醒,执行下一次循环,因为C线程就是head的下一个节点,尝试获取锁成功,此时C线程获取到信号量,head改变。
- head改变后继续循环
- 继续唤醒D,D线程阻塞被唤醒,执行下一次循环,因为D线程就是head的下一个节点,但是尝试获取锁失败,D线程继续阻塞,head不变。
- head没有改变,下面的循环结束
以上就是Semaphore的大致源码分析,如有纰漏,欢迎批评指正。