一、读写锁简介
现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。
针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁
类图如下:
说明:如上图所示Sync为ReentrantReadWriteLock内部类,Sync继承自AQS、NonfairSync继承自Sync类、FairSync继承自Sync类(通过构造函数传入的布尔值决定要构造哪一种Sync实例);ReadLock实现了Lock接口、WriteLock也实现了Lock接口;
AQS定义了独占模式的acquire()和release()方法,共享模式的acquireShared()和releaseShared()方法.还定义了抽象方法tryAcquire()、tryAcquiredShared()、tryRelease()和tryReleaseShared()由子类实现,tryAcquire()和tryAcquiredShared()分别对应独占模式和共享模式下的锁的尝试获取,就是通过这两个方法来实现公平性和非公平性,在尝试获取中,如果新来的线程必须先入队才能获取锁就是公平的,否则就是非公平的。这里可以看出AQS定义整体的同步器框架,具体实现放手交由子类实现。
通过类图我们知道一些核心操作由Sync类实现
Sync类内部存在两个内部类,分别为HoldCounter和ThreadLocalHoldCounter,其中HoldCounter主要与读锁配套使用;
Sync源码如下:
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
// 读锁单位
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//锁持有的最大数量
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//排它锁持有的最大数量
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 返回count中表示的共享持有的数量 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回count中表示的独占持有的数量 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// 计数器
static final class HoldCounter {
// 计数
int count = 0;
// Use id, not reference, to avoid garbage retention
// 获取当前线程的TID属性的值
final long tid = getThreadId(Thread.currentThread());
}
// 本地线程计数器
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
// 重写初始化方法,在没有进行set的情况下,获取的都是该HoldCounter值
public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 本地线程计数器
private transient ThreadLocalHoldCounter readHolds;
// 缓存的计数器
private transient HoldCounter cachedHoldCounter;
/记录第一个持有共享锁线程的持有共享锁的数量,作者认为大多数情况下不会有并发,更多的是线程交替持有锁
private transient Thread firstReader = null;
// 第一个读线程的计数
private transient int firstReaderHoldCount;
//构造器
Sync() {
// 本地线程计数器
readHolds = new ThreadLocalHoldCounter();
// 设置AQS的状态
setState(getState()); // ensures visibility of readHolds
}
}
直接上源码
一.写锁过程
获取写锁:
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
writeLock.lock();
//ReentrantReadWriteLock 內部类Sync 继承自AQS,这里是调用aqs中的acquire方法
public void lock() {
sync.acquire(1);
}
//AQS中定义了tryAcquire抽象方法,具体的实现由子类去实现
//这里除tryAcquire方法和Reentrantlock 略有不同,后续操作一样一样的,
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
说明:除了aqs中的tryAcquire由具体的实现类来实现,其他部分和ReetrantLock获取锁的过程一样的,这里就不絮叨了,下边主要看下tryAcquire方法的具体实现。(可以参考我写的这篇ReentrantLock详解,或者不清楚的直接留言我)
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
//获取当前锁对象状态
int c = getState();
// 返回count中表示排它锁的数量
int w = exclusiveCount(c);
//说明锁被占有(共享锁或者排它锁)
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//说明现在有共享锁被别的线程占有,尝试获取锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//说明当前线程持有排它锁或者共享锁,这里是判断有没有超出重入次数
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 排它锁重入,直接获取锁
setState(c + acquires);
return true;
}
//如果当前为非公平锁: writerShouldBlock 方法直接返回 false,然后去争抢锁
/**如果当前为公平的写锁 writerShouldBlock 该方法调动 AQS的hasQueuedPredecessors 方法,
判断当前同步队列有没有等待的线程,如果有返回true,没有等待的线程在返回false 然后去争抢锁**/
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//成功获取锁,把当钱锁设置为当前线程占有
setExclusiveOwnerThread(current);
return true;
}
这里获取锁失败的情况主要有
- 锁被其他线程占有(共享锁或者排它锁)
- 如果为公平锁,等待队列上有线程等待
- 超出锁重入最大数量
- 别的线程争抢了锁
失败后的具体操作见ReentrantLock详解
写锁的释放
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
writeLock.unlock();
public void unlock() {
sync.release(1);
}
//AQS中定义了tryAcquire抽象方法,具体的实现由子类去实现
public final boolean release(int arg) {
//tryRelease尝试释放锁(锁status-arg),如果当前线程没有占有的锁(锁status=0) 返回true
if (tryRelease(arg)) {
//当前线程释放掉了所有锁
Node h = head;
//如果等待队列第一个结点有挂起的线程,将它唤醒去争抢
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
protected final boolean tryRelease(int releases) {
//判断当前线程是否为持有锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
//判断是否已经全部释放写锁
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
读锁的获取和释放
读锁获取过程:
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
readLock.lock();
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
主要来看下aqs定义的抽象方法tryAcquireShared (sync具体实现的)
获取读锁失败的情况有 :
(1)有其他线程持有排它锁,获取锁失败。
(2)公平锁:同步队列有等待节点;非公平锁:同步队列头节点为排它锁同步队列(防止写锁饥饿)
(3)读锁数量达到最多,抛出异常。
除了以上三种情况,该线程会循环尝试获取读锁直到成功。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//如果当前排他锁被占有,判断是不是当前线程占有的
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//共享锁占有的数量
int r = sharedCount(c);
//readerShouldBlock 方法 判断同步队列中第一个节点是 什么状态
//如果是公平锁:同步队列有节点就返回true,有可能是共享锁也有可能是排它锁的节点,
//如果是非公平锁:同步队列第一个节点是等待排它锁 就返回true,防止排它锁出现饥饿状态
//readerShouldBlock 为false就直接获取锁
if (!readerShouldBlock() &&
//c +的是 1<<16,读锁为高16位表示
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中
if (r == 0) {
// 设置第一个读线程
firstReader = current;
// 读线程占用的资源数为1
firstReaderHoldCount = 1;
} else if (firstReader == current) {// 当前线程为第一个读线程,表示第一个读锁线程重入
// 占用资源数加1
firstReaderHoldCount++;
} else {
// 获取计数器
//如果共享锁是被第2+n个线程占有,则使用threadlocal 记录每个线程持有的线程数量
HoldCounter rh = cachedHoldCounter;
// 计数器为空或者计数器的tid不为当前正在运行的线程的tid
if (rh == null || rh.tid != getThreadId(current))
// 获取当前线程对应的计数器
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0) // 计数为0
//加入到readHolds中
readHolds.set(rh);
//计数+1
rh.count++;
}
return 1;
}
//获取锁失败,放到循环里重试
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
//有线程持有写锁,且该线程不是当前线程,获取锁失败
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
else{
//有线程持有写锁,且该线程是当前线程,则应该放行让其重入获取锁,否则会造成死锁
}
//没有线程持有排它锁,判断获取共享锁是否应该被阻塞
//readerShouldBlock 方法 判断同步队列中第一个节点是 什么状态
//如果是公平锁:同步队列有节点就返回true,有可能是共享锁也有可能是排它锁的节点,
//如果是非公平锁:同步队列第一个节点是等待排它锁 就返回true,防止排它锁出现饥饿状态
//readerShouldBlock 为false就直接获取锁
//注:如果为读锁重入的话是允许获取读锁的,该情况会引起写锁饥饿
} else if (readerShouldBlock()) {
// 确保获取的不是 读重入锁
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
//如果当前锁不是 读读重入,且应该阻塞,那么获取锁失败
if (rh.count == 0)
return -1;
}
}
//判断当前线程有没有超过最大数量限制
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//再次尝试获取锁~~(可能为写读重入)
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
读锁获取失败,调用aqs的doAcquireShared方法尝试将当前线程任务节点加入到同步队列中(加入同步队列的具体细节见ReentrantLock详解)
private void doAcquireShared(int arg) {
// 将当前线程任务添加到同步队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前节点的前继节点
final Node p = node.predecessor();
// 判断前继节点是否是head节点
if (p == head) {
//如果前置节点为head说明,他当前线程是等待队列中的第一个,那么就尝试获取锁(这里可能是避免线程的上下文切换)
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取 lock 成功, 设置新的 head, 并唤醒后继获取 readLock 的节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 该线程有可能是被中断唤醒,也有可能是被其他线程唤醒,这里设置下中断状态
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//普通锁的情况下:,然后返回false继续自旋 尝试获取锁
//shouldParkAfterFailedAcquire 只有发现当前节点不是首节点才会返回true ,然后挂起当前线程,
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果该线程是被中断唤醒的,用于辅助后续操作判断当前线程是被中断唤醒的
interrupted = true;
}
} finally {
//如果该方法因为某些特殊情况意外的退出(没有获取锁就退出了),那么就取消尝试获取锁
if (failed)
cancelAcquire(node);
}
}
如果读锁获取失败后,尝试将当前线程节点加入到同步队列中。
如果该节点为头节点,那么就自旋争抢锁(避免上下文切换),获取锁成功的话,调用setHeadAndPropagate方法继续唤醒后续节点(如果后续节点为读锁等待节点的话);
如果该节点不为头节点,将当前线程挂起;
我们来看下setHeadAndPropagate方法
// 如果读锁(共享锁)获取成功,或头部节点为空,或头节点取消,或刚获取读锁的线程的下一个节点为空,或在节点的下个节点也在申请读锁,
//则在CLH队列中传播下去唤醒线程
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//下面具体分析
doReleaseShared();
}
}
注:后续读锁的释放操作也调用的这个doReleaseShared 方法
private void doReleaseShared() {
for (;;) {
//唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
//其实就是唤醒上面新获取到共享锁的节点的后继节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//表示后继节点需要被唤醒
if (ws == Node.SIGNAL) {
//这里需要控制并发,因为入口有setHeadAndPropagate跟releaseShared两个,避免两次unpark
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//执行唤醒操作
unparkSuccessor(h);
}
//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE
(这里不是很明白,为什么不需要唤醒的节点要设置这个状态,哪个老铁知道为什么的话指点下)
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//如果头结点没有发生变化,表示设置完成,退出循环
//如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
if (h == head)
break;
}
怎么理解这个传播呢:
就是只要获取成功到读锁,那就要传播到下一个节点(如果一下个节点继续是读锁的申请,只要成功获取,就再下一个节点,直到队列尾部或为写锁的申请,停止传播)。
读锁的释放过程
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
readLock.unlock();
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//上文有讲
doReleaseShared();
return true;
}
return false;
}
//该方法的主要作用就是用来维护下当前线程读锁的重入数量;
//如果没有线程占有读锁,就返回true 唤醒后续节点
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//保证一定能释放掉读锁的占有
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
读锁的释放过程比较简单,这里就不做过多的解释了
这里思考一个问题,获取读锁的时候我们讲到读锁传播的概念,为什么在读锁释放的时候,如果还有别的线程占有读锁就不用传播了呢?
因为在现在获取读锁的时候 已经完成读线程唤醒的传播了~~
总结:
获取写锁:获取写锁的过程总体和ReentrantLock详解流程一样;
- 尝试获取写锁(如果没有线程占有锁直接获取成功,并把当前锁设为独占)
-
公平锁:先判断同步队列中是否有等待节点在等待获取锁
-
非公平锁:上来就直接争抢锁
-
- 如果有其他现在占有锁(读锁或者写锁),获取失败,加入到同步队列中
- 判断当前节点是否为头节点,
- 是:自旋争抢锁(避免上下文切换)
- 否:调用LockSupport.park 方法挂起当前线程,等待被唤醒(别的线程调用LockSupport.uppark 或者被中断 或者 超时)
写锁的释放:
- 判断当前锁是否为释放锁的线程占有
- 设置当前锁状态,如果所有重入锁都释放掉了,设置当前锁独占为null,
- 唤醒同步队列中的头节点
注:写锁的释放并没有进行读锁的释放传播,读锁的传播是有读锁成功获取读锁以后进行的
读锁的获取:
- 判断写锁是否被占用(是:判断是否为写读重入锁)
- 是:判断是否为写读重入锁
- 否:获取失败,加入同步队列中
- 是:判断是否为写读重入锁
- 判断当前读线程是否应该被阻塞
- 公平锁:如果同步队列中有等待节点就获取锁失败,把当前读线程节点加入到同步队列
- 非公平锁:如果同步队列中的第一个节点为写锁的等待节点获取锁失败,把当前读线程节点加入到同步队列(防止写锁饥饿)
- 获取读锁成功,维护线程占有读锁的数量,判断当前节点是否为第一个获取读锁的线程:
- 否:把当前线程占有读锁数量维护进入HoldCounter(继承自ThreadLocal,为每个线程都维护了一个读锁重入的计数)
- 是:直接通过变量firstReader,firstReaderHoldCount维护当前线程占有读锁的数量(这里作者应该是认为大多数情况下锁的获取为交替获取,没必要直接就用线程计数器来为每个线程为一个数量)
- 注:如果后续节点为读锁节点,就唤醒(该行为会传播)
- 获取锁失败:加入到同步队列中,判断当前节点是否头节点
- 是:自旋争抢锁
- 否:挂起当前线程,等待被唤醒
读锁的释放:
- 设置当前线程占有的锁数量-1
- 唤醒后续节点~~~