前言
Java除了提供synchronized关键字来实现线程同步,还提供了一些锁相关的类来实现线程同步。Lock和ReadWriteLock就是两个锁的根接口,使用Lock来实现线程同步,比使用synchronized关键字更加灵活,程序员们有更多的可操作空间。在此之前,我们先扒一扒锁的可重入性。
锁的可重入性
synchronized同步块是可重入的,我们平时好像很难见到不可重入的锁。我觉得要想说清楚啥叫可重入、啥叫不可重入还是上个代码比较直观:
synchronized(Test1.class) {
System.out.println("第一次获取锁");
synchronized(Test1.class) {
System.out.println("第二次获取锁");
}
}
运行上面代码,输出:
第一次获取锁
第二次获取锁
这就证明synchronized关键字是可重入的,如果不是,程序会一直阻塞在第二个同步代码块获取监视器Test1.class那里,而不会进入第二个同步代码块。
要想知道啥是不可重入锁,我估计得自己实现一个,如下:
package lihao.thread;
import java.util.concurrent.TimeUnit;
public class MyFirstLock {
private volatile boolean isLocked = false;
public synchronized void lock() throws InterruptedException{
while(isLocked){
wait();
}
isLocked = true;
}
public synchronized void unlock(){
isLocked = false;
notify();
}
public static void main(String[] args) {
MyFirstLock lock = new MyFirstLock();
Runnable task = new Runnable() {
@Override
public void run() {
try {
lock.lock();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
String threadName = Thread.currentThread().getName();
System.out.println(threadName + "开始执行任务");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadName + "执行任务结束");
lock.unlock();
}
};
new Thread(task, "A").start();
new Thread(task, "B").start();
}
}
如上,我们实现了一个锁,并且运行了一下主函数证明它是有用的。接下来看它是不是可重入的,我们把主函数改成这样:
public static void main(String[] args) {
MyFirstLock lock = new MyFirstLock();
Runnable task = new Runnable() {
@Override
public void run() {
try {
lock.lock();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("第一次加锁");
try {
lock.lock();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("第二次加锁");
lock.unlock();
lock.unlock();
}
};
new Thread(task, "A").start();
}
改完后运行主函数,发现只打印了“第一次加锁”,然后程序就阻塞了,这种现象就叫做锁不可重入。
当前的判断条件是只有当isLocked为false时lock操作才被允许,而没有考虑是哪个线程锁住了它。为了让这个Lock类具有可重入性,我们需要对它做一点小的改动:
private volatile boolean isLocked = false;
private Thread lockedBy = null;
private int lockedCount = 0;
public synchronized void lock() throws InterruptedException{
Thread callingThread = Thread.currentThread();
while(isLocked && lockedBy != callingThread){
wait();
}
isLocked = true;
lockedCount++;
lockedBy = callingThread;
}
public synchronized void unlock(){
if (Thread.currentThread() == lockedBy) {
lockedCount--;
System.out.println(lockedCount);
if (lockedCount == 0) {
isLocked = false;
lockedBy = null;
notify();
}
}
}
现在,MyFirstLock是可重入锁了。
Lock接口
该接口位于java.util.concurrent.locks包下,它有一个官方实现类叫ReentrantLock,顾名思义是一个可重入锁。Lock接口有一个比较有特色的的方法叫tryLock(),它有一个boolean类型的返回值,true代表加锁成功,并且tryLock还可以设置超时时间。使用tryLock()可以避免在加锁时等待时间过长,尤其是在并发量比较高时,给tryLock设置合理的超时时间,可以避免大量线程阻塞:
boolean lockRsp = false;
try {
lockRsp = lock.tryLock(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (!lockRsp) {
return;
}
有时如果每个线程都使用同一把锁加锁,会导致效率非常低下,因为如果锁已经被某个线程获取到了,其他线程只能等待锁释放。在某些场景下,我们可以对此进行一些优化。假设我有一个系统,系统里每个用户只操作自己的资源,也就是说用户之间不存在竞态条件,但是同一个用户的请求会产生竞态条件,这时可以给每个用户分配一把锁,而不是所有用户使用同一把锁,这样用户之间就可以互不影响:
private ConcurrentHashMap<Long, ReentrantLock> map = new ConcurrentHashMap<>();
public void doSomeThing(Long userId) {
ReentrantLock newLock = new ReentrantLock();
//putIfAbsent,如果key存在就返回原来的value,不会覆盖原来的值;
//如果key不存在就放入新的键值对,返回值是null
ReentrantLock oldLock = map.putIfAbsent(userId, newLock);
if (oldLock == null) {
newLock.lock();
} else {
oldLock.lock();
}
try {
//业务逻辑
} finally {
ReentrantLock lockInUse = map.get(userId);
if (lockInUse != null && lockInUse.isHeldByCurrentThread()) {
lockInUse.unlock();
}
}
}
不过这样写的前提是:执行业务逻辑消耗的时间预计比ConcurrentHashMap的putIfAbsent()消耗的时间要长,否则根据用户来分段加锁也没啥意义,因为ConcurrentHashMap本身也是线程安全的,所以也涉及到线程同步的问题,虽然它效率很高。
ReadWriteLock接口
假设你的程序中涉及到对一些共享资源的读和写操作,且写操作没有读操作那么频繁。在没有写操作的时候,两个线程同时读一个资源没有任何问题,所以应该允许多个线程能在同时读取共享资源。但是如果有一个线程想去写这些共享资源,就不应该再有其它线程对该资源进行读或写。这就需要一个读/写锁来解决这个问题。
ReadWriteLock也是java.util.concurrent.locks包下的一个接口,他跟Lock接口没啥直接关系。读写锁比较复杂,所以这里要扒一扒它的原理。
思路以及简单实现
先让我们对读写访问资源的条件做个概述:
读取 :没有线程正在做写操作,且没有线程在请求写操作。
写入 :没有线程正在做读写操作。
如果某个线程想要读取资源,只要没有线程正在对该资源进行写操作且没有线程请求对该资源的写操作即可。我们假设对写操作的请求比对读操作的请求更重要,就要提升写请求的优先级。此外,如果读操作发生的比较频繁,我们又没有提升写操作的优先级,那么就会产生“饥饿”现象。请求写操作的线程会一直阻塞,直到所有的读线程都从ReadWriteLock上解锁了。如果一直保证新线程的读操作权限,那么等待写操作的线程就会一直阻塞下去,结果就是发生“饥饿”。因此,只有当没有线程正在锁住ReadWriteLock进行写操作,且没有线程请求该锁准备执行写操作时,才能保证读操作继续。
当其它线程没有对共享资源进行读操作或者写操作时,某个线程就有可能获得该共享资源的写锁,进而对共享资源进行写操作。有多少线程请求了写锁以及以何种顺序请求写锁并不重要,除非你想保证写锁请求的公平性。
根据上面的叙述,我们可以先实现一个简单的读写锁:
package lihao.thread;
public class MyReadWriteLock {
private int readers = 0;
private int writers = 0;
private int writeRequests = 0;
public synchronized void lockRead() throws InterruptedException {
while (writers > 0 || writeRequests > 0) {
wait();
}
readers++;
}
public synchronized void unlockRead() {
readers--;
notifyAll();
}
public synchronized void lockWrite() throws InterruptedException{
writeRequests++;
while(readers > 0 || writers > 0){
wait();
}
writeRequests--;
writers++;
}
public synchronized void unlockWrite() throws InterruptedException{
writers--;
notifyAll();
}
}
需要注意的是,在两个释放锁的方法(unlockRead,unlockWrite)中,都调用了notifyAll方法,而不是notify。要解释这个原因,我们可以想象下面一种情形:
如果有线程在等待获取读锁,同时又有线程在等待获取写锁。如果这时其中一个等待读锁的线程被notify方法唤醒,但因为此时仍有请求写锁的线程存在(writeRequests>0),所以被唤醒的线程会再次进入阻塞状态。然而,等待写锁的线程一个也没被唤醒,就像什么也没发生过一样(信号丢失现象)。如果用的是notifyAll方法,所有的线程都会被唤醒,然后判断能否获得其请求的锁。
用notifyAll还有一个好处。如果有多个读线程在等待读锁且没有线程在等待写锁时,调用unlockWrite()后,所有等待读锁的线程都能立马成功获取读锁 , 而不是一次只允许一个。
读/写锁的重入
上面实现的读/写锁(ReadWriteLock) 是不可重入的,当一个已经持有写锁的线程再次请求写锁时,就会被阻塞。原因是已经有一个写线程了——就是它自己。此外,考虑下面的例子:
1.Thread 1 获得了读锁
2.Thread 2 请求写锁,但因为Thread 1 持有了读锁,所以写锁请求被阻塞。
3.Thread 1 再想请求一次读锁,但因为Thread 2处于请求写锁的状态,所以想再次获取读锁也会被阻塞。
上面这种情形使用前面的ReadWriteLock就会被锁定——一种类似于死锁的情形。不会再有线程能够成功获取读锁或写锁了。
为了让ReadWriteLock可重入,需要对它做一些改进。下面会分别处理读锁的重入和写锁的重入。
读锁重入
为了让ReadWriteLock的读锁可重入,我们要先为读锁重入建立规则:
要保证某个线程中的读锁可重入,要么满足获取读锁的条件(没有写或写请求),要么已经持有读锁(不管是否有写请求)。
要确定一个线程是否已经持有读锁,可以用一个map来存储已经持有读锁的线程以及对应线程获取读锁的次数,当需要判断某个线程能否获得读锁时,就利用map中存储的数据进行判断。下面是方法lockRead和unlockRead修改后的的代码:
package lihao.thread;
import java.util.HashMap;
import java.util.Map;
public class MyReadWriteLock {
private Map<Thread, Integer> readingThreads = new HashMap<>();
private int writers = 0;
private int writeRequests = 0;
public synchronized void lockRead() throws InterruptedException {
Thread callingThread = Thread.currentThread();
while (canGrantReadAccess(callingThread)) {
wait();
}
readingThreads.put(callingThread, getReadAccessCount(callingThread) + 1);
}
private boolean canGrantReadAccess(Thread callingThread) {
if (writers > 0)
return false;
if (isReader(callingThread))
return true;
if (writeRequests > 0)
return false;
return true;
}
private boolean isReader(Thread callingThread) {
return readingThreads.get(callingThread) != null;
}
private int getReadAccessCount(Thread callingThread) {
Integer count = readingThreads.get(callingThread);
return count == null ? 0 : count.intValue();
}
public synchronized void unlockRead() {
Thread callingThread = Thread.currentThread();
int accessedCount = getReadAccessCount(callingThread);
if (accessedCount != 0) {
if (accessedCount == 1) {
readingThreads.remove(callingThread);
notifyAll();
} else {
readingThreads.put(callingThread, (accessedCount - 1));
}
}
}
}
代码中我们可以看到,只有在没有线程拥有写锁的情况下才允许读锁的重入。此外,重入的读锁比写锁优先级高。
写锁重入
仅当一个线程已经持有写锁,才允许写锁重入(再次获得写锁)。下面是方法lockWrite和unlockWrite修改后的的代码:
package lihao.thread;
import java.util.HashMap;
import java.util.Map;
public class MyReadWriteLock {
private Map<Thread, Integer> readingThreads = new HashMap<>();
private int writeAccess = 0;
private int writeRequests = 0;
private Thread writingThread = null;
public synchronized void lockRead() throws InterruptedException {
Thread callingThread = Thread.currentThread();
while (canGrantReadAccess(callingThread)) {
wait();
}
readingThreads.put(callingThread, getReadAccessCount(callingThread) + 1);
}
private boolean canGrantReadAccess(Thread callingThread) {
if (writeAccess > 0)
return false;
if (isReader(callingThread))
return true;
if (writeRequests > 0)
return false;
return true;
}
private boolean isReader(Thread callingThread) {
return readingThreads.get(callingThread) != null;
}
private int getReadAccessCount(Thread callingThread) {
Integer count = readingThreads.get(callingThread);
return count == null ? 0 : count.intValue();
}
public synchronized void unlockRead() {
Thread callingThread = Thread.currentThread();
int accessedCount = getReadAccessCount(callingThread);
if (accessedCount != 0) {
if (accessedCount == 1) {
readingThreads.remove(callingThread);
} else {
readingThreads.put(callingThread, (accessedCount - 1));
}
notifyAll();
}
}
public synchronized void lockWrite() throws InterruptedException{
writeRequests++;
Thread callingThread = Thread.currentThread();
while(!canGrantWriteAccess(callingThread)){
wait();
}
writeRequests--;
writeAccess++;
writingThread = callingThread;
}
private boolean canGrantWriteAccess(Thread callingThread) {
if (haveReaders())
return false;
if (writingThread == null)
return true;
if (!isWriter(callingThread))
return false;
return true;
}
private boolean haveReaders() {
return readingThreads.size() > 0;
}
private boolean isWriter(Thread callingThread) {
return writingThread == callingThread;
}
public synchronized void unlockWrite() throws InterruptedException{
if (writeAccess > 0) {
writeAccess--;
if (writeAccess == 0) {
writingThread = null;
}
notifyAll();
}
}
}