一、java5的Semaphere同步工具
信号量为多个线程协作提供了更为强大的控制方法,广义上说,信号量是对锁的扩展。无论是内部锁synchronized还是重入锁ReentrantLock,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问某一个资源。
Semaphore 可以维护当前访问自身的线程个数,并提供了同步机制,使用Semaphere可以控制同时访问资源的线程个数。
如有必要,在许可可用前会阻塞每一个acquire()
,即若无法获取,线程会等待。然后再获取该许可。每个release()
添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore
只对可用许可的号码进行计数,并采取相应的行动。
acquireUninterruptibly()
与aquire方法类似,但是不响应中断,tryAcquire()
尝试获取一个许可,如果成功返回ture,失败返回false.
Semaphore
通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问(伪代码):
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for demo
protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
获得一项前,每个线程必须从信号量获取许可,从而保证可以使用该项。该线程结束后,将项返回到池中并将许可返回到该信号量,从而允许其他线程获取该项。注意,调用
acquire()
时无法保持同步锁,因为这会阻止将项返回到池中。信号量封装所需的同步,以限制对池的访问,这同维持该池本身一致性所需的同步是分开的
1.2 案例
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Semaphore sp = new Semaphore(3);
for(int i=0;i<10;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
sp.acquire();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() +
"进入,当前已有" + (3-sp.availablePermits()) + "个并发");
try {
Thread.sleep((long)(Math.random()*10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() +
"即将离开");
sp.release();
//下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元
System.out.println("线程" + Thread.currentThread().getName() +
"已离开,当前已有" + (3-sp.availablePermits()) + "个并发");
}
};
service.execute(runnable);
}
}
}
运行结果
线程pool-1-thread-1进入,当前已有2个并发
线程pool-1-thread-3进入,当前已有3个并发
线程pool-1-thread-2进入,当前已有2个并发
线程pool-1-thread-2即将离开
线程pool-1-thread-2已离开,当前已有2个并发
线程pool-1-thread-5进入,当前已有3个并发
线程pool-1-thread-5即将离开
线程pool-1-thread-5已离开,当前已有2个并发
线程pool-1-thread-4进入,当前已有3个并发
线程pool-1-thread-3即将离开
线程pool-1-thread-3已离开,当前已有2个并发
线程pool-1-thread-6进入,当前已有3个并发
线程pool-1-thread-6即将离开
线程pool-1-thread-8进入,当前已有3个并发
线程pool-1-thread-6已离开,当前已有3个并发
线程pool-1-thread-1即将离开
线程pool-1-thread-1已离开,当前已有2个并发
线程pool-1-thread-9进入,当前已有3个并发
线程pool-1-thread-9即将离开
线程pool-1-thread-9已离开,当前已有2个并发
线程pool-1-thread-7进入,当前已有3个并发
线程pool-1-thread-4即将离开
线程pool-1-thread-4已离开,当前已有2个并发
线程pool-1-thread-10进入,当前已有3个并发
线程pool-1-thread-8即将离开
线程pool-1-thread-8已离开,当前已有2个并发
线程pool-1-thread-10即将离开
线程pool-1-thread-10已离开,当前已有1个并发
线程pool-1-thread-7即将离开
线程pool-1-thread-7已离开,当前已有0个并发
参考
- 张孝祥-Java多线程与并发库高级应用
- 《实战java高并发程序设计》