jdk提供了比synchronized更加高级的各种同步工具,包括ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等,可以实现更加丰富的多线程操作。
ReentrantLock
ReentrantLock是一种可重入的独占锁,它允许同一个线程多次获取同一个锁而不会被阻塞。
它的功能类似于synchronized是一种互斥锁,可以保证线程安全。相对于 synchronized, ReentrantLock具备如下特点:
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
- 与 synchronized 一样,都支持可重入
它的主要应用场景是在多线程环境下对共享资源进行独占式访问,以保证数据的一致性和安全性。
常用API
Lock接口
ReentrantLock实现了Lock接口规范,常见API如下:
接口 | 说明 |
---|---|
void lock() | 获取锁,调用该方法当前线程会获取锁,当锁获得后,该方法返回 |
void lockInterruptibly() throws InterruptedException | 可中断的获取锁,和lock()方法不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程 |
boolean tryLock() | 尝试非阻塞的获取锁,调用该方法后立即返回。如果能够获取到返回true,否则返回false |
boolean tryLock(long time, TimeUnit unit) throws InterruptedException | 超时获取锁,当前线程在以下三种情况下会被返回:当前线程在超时时间内获取了锁;当前线程在超时时间内被中断;超时时间结束,返回false |
void unlock() | 释放锁 |
Condition newCondition() | 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获取了锁,才能调用该组件的await()方法,而调用后,当前线程将释放锁 |
基本语法
//加锁 阻塞
lock.lock();
try {
...
} finally {
// 解锁
lock.unlock();
}
//尝试加锁 非阻塞
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
...
} finally {
lock.unlock();
}
}
在使用时要注意 4 个问题:
- 默认情况下 ReentrantLock 为非公平锁而非公平锁;
- 加锁次数和释放锁次数一定要保持一致,否则会导致线程阻塞或程序异常;
- 加锁操作一定要放在 try 代码之前,这样可以避免未加锁成功又释放锁的异常;
- 释放锁一定要放在 finally 中,否则会导致线程阻塞。
ReentrantLock使用
独占锁:模拟抢票场景
8张票,10个人抢,如果不加锁,会出现什么问题?
/**
* 模拟抢票场景
*/
public class ReentrantLockDemo {
private final ReentrantLock lock = new ReentrantLock();//默认非公平
private static int tickets = 8; // 总票数
public void buyTicket() {
lock.lock(); // 获取锁
try {
if (tickets > 0) {
// 还有票
try {
Thread.sleep(10); // 休眠10ms,模拟出并发效果
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "购买了第" + tickets-- + "张票");
} else {
System.out.println("票已经卖完了," + Thread.currentThread().getName() + "抢票失败");
}
} finally {
lock.unlock(); // 释放锁
}
}
public static void main(String[] args) {
ReentrantLockDemo ticketSystem = new ReentrantLockDemo();
for (int i = 1; i <= 10; i++) {
Thread thread = new Thread(() -> {
ticketSystem.buyTicket(); // 抢票
}, "线程" + i);
// 启动线程
thread.start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("剩余票数:" + tickets);
}
}
不加锁的效果: 出现超卖的问题
加锁效果: 正常,两个人抢票失败
公平锁和非公平锁
ReentrantLock支持公平锁和非公平锁两种模式:
- 公平锁:线程在获取锁时,按照等待的先后顺序获取锁。
- 非公平锁:线程在获取锁时,不按照等待的先后顺序获取锁,而是随机获取锁。ReentrantLock默认是非公平锁
ReentrantLock lock = new ReentrantLock(); //参数默认false,不公平锁
ReentrantLock lock = new ReentrantLock(true); //公平锁
比如买票的时候就有可能出现插队的场景,允许插队就是非公平锁,如下图:
可重入锁
- 可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。
- Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。在实际开发中,可重入锁常常应用于递归操作、调用同一个类中的其他方法、锁嵌套等场景中。
class Counter {
private final ReentrantLock lock = new ReentrantLock(); // 创建 ReentrantLock 对象
public void recursiveCall(int num) {
lock.lock(); // 获取锁
try {
if (num == 0) {
return;
}
System.out.println("执行递归,num = " + num);
recursiveCall(num - 1);
} finally {
lock.unlock(); // 释放锁
}
}
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter(); // 创建计数器对象
// 测试递归调用
counter.recursiveCall(10);
}
}
结合Condition实现生产者消费者模式
- java.util.concurrent类库中提供Condition类来实现线程之间的协调。
- 调用Condition.await() 方法使线程等待,其他线程调用Condition.signal() 或 Condition.signalAll() 方法唤醒等待的线程。
注意:调用Condition的await()和signal()方法,都必须在lock保护之内。
案例:基于ReentrantLock和Condition实现一个简单队列
public class ReentrantLockDemo3 {
public static void main(String[] args) {
// 创建队列
Queue queue = new Queue(5);
//启动生产者线程
new Thread(new Producer(queue)).start();
//启动消费者线程
new Thread(new Customer(queue)).start();
}
}
/**
* 队列封装类
*/
class Queue {
private Object[] items ;
int size = 0;
int takeIndex;
int putIndex;
private ReentrantLock lock;
public Condition notEmpty; //消费者线程阻塞唤醒条件,队列为空阻塞,生产者生产完唤醒
public Condition notFull; //生产者线程阻塞唤醒条件,队列满了阻塞,消费者消费完唤醒
public Queue(int capacity){
this.items = new Object[capacity];
lock = new ReentrantLock();
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(Object value) throws Exception {
//加锁
lock.lock();
try {
while (size == items.length)
// 队列满了让生产者等待
notFull.await();
items[putIndex] = value;
if (++putIndex == items.length)
putIndex = 0;
size++;
notEmpty.signal(); // 生产完唤醒消费者
} finally {
System.out.println("producer生产:" + value);
//解锁
lock.unlock();
}
}
public Object take() throws Exception {
lock.lock();
try {
// 队列空了就让消费者等待
while (size == 0)
notEmpty.await();
Object value = items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
size--;
notFull.signal(); //消费完唤醒生产者生产
return value;
} finally {
lock.unlock();
}
}
}
/**
* 生产者
*/
class Producer implements Runnable {
private Queue queue;
public Producer(Queue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
// 隔1秒轮询生产一次
while (true) {
Thread.sleep(1000);
queue.put(new Random().nextInt(1000));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 消费者
*/
class Customer implements Runnable {
private Queue queue;
public Customer(Queue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
// 隔2秒轮询消费一次
while (true) {
Thread.sleep(2000);
System.out.println("consumer消费:" + queue.take());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
应用场景总结
ReentrantLock具体应用场景如下:
- 解决多线程竞争资源的问题,例如多个线程同时对同一个数据库进行写操作,可以使用ReentrantLock保证每次只有一个线程能够写入。
- 实现多线程任务的顺序执行,例如在一个线程执行完某个任务后,再让另一个线程执行任务。
- 实现多线程等待/通知机制,例如在某个线程执行完某个任务后,通知其他线程继续执行任务。
Semaphore
Semaphore(信号量)是一种用于多线程编程的同步工具,用于控制同时访问某个资源的线程数量。
- Semaphore维护了一个计数器,线程可以通过调用acquire()方法来获取Semaphore中的许可证,
- 当计数器为0时,调用acquire()的线程将被阻塞,直到有其他线程释放许可证;
- 线程可以通过调用release()方法来释放Semaphore中的许可证,这会使Semaphore中的计数器增加,从而允许更多的线程访问共享资源。
常用API
构造器
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
- permits 表示许可证的数量(资源数)
- fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程
常用方法
- acquire() 表示阻塞并获取许可
- tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
- release() 表示释放许可
Semaphore使用
Semaphore实现服务接口限流
@Slf4j
public class SemaphoreDemo {
/**
* 同一时刻最多只允许有两个并发
*/
private static Semaphore semaphore = new Semaphore(2);
private static Executor executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
for(int i=0;i<10;i++){
executor.execute(()->getProductInfo2());
}
}
public static String getProductInfo() {
try {
semaphore.acquire();
log.info("请求服务");
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
semaphore.release();
}
return "返回商品详情信息";
}
public static String getProductInfo2() {
if(!semaphore.tryAcquire()){
log.error("请求被流控了");
return "请求被流控了";
}
try {
log.info("请求服务");
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
semaphore.release();
}
return "返回商品详情信息";
}
}
Semaphore实现数据库连接池
public class SemaphoreDemo2 {
public static void main(String[] args) {
final ConnectPool pool = new ConnectPool(2);
ExecutorService executorService = Executors.newCachedThreadPool();
//5个线程并发来争抢连接资源
for (int i = 0; i < 5; i++) {
final int id = i + 1;
executorService.execute(new Runnable() {
@Override
public void run() {
Connect connect = null;
try {
System.out.println("线程" + id + "等待获取数据库连接");
connect = pool.openConnect();
System.out.println("线程" + id + "已拿到数据库连接:" + connect);
//进行数据库操作2秒...然后释放连接
Thread.sleep(2000);
System.out.println("线程" + id + "释放数据库连接:" + connect);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
pool.releaseConnect(connect);
}
}
});
}
}
}
//数据库连接池
class ConnectPool {
private int size;
private Connect[] connects;
//记录对应下标的Connect是否已被使用
private boolean[] connectFlag;
//信号量对象
private Semaphore semaphore;
/**
* size:初始化连接池大小
*/
public ConnectPool(int size) {
this.size = size;
semaphore = new Semaphore(size, true);
connects = new Connect[size];
connectFlag = new boolean[size];
initConnects();//初始化连接池
}
private void initConnects() {
for (int i = 0; i < this.size; i++) {
connects[i] = new Connect();
}
}
/**
* 获取数据库连接
*
* @return
* @throws InterruptedException
*/
public Connect openConnect() throws InterruptedException {
//得先获得使用许可证,如果信号量为0,则拿不到许可证,一直阻塞直到能获得
semaphore.acquire();
return getConnect();
}
private synchronized Connect getConnect() {
for (int i = 0; i < connectFlag.length; i++) {
if (!connectFlag[i]) {
//标记该连接已被使用
connectFlag[i] = true;
return connects[i];
}
}
return null;
}
/**
* 释放某个数据库连接
*/
public synchronized void releaseConnect(Connect connect) {
for (int i = 0; i < this.size; i++) {
if (connect == connects[i]) {
connectFlag[i] = false;
semaphore.release();
}
}
}
}
/**
* 数据库连接
*/
class Connect {
private static int count = 1;
private int id = count++;
public Connect() {
//假设打开一个连接很耗费资源,需要等待1秒
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("连接#" + id + "#已与数据库建立通道!");
}
@Override
public String toString() {
return "#" + id + "#";
}
}
应用场景总结
- 限流:Semaphore可以用于限制对共享资源的并发访问数量,以控制系统的流量。
- 资源池:Semaphore可以用于实现资源池,以维护一组有限的共享资源。
CountDownLatch
CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。
- CountDownLatch使用给定的计数值(count)初始化。
- await方法会阻塞当前的计数值(count),
- 由于countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并且随后对await方法的调用都会立即返回。
- 这是一个一次性现象 —— count不会被重置。