一.
Semaphore又称信号量,是操作系统中的一个概念,在Java并发编程中,信号量控制的是线程并发的数量。
Semaphore管理一系列许可证。每个acquire方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个release方法增加一个许可证,这可能会释放一个阻塞的acquire方法。然而,其实并没有实际的许可证这个对象,Semaphore只是维持了一个可获得许可证的数量。
1.简单理解
Reentrantlock是锁住只有一个线程可以操作,而Semaphore可以设置有多少线程可以同时并发
Reentrantlock通过state状态是否0,以及阻塞队列队头不断获取锁,来保持始终一个线程获取锁执行,获取锁的动作也是CAS并且标记下是哪个线程获得了锁,更改下state;而Semaphore 通过state(可获得的信号量),以及acquire需要的信号量进行计算,是否<0去决定能不能获得信号量,去执行compareAndSetState 就表示获得了信号量可以继续走了
2.阻塞队列和挂起线程
需要获取的信号量-可允许的信号量<0,线程就会被放入阻塞队列addWaiter(Node.SHARED),并且被挂起parkAndCheckInterrupt
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
二.使用方法
转载:https://blog.csdn.net/sinat_36246371/article/details/53872412
三.源码解读
转载:https://blog.csdn.net/sinat_34976604/article/details/80970977
Semaphore信号量是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
举个不适合吃饭时候看的例子:比如现在有五个茅坑,有50个人要蹲坑,那么最先抢到的五个人先蹲,剩下45个人排对等着,等有人先蹲完出来了,告诉排在第一个的人
开始前建议先去看看我之前AQS的文章,以及很多共享锁的代码在之前CountDownLatch文章里介绍过了。
看看例子:
public class SemaphoreTrain {
static class Worker extends Thread {
private int n;
private Semaphore semaphore;
public Worker(int n, Semaphore semaphore) {
this.n = n;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("Worker num " + n + " use machine");
Thread.sleep(2000);
System.out.println("Worker num " + n + " stop use");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int worker = 6; //工人数
int machine = 4; //机器数
Semaphore semaphore = new Semaphore(machine);
for (int i = 0; i < worker; i++) {
new Worker(i, semaphore).start();
}
}
}
Worker num 1 use machine
Worker num 3 use machine
Worker num 2 use machine
Worker num 0 use machine
Worker num 0 stop use
Worker num 1 stop use
Worker num 3 stop use
Worker num 2 stop use
Worker num 4 use machine
Worker num 5 use machine
Worker num 4 stop use
Worker num 5 stop use
只允许最多四个线程同时运行,其它的等待执行线程release退出。
来看看源码实现:
本质上于CountDownLatch一样属于共享锁,有非公平与公平两种模式。
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
// 以上面的例子来说明:非公平锁的获取锁逻辑是先计算出空闲机器数remaining,若其小于0直接返回remaining
//若大于等于0,CAS赋值state,返回remaining
//根据AQS共享锁的规则,返回值小于0则代表无法获取到锁需要入同步队列等待。若大于等于0,线程不会被阻塞
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // 为什么要将操作放在无限for循环里?循环CAS + volatile = 线程安全
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 公平与非公平模式的尝试释放锁的操作相同,即这里的tryReleaseShared
// 还以上面例子说明:一个工人用完机器了,他需要做的就是归还机器,就是CAS更改state值,成功就返回true
protected final boolean tryReleaseShared(int releases) {
for (;;) { // 这里为什么也放在无限for循环里?循环CAS + volatile = 线程安全
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
这里说点我对线程安全的思考:同步状态state在AQS中是volatile修饰的,确保了可见性,循环CAS确保了符合操作的原子性,二者结合保证了state操作的线程安全性。这就是上面代码中两个问题的答案,以nonfairTryAcquireShared为例,一个线程通过getState得到了state的最新值,计算出了remaining大于0,于是CAS赋值,那么在这些个操作过程中,state很可能已经被改变,CAS会失败,如何处理?循环再来一次呗,再次获取state最新值进行计算不就可以了。Doug Lea大神在设计JUC框架时采用了这种非锁式的不排他的方法来确保线程安全。
非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
// 实现AQS共享锁的tryAcquireShared方法,内部调用了nonfairTryAcquireShared
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
// Semaphore公平模式获取共享锁逻辑
protected int tryAcquireShared(int acquires) {
for (;;) {
// 之前文章说过公平就是不允许插队,实现逻辑就在hasQueuedPredecessors里。
// 该方法当同步队列里有其它等待更久的线程就返回true,代表你不被允许执行乖乖后面排对去
if (hasQueuedPredecessors())
return -1; // AQS共享锁规定tryAcquireShared返回值<0则代表获取锁失败,构成节点放到队列
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
构造函数
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 布尔值选择模式
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire
获取锁操作:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//-----------------------------方法在AQS中
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); // 直接抛异常
//小于零没有获得许可,构造节点加入队列中等待
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 构造节点加入到队列。节点为共享节点代表共享锁
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg); // r表示剩下的空余位
if (r >= 0) {
setHeadAndPropagate(node, r); // 将当前获取锁的节点设置为头节点,根据情况唤醒后继线程
p.next = null; // help GC
failed = false;
return;
}
}
//这里shouldParkAfterFailedAcquire会将前面节点的状态改为Signal
//对于等待队列中节点的waitStatus初始为0,之后由后加入的节点改为SIGNAL
//SIGNAL这个状态它表明你有后继节点,release时唤醒它
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); //直接抛异常
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate
// 该方法只在doAcquireShared中调用。当同步队列中head.next节点被唤醒并获取了共享锁,它会调用该方法,
//目的1,将该节点设为新的头节点。
//2,尝试唤醒后继节点。我么来分析分析if的这些判断条件:若参数propagate>0(表明仍有空位可争)且有后继的共享节点,则唤醒后继节点
//or 无空缺被占满了,propagate <=0,但h的状态或是新的头节点node的状态waitStatus<0,表明有可能有后继节点(因为PROPAGATE也<0),
//这种情况下我们也去调用唤醒操作。哪怕是h为null,表示我们无法获取到相关信息的情况下仍然会去尝试唤醒node的后继节点(若其存在的化)。
// 可以看出这种设计是保守的,它尽可能多的去尝试唤醒等待的线程,跟独占锁的操作截然不同,这就造成在并发竞争下可能会发生不必要的唤醒,
//不过这些节点总是需要被唤醒的,不如就在这里多尝试。
//对于上面说的waitStatus<0情况下节点状态为PROPAGATE的情况,该种情况下同样无法获知有无后继节点
//关于PROPAGATE:该状态只在doReleaseShared中被设置,有线程释放了共享锁,但由于当时头节点状态为0,
//无法获知有无后继节点,于是将头节点状态改为PROPAGATE),
//那么此时情况可能已经变了,有了后继节点,所以我们尝试去唤醒后继共享节点
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); //将node设置成新头节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); // doReleaseShared方法唤醒head.next节点的线程
}
}
总之setHeadAndPropagate唤醒后继节点的操作偏于保守,它尽可能多的去唤醒后继的线程。
release
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 表明有后继节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); // 唤醒后继节点
}
// PROPAGATE这个标记只在一处用到,即这里,该方法在同步队列头节点状waitStatus=0
//时将其设为PROPAGATE ,本身doReleaseShared是为了释放后继节点的,但是当头节点状态为0,我们不知
//道有没有后继节点,所以就采用这种方式,将头节点标记为PROPAGATE,意味着将共享锁的释放传递下去
//并不会对之后的操作有什么影响,之后进队的节点获取不到锁就会调用
//shouldParkAfterFailedAcquire,该方法会将头节点改为SIGNAL。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
总结
Semaphore维持了一个同步状态state(大小初始化时设定,代表最大允许执行线程数),在acquire时将state减1,小于零加入同步队列等待,大于零则CAS更改值。
------