Semaphore,即信号计数量,通常来说,Semaphore管理了一系列许可证,每个acquire方法都会阻塞直到拿到许可证,每个release方法都会释放一个许可证,可能会释放一个阻塞的auquire方法。实际上,许可证并不存在,Semaphore仅仅保持了一组可用的数量和行为。
Semaphore经常被用来限制获取某种资源的线程数。
看一下Semaphore的使用。
public class Semaphore { public static void main(String[] args) { java.util.concurrent.Semaphore semaphore = new java.util.concurrent.Semaphore(3); ExecutorService es = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) { es.execute(new Worker(i, semaphore)); } es.shutdown(); } public static class Worker implements Runnable { private int num; private java.util.concurrent.Semaphore semaphore; public Worker(int num, java.util.concurrent.Semaphore semaphore) { this.num = num; this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); System.out.println(num + "获取一个共享锁"); Thread.sleep(1000); semaphore.release(); System.out.println(num + "释放共享锁"); } catch (InterruptedException e) { e.printStackTrace(); } } } }
运行结果
1获取一个共享锁 2获取一个共享锁 0获取一个共享锁 0释放共享锁 4获取一个共享锁 3获取一个共享锁 2释放共享锁 1释放共享锁 3释放共享锁 4释放共享锁
Semaphore也存在公平和非公平两种模式,无论公平还是非公平都是继承了AQS来实现的。
public Semaphore(int permits) { // 非公平模式 sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { // 公平模式 sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
公平模式使用NonfairSync来实现,非公平模式使用FairSync来实现。
公平模式就是按照FIFO的顺序分配许可证。
非公平模式就是说对于每一个想获取许可证的线程来说,Semaphore都会检查是否有剩余的许可证,如果有直接分配,如果没有进入AQS的同步队列进行等待,所以非公平还是抢占式,所以越活跃的线程越容易拿到许可。
看一下Semaphore的实现。
acquire方法
/** * Semaphore中获取一个许可证,没有一直阻塞直到有一个空闲的许可或者这个线程被中断 * 获取一个可用的许可之后立即返回并把可用的许可数减一 * 如果当前没有可用的许可那当前线程就会被禁用直到发生下面两种情况: * 1. 其他线程调用了release方法并且当前线程获取了许可 * 2. 其他线程把当前线程的状态设为interrupted,当前线程进行自我中断 * 如果当前线程在等待许可时,进入这个方法的时候设置了中断状态,直接抛出异常并且清除中断状态。 */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
直接调用AQS的acquireSharedInterruptibly方法,以共享模式获取一个许可,没有的话一直阻塞。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
首先判断当前线程是否被中断,如果被中断直接抛出中断异常。
线程没有被中断的情况下,调用tryAvquireShared方法获取锁,如果返回值>=0,表示获取锁成功。
没有获取到锁进入AQS的同步队列进行等待。
看一下tryAcquireShared的实现,这里要区分公平模式和非公平模式的实现是不一样的。
非公平tryAcquireShared的方法...
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
非公平模式就是首先获取可用的许可,如果可用的许可减去要获取的许可remaining<0或者不能使用CAS把当前的可用状态设置为remaining的话,返回false,表示获取许可失败,在这里我们也可以看出,这是抢占式获取,也即非公平的由来。
公平的tryAcquireShared的方法...
protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
公平模式首先调用hasQueuedPredecessors方法来保证了许可按照FIFO的顺序进行分配。
public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
这个方法是看是否还有比当前线程等待更久的线程,如果有,返回true,tryAcuquire方法返回false,不能获取许可。我们对返回的判断条件取反,看返回false的情况,
如果h==t,说明队列为空,返回false,当前线程就是第一个线程去获取许可,代码继续往下执行。
((s = h.next) != null && s.thread == Thread.currentThread()) 当前线程就是同步队列中第一个node中的线程,说明当前线程是队列的头节点。
返回false所以tryAcquire返回true,表示可以去获取许可。这样就保证了始终按照FIFO的顺序来获取许可。
doAcquireSharedInterruptibly方法已经在AQS中进行了详细分析,不再叙说。
release方法
/** * 释放一个许可,将可用的许可数加一 * 当一个线程尝试去释放一个许可的时候,另一个线程就会去尝试获取许可。 * 并没有要求释放一个许可就必须通过acquire方法来获取许可 */ public void release() { sync.releaseShared(1); }
这个方法还是调用了AQS的releaseShared方法。看一下releaseShared的实现。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
这个方法主要做了两件事:
1. 调用tryRelaeseShared方法重新设置许可。
2. 调用doReleaseShared方法来唤醒在AQS同步队列中的后继节点来获取许可。
看一下tryRelaeseShared的实现。
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
重新设置可用许可的状态,如果设置成功,直接返回true。这段代码非常简单。
doReleaseShared就是唤醒在AQS同步队列上等待许可的节点。这个方法在AQS中进行了详细分析。
其实Semaphore的实现思想还是比较简单的,主要还是弄懂AQS的实现思想,并发包中很多类都借助 AQS来实现的。
总结
Semaphore就是信号量,它用于管理一组资源,底层还是使用AQS来实现的,不过是使用的共享模式来实现,有几个许可就代表当前最多有几个线程可以获取许可,没有获取许可的线程将进入AQS的同步队列进行等待。