看看ReentrantReadWriteLock怎么实现readLock方法的
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
readerLock是一个成员变量,在构造方法里被初始化
private final ReentrantReadWriteLock.ReadLock readerLock;
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
//ReadLock是一个内部类
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
从lock方法入手
public void lock() {
sync.acquireShared(1);//请求资源
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);//获取失败就入队列
}
tryAcquireShared也是由子类的sync具体实现,看ReentrantReadWriteLock的内部类sync如何实现
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();//获取资源数
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;//其他线程已经拿到写锁了
int r = sharedCount(c);//拿到已分配的共享资源数
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//到这里说明获得资源成功
if (r == 0) {//第一次调用获取读锁
firstReader = current;//记录第一个获得读锁的线程
firstReaderHoldCount = 1;//记录持有的资源数
} else if (firstReader == current) {
firstReaderHoldCount++; //更新资源数
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;//更新资源数
}
return 1;
}
//上面readerShouldBlock返回false,或者资源数达到MAX_COUNT以上,或者CAS操作失败进入下面方法
return fullTryAcquireShared(current);
}
上面出现了之前文章提到的许多变量、方法ReentrantReadWriteLock笔记(二) – Sync源码分析,
上面有个readerShouldBlock()方法,是个抽象方法,先分析非公平锁(NonfairSync)的实现
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
//从源码注释看,这个方法是防止请求写锁的线程永远失败
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
final boolean isShared() {
return nextWaiter == SHARED;
}
从上面看到,当sync队列不为空,且第二个节点不是SHARED状态会返回true,回顾一下if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)),所以意思是如果sync队列的第二个节点不是请求共享锁就会返回true,短路掉,进入 fullTryAcquireShared(current)方法。
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();//拿到资源数
if (exclusiveCount(c) != 0) {//有线程拿到排它资源(写锁)了
if (getExclusiveOwnerThread() != current)
return -1;//不是当前线程拿到排它资源(写锁)
} else if (readerShouldBlock()) {
//说明有请求写锁的线程排在队列的第二位
if (firstReader == current) {
//说明已经获得读锁了,而且还是第一个获得的
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();//个人理解,这移除为了防止线程取消,但不能被GC回收
}
}
if (rh.count == 0)//等于0说明是新建的readHold对象
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;//对当前线程持有的共享资源数+1
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
上面失败了,那就只能进入sync队列了,来看看doAcquireShared方法
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//添加一个状态为SHARED的节点
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
主要看一下setHeadAndPropagate(node, r)方法
private void setHeadAndPropagate(Node node, int propagate) {
//进这个方法说明线程获得共享资源(读锁),所以有必要唤醒一下后置节点
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();//下面会分析
}
}
总结一下流程:
- 线程A请求读锁,先判断是否有其他线程申请了排它资源(写锁),有就失败
- 接着开始申请读锁,如果sync队列中的第二个节点是申请排它锁的话,除非线程A已经获得了读锁,否则就会请求失败,进入sync队列;其他情况获取成功。
- 进入sync队列,并开始休眠,等待自己排到队列的第二个节点,就会开始继续尝试tryAcquire,失败就继续休眠;成功就会获得资源,成为队列头节点,并根据条件是否要唤醒后置节点。
公平锁(FairSync)的实现差别在于readerShouldBlock方法
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
参考ReentrantReadWriteLock笔记(三) – WriteLock源码分析中的公平锁分析
来看看unlock方法
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();//释放共享锁(读锁),让请求写锁的线程有机会获得锁
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();//移除帮助GC回收
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {//考虑多线程释放,这里用死循环
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;//CAS成功,释放成功
}
}
//该方法保证头节点状态为PROPAGATE或取消
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {//若队列不为空,且不止一个节点
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;//设置待运行状态败,继续循环
unparkSuccessor(h);//唤醒后置线程
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; //设置状态为PROPAGATE
}
if (h == head)//过程中可能头节点已经改变
break;
}
}
总结一下流程
- 如果当前线程是firstReader,若firstReaderHoldCount是1,就把firstReader引用置为null,否则firstReaderHoldCount–
- 如果当前线程不是firstReader,就取出当前线程的HoldCounter,进行减1
- 接着是要把sycn的state的共享资源数减1,减完后,资源数变为了0就进入doReleaseShared方法
- doReleaseShared方法是在sync队列不止一个节点的情况下,保证头节点状态为PROPAGATE或取消
分析完后有个疑问:
【疑问】:为什么要弄个cachedHoldCounter变量?每次直接readHolds.get()不行吗?难道readHolds.get()很耗性能?