今天的主题是探究一下锁的获取过程是如何实现的,我们主要以ReentrantLock(重入锁)来展开研究,在进行探究之前我们先来了解几个比较重要的名词AQS,CAS
AQS(AbstractQueuedSynchronizer)在JDK文档中定义如下:
为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。子类必须定义更改此状态的受保护方法,并定义哪种状态对于此对象意味着被获取或被释放。假定这些条件之后,此类中的其他方法就可以实现所有排队和阻塞机制。子类可以维护其他状态字段,但只是为了获得同步而只追踪使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法来操作以原子方式更新的 int 值。他的具体用法我们在后面再进行了解。
CAS(Compare And Swap):执行的函数CAS(v,e,n)他包含了三个参数
v: 表示要更新的对象
e: 表示预期值
n: 表示更新的新值
CAS也称之为无锁执行者,它有点类似与乐观锁,他会把v和e进行比较看是否一致,如果不一致那么肯定是有其他线程对对象进行了修改,他将会被告知操作失败,之后他可以选择重获获取值进行比较也可以直接放弃;反之将v替换成n,操作成功然后则可以执行后面的操作,这样虽然CAS没有牵扯到锁的概念但是他一样可以保证多线程情况下的资源共享的安全性;
1.锁获取的流程图
我们获取的锁的过程以公平锁为列,首先我们看看ReentrantLock的lock方法
/**
* Acquires the lock.
*
* <p>Acquires the lock if it is not held by another thread and returns
* immediately, setting the lock hold count to one.
*
* <p>If the current thread already holds the lock then the hold
* count is incremented by one and the method returns immediately.
*
* <p>If the lock is held by another thread then the
* current thread becomes disabled for thread scheduling
* purposes and lies dormant until the lock has been acquired,
* at which time the lock hold count is set to one.
*/
public void lock() {
sync.lock();
}
他是直接交给Sync来处理,Sync是一个继承了AQS的同步器,作为ReentrantLock一个内部类,它主要的功能是调用AQS的功能方法实现锁的功能;
我们继续往下面走,会看到sync.lock()对于公平锁而言他使用的是FairSync即公平锁同步器的锁方法,具体代码如下:
final void lock() {
acquire(1);
}
接下来我们看看acquire的具体实现:
public final void acquire(int arg) {
/**
* tryAcquire() 尝试为当前线程去获取锁
*
* addWaiter() 如果前一步获取锁失败,那么将当前线程封装成Node实例添加等待队列的队尾、
*
* acquireQueued() 判断当前的节点的前一节点是否是头节点,如果不是阻塞线程,否则再次尝试获取锁
*
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
selfInterrupt();
}
上 面方法主要的逻辑是:
1)首先去尝试获取到锁,这个过程如果获取成功将会更改锁的状态以及拥有着
2)获取锁失败后,需要将当前的线程封装成节点添加的等待队列的队尾
3)然后需要循环去判断是否新添加的节点的前驱节点是CANCELLED(对应的线程被取消),如果是我们将新添加的节点的前驱节点替换成非CANCELLED状态的节点;然后线程阻塞,等待被唤醒后再次竞争锁;
tryAcquire尝试获取锁的逻辑如下:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1)首先我们判断当前的融同步器的状态如果是0那么就是有机会获取到锁,同步器的状态你可以认为是当前线程的重复进入同步器的次数;
2)判断是否当前线程节点有前驱节点,如果没有那么当前的节点为等待队列的头节点即他可以尝试获取到锁,然后我们需要通过CAS无所执行者来判断是否当前同步器已经被其他线程使用了如果没有那么将设置当前 同步器的拥有者是当前的线程,获取锁成功;否则获取锁失败继续等待
3)如果同步器的状态不为0,那么同步器(锁)已经被使用了,那么就需要去判断此同步器的拥有者线程是否是当前线程,如果是那么直接获取锁成功,并且更改同步器的状态否则获取锁失败等待(这一步也是重入锁的实现原理,如果是当前线程进入此同步器那么直接直接获取到锁,而不用再次去使用CAS进行判断提高了性能)
addWaiter():如果尝试获取锁失败,那么需要将当前线程封装成节点保存到等待队列的队尾
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
上面的实现步骤如下:
1)首先我们需要将当前线程封装成Node节点
2)获取队尾的节点,然后通过CAS来判断是否这边获取的尾节点已经被其他节点替换(即其他线程的节点已经加入到队尾)如果队尾的节点没有被替换那么成功经当前线程节点添加到队尾
3)如果上一步未成功,那么将调用enq方法初始化一个新的等待队列或者是重新执行一遍2的操作,这一步是一个循环调用的过程,知道有将当前的节点添加到队尾成功为止。
enq():主要是用来为同步器快速初始化一个等待队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
上述的实现步骤如下:
1)如果没有队尾节点,那么可以判断出当前的等待队列并未初始化,那么将生成一个新的等待队列,并将头节点和为节点设置为同一节点;
2)如果不为空(addWaiter()中如果被其他线程已经提前加入到队尾),那么enq将重新执行添加到队尾的操作,前面的两个步骤是循环的执行,直到有结果;
acquireQueued:在上一步中我们将当前的线程已经成功的添加到队尾中,接下来需要将当前的线程阻塞直到当前的线程被唤醒再次去竞争同步器(锁)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
上述实现步骤如下:咋一看如果不能获取到锁那么这个地方可能是一个死循环,但是实际上并不是的;
1)先获取到前驱节点,如果前驱节点是头节点,那么当前节点就可以提前去尝试去获取到锁,这个时候可能前驱节点并未释放同步器所以获取锁的操作失败,最终的结果就是当前的线程被挂起等待被唤醒
2)如果当前的线程的前驱节点并不是头节点,或者说是头节点但是尝试获取锁失败了那么需要判断前驱节点的状态是否要阻塞当前的线程
shouldParkAfterFailedAcquire:去除掉所有已经被取消的前置节点,将当前节点的前驱节点设置为非取消状态的节点,并返回是否需要将当前线程的阻塞的信号
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
上述的实现步骤为:
1)判断当前节点的前驱节点是否是SIGNAL状态,如果是,代表当前节点是要进行锁的竞争,那么返回true后续将线程阻塞
2)如果ws大于0表示当前线程节点的前置节点线程已经被取消了,那么将会重新设置当前节点的前驱节点直到新的前驱节点不再是大于0即非取消状态
3)如果前驱节点即非SIGNAL状态也非CANCELLED(取消)状态,那么直接通过CAS设置前驱节点的状态为SIGNAL,然后最终的执行逻辑回到了 1),最终此方法的返回值肯定是true
parkAndCheckInterrupt:根据shouldParkAfterFailedAcquire返回的信号来判断是否阻塞当前的线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
如果能走到这个方法那么说明shouldParkAfterFailedAcquire返回的信息是需要阻塞当前的线程,这也就是为什么acquireQueued说的他不会应为为获取到锁而进入死循环的原因,这里会直接将线程给阻塞了,直到再次被唤醒;
总结,当前线程在为获取到锁之后执行了上面的逻辑如果还为获取到锁,那么他将进入阻塞状态等待这被唤醒,当被唤醒后又将继续上面的步骤直至获取到同步器(锁);
到这里一个获取锁的流程就已经结束了,如果写的有问题欢迎指正;