首先简单介绍几个概念
-
重量级锁:用户起了几个线程,经过os调度,然后在交给java虚拟机执行。重量级锁是操作os函数来解决线程同步问题的,涉及到了内核态与用户态之间的切换,这个开销是很大的,因此被称为重量级锁。
-
轻量级锁:由于重量级锁对os函数的频繁操作十分耗时,因此衍伸出来了轻量级锁,目的就是为了减少对内核的直接操作,减少一些可以避免的开销。而轻量级锁来解决线程同步问题一般都只涉及到jdk层面,且我们电脑执行代码是很快的。
-
偏向锁:只要有人过来竞争,偏向锁就会升级。偏向锁的意义在于,在只有一个线程运行或者无竞争的情况下,减少轻量级锁带来的开销。
-
可重入锁:同一个线程内多次获取同一把锁,进行lock操作而不会出现死锁的情况称为锁的可重入性
-
公平锁:进行加锁前会进行判断看自己是否需要排队,即使自己是第一个进行lock的线程,遵循先来后到的原则
-
非公平锁:没有队列的判断逻辑,谁先执行cas,谁就加锁成功,谁先抢到就是谁的
-
自旋锁:一个线程在获取锁的时候,另外一个线程已经抢占了锁,那么此线程将一直陷入循环等待的状态,然后一直判断是否能获取锁成功,直到获取锁成功,退出循环
当然本文着重介绍ReentrantLock是怎么实现的。阅读本文可以收获如下知识
- 什么是可重入锁?
- 同一个线程内多次获取同一把锁,进行lock操作而不会出现死锁的情况称为锁的可重入性
- ReentrantLock是一把什么类型的锁?哪里可以体现?
- ReentrantLock是一把轻量级锁、可重入锁。可重入锁体现在同一个线程可以多次对同一把锁的lock、unlock操作而不会造成死锁的情况出现
- AQS是什么,AQS与ReentrantLock有什么关系?AQS核心是什么?
- sync就是个AQS,AQS全称AbstractQueuedSynchronizer,ReentrantLock的加锁即sync.lock
- AQS核心:park、自旋、cas
- 并发、并行,它们有啥差别?
- 并发:并发不一定存在竞争,指同一个时间段内,线程数量
- 并行:存在竞争,在同一片刻,竞争同一个资源
- 知道什么是公平锁、非公平锁吗?
- 公平锁:在ReentrantLock中有一个队列来维护排队关系,即使锁被释放了,即使自己是队列排队的第一个,依然会进行判断自己是否有获取锁的资格。即遵循先来后到的规则
- 非公平锁:对比公平锁是把队列部分给剔除了,谁先抢到锁谁就进行cas加锁成功
- 讲讲你对ReentrantLock的理解
- 在jdk1.6前:Synchronized是通过操作os函数来实现线程间的同步问题的是一把重量级锁
- jdk1.6之后ReentrantLock对比Synchronized都差不多,Synchronized底层做了优化,有一个锁升级的过程,然后就是ReentrantLock的调用方法更加丰富一点
- ReentrantLock是怎么实现的,有了解过吗?
- ReentrantLock主要利用AQS实现的,而AQS核心又是park、自旋、cas
- ReentrantLock加锁的大概流程是怎么样的?
- 先尝试去获取锁(先看是否需要排队,不需要排队则cas加锁。如果是同一个线程来操作,重入锁状态标识符++),获取不到锁把当前thread封装成一个Node节点放入队列中,维护好队列关系后,如果发现自己的排队的第一个人,那么最多还会去尝试获取锁2次,实在获取不到锁了,让当前Node睡眠,最后执行finally中的方法去取消当前线程的竞争
- ReentrantLock中的队列什么情况下会被初始化?
- 至少存在俩个 线程竞争的情况下才会被初始化
ReentrantLock定义
首先ReentrantLock是一把可重入锁、轻量级锁,至于是公平锁还是非公平锁,看我们怎么把它实例化出来的,默认情况下是一把非公平锁,当我们创建实例的时候,传入参数true,此时就是一把公平锁。
锁的可重入性
可重入锁示例代码体现如下,同个线程俩次获取同一把锁并未出现死锁的情况。
new Thread(() -> {
int i = 0;
lock.lock();
System.out.println("初始化锁:" + lock);
while (true) {
lock.lock();
System.out.println("第" + ++i + "次拿到锁" + lock);
if (i == 100) {
break;
}
lock.unlock();
}
lock.unlock();
System.out.println(i);
}).start();
AQS定义
public void lock() {
sync.lock();
}
什么是AQS?sync就是个AQS,AQS全称AbstractQueuedSynchronizer,ReentrantLock的加锁就与AQS有关!
abstract static class Sync extends AbstractQueuedSynchronizer
公平锁加锁-acquire
- tryAcquire:尝试加锁
- addWaiter:加锁失败,放入等待队列
- acquireQueued:尝试获取锁、修改前一个已经睡眠节点的状态为-1、当前线程park(内容有点多,下文详解)
final void lock() {
acquire(1);
}
@ReservedStackAccess
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
探究尝试加锁机制-tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取锁的状态值,默认为0
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//锁的重入性体现
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
- tryAcquire return:true 加锁成功 、return:false 加锁失败
- hasQueuedPredecessors:检查队列中是否有排队的线程
- setExclusiveOwnerThread:设置锁的持有者线程
ReentrantLock的可重入性体现之处:
如果是同一个线程进行lock操作,记录加锁的次数(state = state+1)
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
公平锁体现之处:在加锁之前会进行判断队列中是否有无排队的(hasQueuedPredecessors),如果有则加入队列排队。无则cas进行加锁。
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
ReentrantLock队列探究
队列就是由Node节点组成的双向链表,Node节点主要构造如下
static final class Node {
//指针:指向下一个节点
volatile Node next;
//每个Node节点中对应相关的线程
volatile Thread thread;
//指针:指向上一个节点
volatile Node prev;
//park标识符:默认是0、park是-1
volatile int waitStatus;
}
acquireQueued
加锁失败(tryAcquire(arg)==false)必然进入队列,执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- 如果是单线程进行lock操作,那么这个队列永远和AQS无关。
- 单线程下的lock操作,队列并不会被初始化
@ReservedStackAccess
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter
- addWaiter:入队操作,但是此时的线程并没有休眠中断,仅仅是把线程封装成一个Node节点入队而已。
- 参数mode:当前参与竞争的线程Node
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
- 入队情景演示(队列不为null的情况):维护链表关系
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq
- 调用时机:队列为null的情况
- 入队情景演示:t1线程持有锁,但是并未释放锁,t2线程参与竞争锁,tryAcquire尝试获取锁失败,添加进队列。pred == null 执行enq(node);
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//和情景一一样的逻辑,维护链表关系
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- 队列的第一个Node节点的prev、thread属性永远为null(但是head!=null)
if (t == null) {
// Must initialize
if (compareAndSetHead(new Node()))
tail = head;
}
acquireQueued
@ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取前一个Node节点
final Node p = node.predecessor();
//只有是第二个节点才有资格再次尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//失败获取锁后是否需要park判 shouldParkAfterFailedAcquire
//park(睡眠)线程并检查线程是否被打断(parkAndCheckInterrupt)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 只有是第一个排队的节点,才赋予再次尝试获取锁的机会(tryAcquire(arg)),最多有俩次tryAcquire(arg)机会,如果此时获取到了,维护链表关系
if (p == head && tryAcquire(arg)) {
setHead(node);
//消除引用帮助GC,回收队列头
p.next = null; // help GC
failed = false;
return interrupted;
}
- 无论是初始化队列的时候还是维护链表关系的时候,队列头head的thread、prev属性一直为null(但是head!=null)
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
shouldParkAfterFailedAcquire version 1.0
shouldParkAfterFailedAcquire:修改当前排队Node节点的上一个节点的状态<==>修改park线程的状态 (waitStatus = -1)
- return false : 接着循环
- return true:当前线程park休眠
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//waitStatus默认是0
int ws = pred.waitStatus;
//Node.SIGNAL默认是-1
if (ws == Node.SIGNAL)
return true;
//暂时还不清楚
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//修改上一个节点的状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
park线程,使线程休眠
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
----------------到此加锁过程over-------------------------------------------
解锁
public void unlock() {
sync.release(1);
}
我个人感觉h != null这个条件有点多余
@ReservedStackAccess
public final boolean release(int arg) {
//先尝试去释放锁,消除重入次数
if (tryRelease(arg)) {
Node h = head;
//一般来说head存在且waitStatus=-1
if (h != null && h.waitStatus != 0)
//唤醒队列中从左往右第一个睡眠线程
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease尝试释放锁
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
//每unlock一次c--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//直到c==0才会释放锁
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//重新设置状态
setState(c);
return free;
}
从源码角度来看,虽然我们多个lock配套一个unlock程序也能正常运行,毕竟release的返回值对unlock方法没有半毛钱的关系,而这只是消除重入锁的次数,一个lock配套一个unlock才是真正的解锁。只有unlock与lock对应上了,c才会==0,才会真正消除上锁时的所有引用。
//直到c==0才会释放锁
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
unparkSuccessor
方法做了俩件事情
- 修改睡眠线程的waitStatus为0
- 唤醒睡眠队列中从左到右第一个睡眠的线程
//参数node:指head
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//尾节点开始寻找队列中从左往右第一个睡眠的线程,赋值给s
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒s线程
if (s != null)
LockSupport.unpark(s.thread);
}
还记得我们线程是在哪里睡眠的吗?我再来贴一遍代码,就是在parkAndCheckInterrupt这休眠的。而这是个死循环,那么次线程就接着执行死循环咯。
@ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//线程如果被park那么会执行finally中的语句块
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//取消竞争
cancelAcquire(node);
}
}
接着循环必然会走tryAcquire,由于此时锁已经被释放,那么这个线程就能获取到锁了,那么这个线程对应的代码块也能被正常执行了,这样就保证了,线程间的同步问题。到此解锁过程over
解锁重难点扣细节
- 为什么要尾节点扫描
//参数node:指head
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//尾节点开始寻找队列中从左往右第一个睡眠的线程,赋值给s
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒s线程
if (s != null)
LockSupport.unpark(s.thread);
}
看上面代码的时候有疑惑吗?唤醒第一个睡眠的线程为啥还要,从尾节点开始遍历寻找队列中从左往右第一个睡眠的线程?Node s = node.next;这句代码不就是队列中从左往右第一个睡眠的线程吗?但是在并发情况下就不是这样子了。我们在addWaiter维护链表关系的代码如下。
Node t = tail;
if (t == null) {
// Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
假设线程1持有锁,但是并未释放锁。线程2、3、4、5并发执行到addWaiter维护链表关系那段代码,但是都未执行维护后向链表那句代码,然后此时的队列图解如下。此时线程6执行unparkSuccessor方法尾节点扫描,这一小段间隙,线程2、3、4、5执行addWaiter方法完毕,且执行完了shouldParkAfterFailedAcquire(p, node)、parkAndCheckInterrupt()方法,此时的队列才是完完整整的了,由于线程6unparkSuccessor的时候,队列如下图,如果从头节点遍历队列,会导致根本就拿取不到park节点,如果尾节点遍历就不会出现这个问题,最后线程6唤醒park线程。