类图
源码:
package java.util.concurrent;
import java.util.Collection;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;//版本号
private final Sync sync;//内部类
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;//版本号
//构造器,将允许运行的数量设为permits
Sync(int permits) {
setState(permits);
}
//根据state得到允许运行的数量
final int getPermits() {
return getState();
}
//不公平共享锁获取(状态减去acquires)
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();//获取当前状态
int remaining = available - acquires;//状态值减acquires
//若状态值小于0则不更新,直接返回-1,表示获取锁失败
//状态值大于等于0,更新当前状态值,返回大于等于0则表示成功
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;//只有此处才能退出循环
}
}
//公平锁释放(状态增加acquires)
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();//获取当前状态
int next = current + releases;//状态值加acquires
if (next < current)//若状态值有误,抛出异常
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))//只有修改成功,才能退出循环
return true;
}
}
//允许运行数减少reductions
final void reducePermits(int reductions) {
for (;;) {
int current = getState();//得到当前允许运行数
int next = current - reductions;//将允许运行数减去reductions
if (next > current)//若发现允许运行数发生了变化,则抛出异常
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))//只有修改成功,才能退出循环
return;
}
}
//返回允许运行的数,并将允许运行数置为0(“耗尽”所有剩余共享资源)
final int drainPermits() {
for (;;) {
int current = getState();//得到当前状态值
if (current == 0 || compareAndSetState(current, 0))//只有允许修改成功或剩余为0,才能退出循环
return current;//返回当前状态值
}
}
}
//不公平策略
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;//版本号
//和公平策略构造器相同
NonfairSync(int permits) {//调用sync的构造器初始化允许运行数
super(permits);
}
//直接调用nonfairTryAcquireShared,尝试获取不公平的共享锁
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
//公平策略
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;//版本号
//和不公平策略构造器相同
FairSync(int permits) {//调用sync的构造器初始化允许运行数
super(permits);
}
//尝试获取公平的共享锁。和不公平的共享锁不同的关键方法
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())//存在前继等待节点则直接返回
return -1;
int available = getState();//得到当前状态值
int remaining = available - acquires;//当前状态减去acquires
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;//修改成功
}
}
}
//permits:一次性允许运行的线程数
//一个参数构造默认使用不公平策略.
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//permits:一次性允许运行的线程数
//fair:是否使用公平策略.
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
//可响应中断的获取共享锁1个(公平和不公平)
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//不支持响应中断的获取共享锁1个(公平和不公平)
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
//不公平的尝试获取共享锁1个(不支持中断)
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;//大于等于0,则获取成功
}
//支持中断在指定时间内获取共享锁1个
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//计数值增加1
public void release() {
sync.releaseShared(1);//计数值增加1
}
/******************不使用时,每次调用一次性减1;**************************/
/******************传入permits时,每次调用一次性减permits;***************/
//可响应中断的获取共享锁(公平和不公平)permits个
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();//permits小于0,则抛出非法参数异常
sync.acquireSharedInterruptibly(permits);
}
//不支持响应中断的获取共享锁permits个(公平和不公平)
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();//permits小于0,则抛出非法参数异常
sync.acquireShared(permits);
}
//不公平的尝试获取共享锁permits个(不支持中断)
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();//permits小于0,则抛出非法参数异常
return sync.nonfairTryAcquireShared(permits) >= 0;//大于等于0,则获取成功
}
//支持中断在指定时间内获取共享锁permits个
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();//permits小于0,则抛出非法参数异常
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
//增加指定数目permits(增加计数值permits)
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();//permits小于0,则抛出非法参数异常
sync.releaseShared(permits);//增加计数值
}
//得到当前运行的数
public int availablePermits() {
return sync.getPermits();
}
//返回允许运行的数,并将允许运行数置为0(“耗尽”所有剩余共享资源)
public int drainPermits() {
return sync.drainPermits();
}
//将允许运行数减少指定数目reduction.(“缩减”剩余共享资源)
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();//reduction小于0,则抛出非法参数异常
sync.reducePermits(reduction);
}
//根据判断sync 是否为FairSync类型,返回是否为公平锁
public boolean isFair() {
return sync instanceof FairSync;
}
//返回队列中是否存在等待状态的节点
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
//得到队列元素的总数
public final int getQueueLength() {
return sync.getQueueLength();
}
//得到队列中的线程集合
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
//得到字符串表示
public String toString() {
return super.toString() + "[Permits = " + sync.getPermits() + "]";
}
}
类 Semaphore
一个计数信号量,信号量维护了一个许可集。
在许可可用前会阻塞每一个调用acquire()的线程。
已获取共享锁的线程,执行 release()
添加一个许可,从而可能释放一个正在阻塞的获取者。
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
构造方法摘要
Semaphore(int permits) 创建具有给定的许可数和非公平的公平设置的 Semaphore 。 |
Semaphore(int permits, boolean fair) 创建具有给定的许可数和给定的公平设置的 Semaphore 。 |
构造方法可选地接受一个公平 参数。当设置为 false 时,此类不对线程获取许可的顺序做任何保证。
特别地,闯入 是允许的,也就是说可以在已经等待的线程前为调用 acquire()
的线程分配一个许可,从逻辑上说,就是新线程将自己置于等待线程队列的头部。
当公平设置为 true 时,信号量保证对于任何调用获取
方法的线程而言,都按照处理它们调用这些方法的顺序(即先进先出;FIFO)来选择线程、获得许可。
非同步的 tryAcquire
方法不使用公平设置,而是使用任意可用的许可。
通常,应该将用于控制资源访问的信号量初始化为公平的,以确保所有线程都可访问资源。
为其他的种类的同步控制使用信号量时,非公平排序的吞吐量优势通常要比公平考虑更为重要。
方法摘要
void |
acquire() 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。 |
void |
acquire(int permits) 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。 |
void |
acquireUninterruptibly() 从此信号量中获取许可,在有可用的许可前将其阻塞。 |
void |
acquireUninterruptibly(int permits) 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。 |
int |
availablePermits() 返回此信号量中当前可用的许可数。 |
int |
drainPermits() 获取并返回立即可用的所有许可。 |
protected Collection<Thread> |
getQueuedThreads() 返回一个 collection,包含可能等待获取的线程。 |
int |
getQueueLength() 返回正在等待获取的线程的估计数目。 |
boolean |
hasQueuedThreads() 查询是否有线程正在等待获取。 |
boolean |
isFair() 如果此信号量的公平设置为 true,则返回 true 。 |
protected void |
reducePermits(int reduction) 根据指定的缩减量减小可用许可的数目。 |
void |
release() 释放一个许可,将其返回给信号量。 |
void |
release(int permits) 释放给定数目的许可,将其返回到信号量。 |
String |
toString() 返回标识此信号量的字符串,以及信号量的状态。 |
boolean |
tryAcquire() 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。 |
boolean |
tryAcquire(int permits) 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。 |
boolean |
tryAcquire(int permits, long timeout, TimeUnit unit) 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。 |
boolean |
tryAcquire(long timeout, TimeUnit unit) 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。 |
Semaphore
public Semaphore(int permits)
创建具有给定的许可数和非公平的公平设置的 Semaphore
。
参数:
permits
- 初始的可用许可数目。此值可能为负数,在这种情况下,必须在授予任何获取前进行释放。
Semaphore
public Semaphore(int permits, boolean fair)
创建具有给定的许可数和给定的公平设置的 Semaphore
。
参数:
permits
- 初始的可用许可数目。此值可能为负数,在这种情况下,必须在授予任何获取前进行释放。
fair
- 如果此信号量保证在争用时按先进先出的顺序授予许可,则为 true
;否则为 false
。
acquire
public void acquire() throws InterruptedException
从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被 中断。
获取一个许可(如果提供了一个)并立即返回,将可用的许可数减 1。
如果没有可用的许可,则在发生以下两种情况之一前,禁止将当前线程用于线程安排目的并使其处于休眠状态:
如果当前线程:
- 被此方法将其已中断状态设置为 on ;或者
- 在等待许可时被
中断
。
则抛出 InterruptedException
,并且清除当前线程的已中断状态。
抛出:
InterruptedException
- 如果当前线程被中断
acquireUninterruptibly
public void acquireUninterruptibly()
从此信号量中获取许可,在有可用的许可前将其阻塞。
获取一个许可(如果提供了一个)并立即返回,将可用的允许数减 1。
如果没有可用的许可,则在其他某些线程调用此信号量的 release()
方法,并且当前线程是下一个要被分配许可的线程前,禁止当前线程用于线程安排目的并使其处于休眠状态。
如果当前线程在等待许可时被中断,那么它将继续等待,但是与没有发生中断,其将接收允许的时间相比,为该线程分配许可的时间可能改变。当线程确实从此方法返回后,将设置其中断状态。
tryAcquire
public boolean tryAcquire()
仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
获取一个许可(如果提供了一个)并立即返回,其值为 true
,将可用的许可数减 1。
如果没有可用的许可,则此方法立即返回并且值为 false
。
即使已将此信号量设置为使用公平排序策略,但是调用 tryAcquire()
也将 立即获取许可(如果有一个可用),而不管当前是否有正在等待的线程。在某些情况下,此“闯入”行为可能很有用,即使它会打破公平性也如此。如果希望遵守公平设置,则使用 tryAcquire(0, TimeUnit.SECONDS)
,它几乎是等效的(它也检测中断)。
返回:
如果获取了许可,则返回 true
;否则返回 false
。
tryAcquire
public boolean tryAcquire(long timeout,TimeUnit unit) throws InterruptedException
如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被 中断,则从此信号量获取一个许可。
获取一个许可(如果提供了一个)并立即返回,其值为 true
,将可用的许可数减 1。
如果没有可用的允许,则在发生以下三种情况之一前,禁止将当前线程用于线程安排目的并使其处于休眠状态:
- 其他某些线程调用此信号量的
release()
方法并且当前线程是下一个被分配许可的线程;或者 - 其他某些线程中断当前线程;或者
- 已超出指定的等待时间。
如果获取了许可,则返回值为 true
。
如果当前线程:
- 被此方法将其已中断状态设置为 on ;或者
- 在等待获取许可的同时被中断。
则抛出 InterruptedException
,并且清除当前线程的已中断状态。
如果超出了指定的等待时间,则返回值为 false
。如果该时间小于等于 0,则方法根本不等待。
参数:
timeout
- 等待许可的最多时间
unit
- timeout
参数的时间单位
返回:
如果获取了许可,则返回 true
;如果获取许可前超出了等待时间,则返回 false
抛出:
InterruptedException
- 如果当前线程是已中断的
release
public void release()
释放一个许可,将其返回给信号量。
释放一个许可,将可用的许可数增加 1。如果任意线程试图获取许可,则选中一个线程并将刚刚释放的许可给予它。然后针对线程安排目的启用(或再启用)该线程。
不要求释放许可的线程必须通过调用 acquire()
来获取许可。通过应用程序中的编程约定来建立信号量的正确用法。
acquire
public void acquire(int permits) throws InterruptedException
从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被 中断。
获取给定数目的许可(如果提供了)并立即返回,将可用的许可数减去给定的量。
如果没有足够的可用许可,则在发生以下两种情况之一前,禁止将当前线程用于线程安排目的并使其处于休眠状态:
如果当前线程:
- 被此方法将其已中断状态设置为 on ;或者
- 在等待许可时被中断。
则抛出 InterruptedException
,并且清除当前线程的已中断状态。任何原本应该分配给此线程的许可将被分配给其他试图获取许可的线程,就好像已通过调用 release()
而使许可可用一样。
参数:
permits
- 要获取的许可数
抛出:
InterruptedException
- 如果当前线程已被中断
IllegalArgumentException
- 如果 permits
为负
acquireUninterruptibly
public void acquireUninterruptibly(int permits)
从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
获取给定数目的许可(如果提供了)并立即返回,将可用的许可数减去给定的量。
如果没有足够的可用许可,则在其他某些线程调用此信号量的某个释放
方法,当前线程是下一个要被分配许可的线程,并且可用的许可数目满足此请求前,禁止当前线程用于线程安排目的并使其处于休眠状态。
如果当前的线程在等待许可时被中断,则它会继续等待并且它在队列中的位置不受影响。当线程确实从此方法返回后,将其设置为中断状态。
参数:
permits
- 要获取的许可数
抛出:
IllegalArgumentException
- 如果 permits
为负
tryAcquire
public boolean tryAcquire(int permits)
仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
获取给定数目的许可(如果提供了)并立即返回,其值为 true
,将可用的许可数减去给定的量。
如果没有足够的可用许可,则此方法立即返回,其值为 false
,并且不改变可用的许可数。
即使已将此信号量设置为使用公平排序策略,但是调用 tryAcquire
也将 立即获取许可(如果有一个可用),而不管当前是否有正在等待的线程。在某些情况下,此“闯入”行为可能很有用,即使它会打破公平性也如此。如果希望遵守公平设置,则使用 tryAcquire(permits, 0, TimeUnit.SECONDS)
,它几乎是等效的(它也检测中断)。
参数:
permits
- 要获取的许可数
返回:
如果获取了许可,则返回 true
;否则返回 false
抛出:
IllegalArgumentException
- 如果 permits
为负
tryAcquire
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException
如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被 中断,则从此信号量获取给定数目的许可。
获取给定数目的许可(如果提供了)并立即返回,其值为 true
,将可用的许可数减去给定的量。
如果没有足够的可用许可,则在发生以下三种情况之一前,禁止将当前线程用于线程安排目的并使其处于休眠状态:
如果获取了许可,则返回值为 true
。
如果当前线程:
- 被此方法将其已中断状态设置为 on ;或者
- 在等待获取允许的同时被中断。
则抛出 InterruptedException
,并且清除当前线程的已中断状态。任何原本应该分配给此线程的许可将被分配给其他试图获取许可的线程,就好像已通过调用 release()
而使许可可用一样。
如果超出了指定的等待时间,则返回值为 false
。如果该时间小于等于 0,则方法根本不等待。任何原本应该分配给此线程的许可将被分配给其他试图获取许可的线程,就好像已通过调用 release()
而使许可可用一样。
参数:
permits
- 要获取的许可数
timeout
- 等待许可的最多时间
unit
- timeout
参数的时间单位
返回:
如果获取了许可,则返回 true
;如果获取所有许可前超出了等待时间,则返回 false
抛出:
InterruptedException
- 如果当前线程是已中断的
IllegalArgumentException
- 如果 permits
为负
release
public void release(int permits)
释放给定数目的许可,将其返回到信号量。
释放给定数目的许可,将可用的许可数增加该量。如果任意线程试图获取许可,则选中某个线程并将刚刚释放的许可给予该线程。如果可用许可的数目满足该线程的请求,则针对线程安排目的启用(或再启用)该线程;否则在有足够的可用许可前线程将一直等待。如果满足此线程的请求后仍有可用的许可,则依次将这些许可分配给试图获取许可的其他线程。
不要求释放许可的线程必须通过调用获取
来获取该许可。通过应用程序中的编程约定来建立信号量的正确用法。
参数:
permits
- 要释放的许可数
抛出:
IllegalArgumentException
- 如果 permits
为负
availablePermits
public int availablePermits()
返回此信号量中当前可用的许可数。
此方法通常用于调试和测试目的。
返回:
此信号量中的可用许可数
drainPermits
public int drainPermits()
获取并返回立即可用的所有许可。
返回:
获取的许可数
reducePermits
protected void reducePermits(int reduction)
根据指定的缩减量减小可用许可的数目。此方法在使用信号量来跟踪那些变为不可用资源的子类中很有用。此方法不同于 acquire
,在许可变为可用的过程中,它不会阻塞等待。
参数:
reduction
- 要移除的许可数
抛出:
IllegalArgumentException
- 如果 reduction
是负数
isFair
public boolean isFair()
如果此信号量的公平设置为 true,则返回 true
。
返回:
如果此信号量的公平设置为 true,则返回 true
hasQueuedThreads
public final boolean hasQueuedThreads()
查询是否有线程正在等待获取。注意,因为同时可能发生取消,所以返回 true
并不保证有其他线程等待获取许可。此方法主要用于监视系统状态。
返回:
如果可能有其他线程正在等待获取锁,则返回 true
getQueueLength
public final int getQueueLength()
返回正在等待获取的线程的估计数目。该值仅是估计的数字,因为在此方法遍历内部数据结构的同时,线程的数目可能动态地变化。此方法用于监视系统状态,不用于同步控制。
返回:
正在等待此锁的线程的估计数目
getQueuedThreads
protected Collection<Thread> getQueuedThreads()
返回一个 collection,包含可能等待获取的线程。因为在构造此结果的同时实际的线程 set 可能动态地变化,所以返回的 collection 仅是尽力的估计值。所返回 collection 中的元素没有特定的顺序。此方法用于加快子类的构造速度,提供更多的监视设施。
返回:
线程 collection
toString
public String toString()
返回标识此信号量的字符串,以及信号量的状态。括号中的状态包括 String 类型的 "Permits ="
,后跟许可数。
覆盖:
返回:
标识此信号量的字符串,以及信号量的状态
使用实例:
- 1.银行有3个柜台,每个柜台只能每次服务1个客户:
package com.thread;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo implements Runnable{
private Semaphore smp = new Semaphore(3);
@Override
public void run() {
try {
System.out.println("Thread " + Thread.currentThread().getName() + " start");
smp.acquire();
System.out.println("Thread " + Thread.currentThread().getName() + " is working");
Thread.sleep(1000);
smp.release();
System.out.println("Thread " + Thread.currentThread().getName() + " is over");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args){
SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
for (int i=1;i<=9;i++){
new Thread(semaphoreDemo).start();
}
}
}
运行结果:
Thread Thread-0 start
Thread Thread-3 start
Thread Thread-0 is working
Thread Thread-1 start
Thread Thread-2 start
Thread Thread-1 is working
Thread Thread-6 start
Thread Thread-5 start
Thread Thread-3 is working
Thread Thread-4 start
Thread Thread-7 start
Thread Thread-8 start
Thread Thread-0 is over
Thread Thread-2 is working
Thread Thread-3 is over
Thread Thread-5 is working
Thread Thread-1 is over
Thread Thread-6 is working
Thread Thread-2 is over
Thread Thread-5 is over
Thread Thread-8 is working
Thread Thread-4 is working
Thread Thread-7 is working
Thread Thread-6 is over
Thread Thread-8 is over
Thread Thread-7 is over
Thread Thread-4 is over
红色部分说明:只能一次性运行3个线程,必须要等待某一个线程执行 release() 之后,才能唤醒等待的某一个线程继续执行。
青色部分说明:由于打印语句的执行可能会滞后,因此此处的运行结果不能说明 Semaphore 设计有误,所以运行结果只能作为参考。
- 2.银行有3个柜台,一次性服务3个,服务完毕3个,才能继续运行下3个客户;若一次性服务所需柜台数超过实际柜台数,则无法继续进行运行;只要所需柜台数小于等于实际柜台数,均可以正常运行下去:
package com.thread;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo implements Runnable{
private Semaphore smp = new Semaphore(3);
@Override
public void run() {
try {
System.out.println("Thread " + Thread.currentThread().getName() + " start");
smp.acquire(3);
System.out.println("Thread " + Thread.currentThread().getName() + " is working");
Thread.sleep(1000);
smp.release(3);
System.out.println("Thread " + Thread.currentThread().getName() + " is over");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args){
SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
for (int i=1;i<=9;i++){
new Thread(semaphoreDemo).start();
}
}
}
运行结果:
Thread Thread-0 start
Thread Thread-3 start
Thread Thread-4 start
Thread Thread-6 start
Thread Thread-1 start
Thread Thread-7 start
Thread Thread-8 start
Thread Thread-0 is working
Thread Thread-5 start
Thread Thread-2 start
Thread Thread-0 is over
Thread Thread-3 is working
Thread Thread-3 is over
Thread Thread-4 is working
Thread Thread-4 is over
Thread Thread-6 is working
Thread Thread-6 is over
Thread Thread-1 is working
Thread Thread-1 is over
Thread Thread-7 is working
Thread Thread-7 is over
Thread Thread-8 is working
Thread Thread-8 is over
Thread Thread-5 is working
Thread Thread-5 is over
Thread Thread-2 is working
Thread Thread-2 is over
由于需要一次性获取3个,所以只有一个线程执行完毕,才能执行下一个线程。