转载自https://blog.csdn.net/pcwl1206/article/details/85012834
目录
1、ReentrantReadWriteLock读写锁的概述:读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。
2、读写锁的具体实现【读写状态的设计、写锁的获取与释放、读锁的获取与释放以及锁降级】
2.1 读写状态的设计【需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,因此将其按位切割使用,高16位表示读,低16位表示写】
1)调用内部类WriteLock中的lock()方法:获取写锁。
2.2.2 读锁的获取:读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问时,读锁总是能够成功地被获取到,而所做的也只是增加读状态。
1. 如果其他线程已经获得了写锁,则当前线程获取读锁失败,返回-1,进入等待状态;
1、如果当前没有写锁或读锁时,第一个获取锁的线程都会成功,无论该锁是写锁还是读锁;
2、如果当前已经有了读锁,那么这时获取写锁将失败,获取读锁有可能成功也有可能失败【maybe公平锁进入队列,要等前面的唤醒,如果前面排队的是写锁,那么不会被唤醒】;
3、如果当前已经有了写锁,那么这时获取读锁或写锁,如果线程相同(可重入),那么成功;否则失败。
2.3 锁的释放【获取锁要做的是更改AQS的状态值以及将需要等待的线程放入到队列中。释放锁要做的就是更改AQS的状态值以及唤醒队列中的等待线程来继续获取锁。】
2)通过Sync调用AQS中的relaease方法【这个方法里面会调用tryRelease】:tryRelease释放失败返回false,释放成功(包括重入锁也要释放)要唤醒后面的线程
3)调用Sync中的tryRelease方法****:当且只当没有写锁的情况下返回true,还有写锁(因为可重入)则返回false。
1. 如果当前没有线程持有写锁,但是还要释放写锁,抛出异常;
2. 得到解除一把写锁后的状态,如果没有写锁了,那么将AQS的线程置为null;
3. 不管第二步中是否需要将AQS的线程置为null,AQS的状态总是要更新的。
3、ReentrantReadWriteLock中的其他方法
3.1 getOwner():用于返回当前获得写锁的线程,如果没有线程占有写锁,那么返回null
3.2 getReadLockCount():用于返回读锁的个数
3.3 getReadHoldCount():用于返回当前线程所持有的读锁的个数,如果当前线程没有持有读锁,则返回0。
3.4 getWriteHoldCount():返回当前线程所持有写锁的个数。
AQS有两种模式:共享模式和独占模式,读写锁的实现中,读锁使用共享模式;写锁使用独占模式;
另外一点需要记住的即使,当有读锁时,写锁就不能获得;而当有写锁时,除了获得写锁的这个线程可以获得读锁外,其他线程不能获得读锁。
1、ReentrantReadWriteLock读写锁的概述:读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。
读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大的提升。
一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。Java并发包中提供读写锁的实现是ReentrantReadWriteLock,它提供的特性如下所示:
1.1 ReentrantReadWriteLock的特性
- 1、获取顺序
非公平模式(默认):
当以非公平初始化时,读锁和写锁的获取的顺序是不确定的。非公平锁主张竞争获取,可能会延缓一个或多个读或写线程,但是会比公平锁有更高的吞吐量。
公平模式 :
当以公平模式初始化时,线程将会以队列的顺序获取锁。当当前线程释放锁后,等待时间最长的写锁线程就会被分配写锁;或者有一组读线程组等待时间比写线程长,那么这组读线程组将会被分配读锁。
- 2、可重入
支持可重入:同一线程获取写锁之后能获取读锁,获取读锁之后不能获取写锁【因为读锁有多个,别的线程也在用】
- 3、锁降级
允许写锁降级为读锁【即遵循获取写锁、获取读锁再释放写锁的次序】 ,但不支持读锁升级为写锁。
- 4、中断锁的获取
在读锁和写锁的获取过程中支持中断。
- 5、支持Condition
写锁提供Condition实现。
- 6、监控
提供确定锁是否被持有等辅助方法
1.2 ReentrantReadWriteLock的结构
ReadLock 和 WriteLock 中的方法都是通过 Sync 这个类来实现的。Sync 是 AQS 的子类,然后再派生了公平模式和不公平模式。即一开始我们要定sync这个实例是公平还是非公平【FairSync或者NonfairSync只是在Sync下增加了writerShouldBlock和readerShouldBlock方法来进行判断,用于表示当有别的线程也在尝试获取锁时,是否应该阻塞来实现公平和非公平。
但是事实上在ReentrantReadWriteLock里锁的实现是靠内部java.util.concurrent.locks.ReentrantReadWriteLock.Sync完成的。这个类看起来比较眼熟,它是AQS的一个子类,这中类似的结构在CountDownLatch、ReentrantLock、Semaphore里面都存在。
在ReentrantReadWriteLock里面的锁主体就是一个Sync,也就是FairSync或者NonfairSync,所以说实际上只有一个
锁,只是在获取读取锁和写入锁的方式上不一样而已。
大家先仔细看看这张图中的信息。然后我们把 ReadLock 和 WriteLock 的代码提出来一起看,清晰一些:
从它们调用的 Sync 方法,我们可以看到:
ReadLock 使用了共享模式(aquireShared--releaseShared【AQS下的】,所以要实现tryAcquireShared),WriteLock 使用了独占模式(acquire---release【AQS下的】,所以要实现tryAcquire)。
对于公平模式,hasQueuedPredecessors()方法表示前面是否有等待线程。一旦前面有等待线程,那么为了遵循公平,当前线程也就应该被挂起。
对于非公平模式,writerShouldBlock直接返回false,说明不需要阻塞,我可以去抢;而readShouldBlock调用了apparentlyFirstQueuedIsExcluisve()方法。该方法在当前线程是写锁占用的线程时,返回true;否则返回false。也就说明,如果当前有一个写线程正在写,那么该读线程应该阻塞。
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
等等,同一个 AQS 实例怎么可以同时使用共享模式和独占模式???
这里给大家回顾下 AQS,我们横向对比下 AQS 的共享模式和独占模式:
2、读写锁的具体实现【读写状态的设计、写锁的获取与释放、读锁的获取与释放以及锁降级】
2.1 读写状态的设计【需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,因此将其按位切割使用,高16位表示读,低16位表示写】
上图所示的当前同步状态表示一个线程已经获取了写锁,且重进入了2次,同时也连续获取了2次读锁。读锁和写锁通过位运算确定自己读和写的状态。
2.2 锁的获取
2.2.1 写锁的获取:写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读锁状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。
1)调用内部类WriteLock中的lock()方法:获取写锁。
public static class WriteLock implements Lock, java.io.Serializable {
// ...
// 获取写锁
public void lock() {
sync.acquire(1); // 调用的是同步器AQS中的方法
}
// ...
}
2)调用AQS中的acquire方法(因为sync继承了AQS类,所有可以调用AQS中开放的方法):然后就是if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
3)着重点就是调用Sync中的tryAcquire方法【我们如果要写一个自定义锁,也是写一个Sync,然后根据是独占还是共享来重写tryAquire和tryAquireShared,然后再在实现的lock方法中根据独占还是共享返回sync.acquire(里面会调用tryAquire)或者sync.acquireShared(里面会调用tryAquireShared)】
tryAcquire获取写锁时有三步:
1. 如果当前有写锁或者读锁。如果只有读锁,返回false,因为这时如果可以写,那么读线程得到的数据就有可能错误;如果有写锁,但是线程不同,即不符合写锁重入规则,返回false ;---> 如果写锁的数量将会超过最大值65535,抛出异常;否则,写锁重入 ,成功获取锁,返回true;
3. 如果当前没有线程获得读锁或写锁。如果写线程应该被阻塞(writerShouldBlock()方法在公平模式下返回true,即你要进队列;在非公平模式下返回false,即只要你设置成功你就获得了锁,不需要进队列)或者CAS设置失败,返回false;否则将当前线程置为获得写锁的线程,返回true。
从上面可以看到调用了writerShouldBlock方法,FairSync的实现是如果等待队列中有等待线程,则返回false,说明公平模式下,只要队列中有线程在等待,那么后来的这个线程也是需要记入队列等待的;NonfairSync中的直接返回的直接是false,说明不需要阻塞。从上面的代码可以得出,当没有锁时,如果使用的非公平模式下的写锁的话,那么返回false,直接通过CAS就可以获得写锁。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
// ...
// 静态内部类Sync,它的父类是AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
// ...
protected final boolean tryAcquire(int acquires) {
// 获取调用lock方法的当前线程
Thread current = Thread.currentThread();
// 获取当前线程的状态
int c = getState();
// 获取写锁的状态,写锁是排它锁
int w = exclusiveCount(c);
if (c != 0) {
// 存在读锁或者当前获取线程不是已经获取写锁的线程,返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 如果写锁的个数超过了最大值65535,抛出异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 写入重入锁,返回true
setState(c + acquires);
return true;
}
// 如果当前没有线程获得锁,如果写线程应该被阻塞或者CAS失败,返回false
if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
return false;
// 否则将当前线程置为获得写锁的线程,返回true
setExclusiveOwnerThread(current);
return true;
}
// ...
}
}
2.2.2 读锁的获取:读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问时,读锁总是能够成功地被获取到,而所做的也只是增加读状态。
1)调用调用内部类ReadLock中的lock()方法
public static class ReadLock implements Lock, java.io.Serializable {
// ...
// 获取读锁
public void lock() {
sync.acquireShared(1); // 调用的是同步器AQS中的方法
}
// ...
}
2)调用AQS中共享模式:acquireShared方法
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
当tryAcquireShared()方法小于0时,那么会执行doAcquireShared方法将该线程加入到等待队列中。
Sync实现了tryAcquireShared方法,如下:
3)调用Sync中的tryAcquireShared方法
tryAcquireShared分为三步:
1. 如果其他线程已经获得了写锁,则当前线程获取读锁失败,返回-1,进入等待状态;
2. 否则,说明当前没有写线程或者本线程就是写线程(可重入),接下来判断是否应该读线程阻塞(取决于是公平锁还是非公平锁)并且读锁的个数是否小于最大值65535,并且CAS成功使读锁+1,成功,返回1。其余的操作主要是用于计数的【读状态是所有线程获取读锁次数的总和,通过getReadHoldCount()方法可以返回当前线程获取读锁的次数,每个线程各自获取读锁的次数是保存在ThreadLocal中的】;
3. 如果2中失败了,失败的原因有三,第一是应该读线程应该阻塞(公平锁),返回-1;第二是因为读锁达到了上线(因为只有16位来保存锁数量),抛出异常;第三是因为CAS失败,有其他线程在并发更新state,那么会调动fullTryAcquireShared方法,去循环更新锁。
fullTryAcquiredShared方法如下:
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
// ...
// 静态内部类Sync,它的父类是AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
// ...
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 如果当前有写线程并且本线程不是写线程,不符合重入,失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 得到读锁的个数
int r = sharedCount(c);
// 如果读不应该阻塞并且读锁的个数小于最大值65535,并且可以成功更新状态值,成功
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
// 如果当前读锁为0
if (r == 0) {
// 第一个读线程就是当前线程
firstReader = current;
firstReaderHoldCount = 1;
}
// 如果当前线程重入了,记录firstReaderHoldCount
else if (firstReader == current) {
firstReaderHoldCount++;
}
// 当前读线程和第一个读线程不同,记录每一个线程读的次数
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 否则,循环尝试
return fullTryAcquireShared(current);
}
// ...
}
}
2.2.3 总结下获取锁:
1、如果当前没有写锁或读锁时,第一个获取锁的线程都会成功,无论该锁是写锁还是读锁;
2、如果当前已经有了读锁,那么这时获取写锁将失败,获取读锁有可能成功也有可能失败【maybe公平锁进入队列,要等前面的唤醒,如果前面排队的是写锁,那么不会被唤醒】;
3、如果当前已经有了写锁,那么这时获取读锁或写锁,如果线程相同(可重入),那么成功;否则失败。
2.3 锁的释放【获取锁要做的是更改AQS的状态值以及将需要等待的线程放入到队列中。释放锁要做的就是更改AQS的状态值以及唤醒队列中的等待线程来继续获取锁。】
2.3.1 写锁的释放
1)调用WriteLock类中的unlock方法
public void unlock() {
sync.release(1);
}
2)通过Sync调用AQS中的relaease方法【这个方法里面会调用tryRelease】:tryRelease释放失败返回false,释放成功(包括重入锁也要释放)要唤醒后面的线程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
3)调用Sync中的tryRelease方法****:当且只当没有写锁的情况下返回true,还有写锁(因为可重入)则返回false。
写锁的释放主要有三步:
1. 如果当前没有线程持有写锁,但是还要释放写锁,抛出异常;
2. 得到解除一把写锁后的状态,如果没有写锁了,那么将AQS的线程置为null;
3. 不管第二步中是否需要将AQS的线程置为null,AQS的状态总是要更新的。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
// ...
// 静态内部类Sync,它的父类是AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
// ...
protected final boolean tryRelease(int releases) {
// 如果没有线程持有写锁,但是仍要释放,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
// 如果没有写锁了,那么将AQS的线程置为null
if (free)
setExclusiveOwnerThread(null);
// 更新状态
setState(nextc);
return free;
}
// ...
}
}
2.3.2 读锁的释放
1)调用ReadLock中的unlock方法
public void unlock() {
sync.releaseShared(1);
}
2)通过Sync调用AQS的releaseShared方法【这个方法里会调用tryReleaseShared】:tryReleaseShared释放失败返回false,释放成功则调用doReleaseShared尝试唤醒下一个节点
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
3):调用Sync的tryReleaseShared方法:通过CAS循环来线程安全的减少读状态(因为可能有多个读线程同时释放锁),减少值是1<<16。只有当读锁个数等于0才会返回true,否则返回false【虽然它减少了该线程的读锁个数,但多个线程仍然还有读锁存在,所以只会返回false,从而在releaseShared中不会唤醒后面的线程(后面阻塞的一定是写线程,因为如果一直都是读线程,每个读线程来都能获得锁,根本不会因阻塞放入同步队列,即使是公平锁,因为之前的线程都获得了读锁,所以同步队列里根本没有线程,所以也不用阻塞。所以我们必须要读锁个数为0才能唤醒需要写锁的线程)】
释放读锁对读线程没有影响,但是可能会使等待的写线程解除挂起开始运行。所以,一旦没有锁了,就返回true,否则false;返回true后,那么则需要释放等待队列中的线程,这时读线程和写线程都有可能再获得锁。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
// ...
// 静态内部类Sync,它的父类是AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
// ...
protected final boolean tryReleaseShared(int unused) {
// 得到调用unlock的线程
Thread current = Thread.currentThread();
// 如果是第一个获得读锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
}
// 否则,是HoldCounter中计数-1
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 死循环
for (;;) {
int c = getState();
// 释放一把读锁
int nextc = c - SHARED_UNIT;
// 如果CAS更新状态成功,返回读锁是否等于0;失败的话,则重试
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
// ...
}
}