1.介绍
探索 Java 中信号量和互斥体的基础知识。
2. Semaphore
从 java.util.concurrent.Semaphore 开始,可以使用信号量来限制访问特定资源的并发线程数。
在下面的例子中,将实现一个简单的登录队列来限制系统中的用户数量:
public class LoginQueueUsingSemaphore {
private Semaphore semaphore;
public LoginQueueUsingSemaphore(int slotLimit) {
semaphore = new Semaphore(slotLimit);
}
public boolean tryLogin() {
return semaphore.tryAcquire();
}
public void logout() {
semaphore.release();
}
public int availableSlots() {
return semaphore.availablePermits();
}
}
请注意如何使用以下方法:
- tryAcquire () – 如果许可立即可用则返回真,否则返回假,但 acquire() 获取许可并阻塞直到一个可用
- release () – 释放许可证
- availablePermits () – 返回当前可用许可证的数量
为了测试登录队列,首先尝试达到限制并检查下一次登录尝试是否会被阻止:
@Test
public void test1() throws InterruptedException {
int slots = 10;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(loginQueue::tryLogin));
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
assertEquals(0, loginQueue.availableSlots());
assertFalse(loginQueue.tryLogin());
}
查看注销后是否有可用的许可
@Test
public void test2() throws InterruptedException {
final int slots = 10;
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
final LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(loginQueue::tryLogin));
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
assertEquals(0, loginQueue.availableSlots());
loginQueue.logout();
assertTrue(loginQueue.availableSlots() > 0);
assertTrue(loginQueue.tryLogin());
}
3.定时信号量
接下来,讨论 Apache Commons TimedSemaphore. TimedSemaphore 允许多个许可作为一个简单的信号量,但在给定的时间段内,在此时间段之后时间重置并释放所有许可。
public class DelayQueueUsingTimedSemaphore {
private TimedSemaphore semaphore;
public DelayQueueUsingTimedSemaphore(long period, int slotLimit) {
semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit);
}
public boolean tryAdd() {
return semaphore.tryAcquire();
}
public int availableSlots() {
return semaphore.getAvailablePermits();
}
}
当使用以2秒为时间段的延迟队列并且在2秒内使用所有插槽后,应该没有可用的:
@Test
public void test1() throws InterruptedException {
final int slots = 50;
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
//2s 之后信号量许可自动重置
final DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(2, slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(delayQueue::tryAdd));
executorService.shutdown();
//睡眠1.5s
Thread.sleep(1500);
//获取不到可用的槽
assertEquals(0, delayQueue.availableSlots());
assertFalse(delayQueue.tryAdd());
}
@Test
public void test2() throws InterruptedException {
final int slots = 50;
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
//2s 之后信号量许可自动重置
final DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(2, slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(delayQueue::tryAdd));
executorService.shutdown();
//睡眠2.1s
Thread.sleep(2100);
//可用的槽
assertTrue(delayQueue.availableSlots() > 0);
assertTrue(delayQueue.tryAdd());
}
4. Semaphore vs. Mutex
Mutex 的作用类似于二进制信号量,我们可以用它来实现互斥。
在下面的例子中,将使用一个简单的二进制信号量来构建一个计数器:
public class CounterUsingMutex {
private Semaphore mutex;
private int count;
public CounterUsingMutex() {
mutex = new Semaphore(1);
count = 0;
}
public void increase() throws InterruptedException {
mutex.acquire();
this.count = this.count + 1;
Thread.sleep(1000);
mutex.release();
}
public int getCount() {
return this.count;
}
public boolean hasQueuedThreads() {
return mutex.hasQueuedThreads();
}
}
当许多线程尝试同时访问计数器时,它们只会被阻塞在队列中:
@Test
public void test1() {
int count = 10;
ExecutorService executorService
= Executors.newFixedThreadPool(count);
CounterUsingMutex counter = new CounterUsingMutex();
IntStream.range(0, count)
.forEach(user -> executorService.execute(() -> {
try {
counter.increase();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
executorService.shutdown();
System.out.println(counter.getCount());
System.out.println(counter.hasQueuedThreads());//true
}
等待所有线程都访问计数器,队列中没有剩余线程:
@Test
public void test2() throws InterruptedException {
final int count = 5;
final ExecutorService executorService = Executors.newFixedThreadPool(count);
final CounterUsingMutex counter = new CounterUsingMutex();
CountDownLatch countDownLatch = new CountDownLatch(count);
IntStream.range(0, count)
.forEach(user -> executorService.execute(() -> {
try {
counter.increase();
countDownLatch.countDown();
} catch (final InterruptedException e) {
e.printStackTrace();
}
}));
executorService.shutdown();
countDownLatch.await();;
System.out.println(counter.hasQueuedThreads());//false
System.out.println(counter.getCount());
}