排他锁:同一时刻只允许一个线程访问例如synchronized。
共享锁:同一时刻可以允许多个读线程进行访问。
读写锁:中有既有排他锁也有共享锁,读读操作是共享锁,读写操作和写写操作是排他锁。
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Demo {
private Map<String, Object> map = new HashMap<>();
private ReadWriteLock rwl = new ReentrantReadWriteLock();
private Lock r = rwl.readLock();
private Lock w = rwl.writeLock();
public Object get(String key) {
r.lock();
System.out.println(Thread.currentThread().getName() + " 读操作在执行..");
try {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return map.get(key);
} finally {
r.unlock();
System.out.println(Thread.currentThread().getName() + " 读操执行完毕..");
}
}
public void put(String key, Object value) {
w.lock();
System.out.println(Thread.currentThread().getName() + " 写操作在执行..");
try {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
} finally {
w.unlock();
System.out.println(Thread.currentThread().getName() + " 写操作执行完毕..");
}
}
}
测试写操作,打印结果:等她其他写操作完成后继续写操作
public class Main {
public static void main(String[] args) {
Demo d = new Demo();
new Thread(new Runnable() {
@Override
public void run() {
d.put("key1", "value1");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
d.put("key2","value2");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
d.put("key3", "value3");
}
}).start();
}
}
测试读写操作,打印结果:写操作完成再读
public class Main {
public static void main(String[] args) {
Demo d = new Demo();
new Thread(new Runnable() {
@Override
public void run() {
d.put("key1", "value1");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(d.get("key1"));
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(d.get("key1"));
}
}).start();
}
}
测试读操作,打印结果:很快打印输出没有停顿
public class Main {
public static void main(String[] args) {
Demo d = new Demo();
d.put("key1", "value1");
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(d.get("key1"));
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(d.get("key1"));
}
}).start();
}
}
ReentrantReadWriteLock源码解读
ReentrantReadWriteLock中包含了两种锁,读锁ReadLock和写锁WriteLock,可以通过这两种锁实现线程间的同步。ReadLock和WriteLock实现了Lock接口,读锁和写锁中的lock方法和unlock方法都委托给了同步器Sync,Sync继承自AbstractQueuedSynchronizer,ReentrantReadWriteLock还包含公平锁和非公平锁的实现。
我们先来分析一下Sync:
读写锁需要用state保存的以下状态,用来处理读写业务:
写锁重入的次数,用来处理写锁重入逻辑,如果不是读写锁state只需要记录重入次数即可;
读锁的个数,只有读锁个数为0,写锁才能进入;
每个读锁重入的次数,用来处理读锁重入逻辑;
AQS 的状态state是32位(int 类型)的,辦成两份,读锁用高16位,表示持有读锁的线程数(sharedCount),写锁低16位,表示写锁的重入次数 (exclusiveCount)。状态值为 0 表示锁空闲,sharedCount不为 0 表示分配了读锁,exclusiveCount 不为 0 表示分配了写锁,sharedCount和exclusiveCount 一般不会同时不为 0,只有当线程占用了写锁,该线程可以重入获取读锁,反之不成立。
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
// 由于读锁用高位部分,所以读锁个数加1,其实是状态值加 2^16
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 写锁的可重入的最大次数、读锁允许的最大数量65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 写锁的掩码,用于状态的低16位有效值
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 读锁计数,当前持有读锁的线程数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 写锁的计数,也就是它的重入次数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}
AQS定义了独占模式的acquire()和release()方法,共享模式的acquireShared()和releaseShared()方法.还定义了抽象方法tryAcquire()、tryAcquiredShared()、tryRelease()和tryReleaseShared()由子类实现
写锁的获取
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {//并非第一次进入,重入
// 1.写锁为0,读锁不为0 或者写锁不为0,且当前线程不是已获取独占锁的线程,锁获取失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//2. 写锁数量已达到最大值,写锁获取失败
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);//重入成功
return true;
}
//3.当前线程应该阻塞,或者设置同步状态state失败,获取锁失败。
//线程第一次进入
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
写锁的试放
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())//判读是否是独占锁
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;//判断是否还有写锁
if (free)
setExclusiveOwnerThread(null);
setState(nextc);//释放并记录状态
return free;
}
读锁的获取
除了有写线程再写操作不能获取线程成功,其他状态都可以获取成功。
public final void acquireShared(int arg){
if(tryAcquireShared(arg) < 0){ // 1. 调用子类, 获取共享 lock 返回 < 0, 表示失败
doAcquireShared(arg); // 2. 调用 doAcquireShared 当前 线程加入 Sync Queue 里面, 等待获取 lock
}
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1; //1.有线程持有写锁,且该线程不是当前线程,获取锁失败
int r = sharedCount(c); //2.获取读锁计数
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {//3.如果不应该阻塞,且读锁数<MAX_COUNT且设置同步状态state成功,获取锁成功。
if (r == 0) { //下面对firstReader的处理:firstReader是不会放到readHolds里的,这样,在读锁只有一个的情况下,就避免了查找readHolds。
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {//锁重入
firstReaderHoldCount++;
} else {
// // 非 firstReader,有其他线程进入,记录其他线程进入次数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//4.获取读锁失败,放到循环里重试。
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1; //1.有线程持有写锁,且该线程不是当前线程,获取锁失败
//2.有线程持有写锁,且该线程是当前线程,则应该放行让其重入获取锁,否则会造成死锁。
} else if (readerShouldBlock()) {
//3.写锁空闲 且 公平策略决定 读线程应当被阻塞
// 下面的处理是说,如果是已获取读锁的线程重入读锁时,
// 即使公平策略指示应当阻塞也不会阻塞。
// 否则,这也会导致死锁的。
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId()) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
//4.需要阻塞且是非重入(还未获取读锁的),获取失败。
if (rh.count == 0)
return -1;
}
}
//5.写锁空闲 且 公平策略决定线程可以获取读锁
if (sharedCount(c) == MAX_COUNT)//6.读锁数量达到最多
throw new Error("Maximum lock count exceeded");
//7. 申请读锁成功,下面的处理跟tryAcquireShared是类似的。
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
读锁的释放
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 清理firstReader缓存 或 readHolds里的重入计数
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
// 完全释放读锁
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count; // 主要用于重入退出
}
// 循环在CAS更新状态值,主要是把读锁数量减 1
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 释放读锁对其他读线程没有任何影响,
// 但可以允许等待的写线程继续,如果读锁、写锁都空闲。
return nextc == 0;
}
}
锁降级
锁降级是指写锁降级为读锁。
在写锁没有释放的时候,获取到读锁,再释放写锁
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.imageio.spi.IIOServiceProvider;
public class Demo {
private Map<String, Object> map = new HashMap<>();
private ReadWriteLock rwl = new ReentrantReadWriteLock();
private Lock r = rwl.readLock();
private Lock w = rwl.writeLock();
private volatile boolean isUpdate;
public void readWrite() {
r.lock(); // 为了保证isUpdate能够拿到最新的值
if (isUpdate) {
r.unlock();
w.lock();
map.put("xxx", "xxx");
r.lock();//写锁降级为读锁
w.unlock();
}
Object obj = map.get("xxx");
System.out.println(obj);
r.unlock();//读锁释放保障上面的读取正常
}
public Object get(String key) {
r.lock();
System.out.println(Thread.currentThread().getName() + " 读操作在执行..");
try {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return map.get(key);
} finally {
r.unlock();
System.out.println(Thread.currentThread().getName() + " 读操执行完毕..");
}
}
public void put(String key, Object value) {
w.lock();
System.out.println(Thread.currentThread().getName() + " 写操作在执行..");
try {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
} finally {
w.unlock();
System.out.println(Thread.currentThread().getName() + " 写操作执行完毕..");
}
}
}
锁升级
把读锁升级为写锁
在读锁没有释放的时候,获取到写锁,再释放读锁,由于锁的互斥,此处可忽略