文章目录
Lock与synchronized比较
锁是用来控制多个线程访问共享资源的方式。JDK5之前,java程序主要是靠synchronized关键字实现锁功能的,而JDK5之后,并发包中增加了lock接口,它提供了与synchronized一样的锁功能。但是这两个锁与什么不同呢?
1.Lock没有像synchronized关键字隐式加锁解锁的便捷性,但是有锁获取和释放的可操作性,可中断的获取锁以及超时获取锁以及共享锁等多种synchronized关键字所不具备的同步特性。
2.synchronized同步块执行完成或者遇到异常时锁会自动释放,而lock必须调用unlock()方法手动释放锁。
范例:使用lock的形式如下
Lock lock = new ReentrantLock();
lock.lock();
try{
//代码只有一个线程可以运行
.......
}finally{
lock.unlock();
}
Lock接口API
下面是Lock接口中常用的一些方法:
1. void lock(); // 获取锁
2. void lockInterruptibly() throws InterruptedException; // 获取锁的过程能够响应中断
3. boolean tryLock(); // 非阻塞式响应中断能立即返回,获取锁返回true反之返回false
4. boolean tryLock(long time,TimeUnit unit);// 超时获取锁,在超时内或未中断的情况下能获取锁
5. Condition newCondition(); // 获取与lock绑定的等待通知组件,当前线程必须先获得了锁才能等待,等待会释放锁,再次获取到锁才能从等待中返回。
ReentrantLock中基 本 上 所 有 的 方 法 的 实 现 实 际 上 都 是 调 用 了 其 静 态 内 存 类 Sync 中 的 方 法 , 而 Sync 类 继 承 了AbstractQueuedSynchronizer ( AQS ,同步器) 。
AQS同步器
什么是同步器
同步器是用来构建锁和其他同步组件的基础框架,它的实现主要依赖一个int成员变量来表示同步状态以及通过一个FIFO队列构成等待队列。同步器的子类(被推荐定义为自定义同步组件的静态内部类)必须重写AQS的几个protected修饰的用来改变同步状态的方法,其他方法主要是实现了排队和阻塞机制。状态的更新使用getState,setState以及compareAndSetState这三个方法。
锁->面向使用者,它定义了使用者与锁交互的接口,隐藏了实现细节.
同步器->面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态的管理,线程的排队,等待和唤醒等底层操作。
AQS的模板方法设计模式
AQS的设计是使用模板方法设计模式,它将一些方法开放给子类进行重写,而同步器给同步组件所提供模板方法又会重新调用被子类所重写的方法。
范例:自己实现一个简易的Lock锁
package hhh.pre.java;
import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//自定义实现锁
class Mutex1 implements Lock{
private Sync sync=new Sync();
//自定义同步器
static class Sync extends AbstractQueuedSynchronizer{
//获取同步状态
//自定义->0表示无锁,1表示拿到锁
@Override
protected boolean tryAcquire(int arg) {
if(arg!=1){
throw new RuntimeException("arg参数不为1");//没有锁状态
}
if(compareAndSetState(0,1)){
//此时线程成功获取同步状态,即获取到锁
setExclusiveOwnerThread(Thread.currentThread());//获取当前持有线程ID
return true;
}
return false;
}
//尝试释放锁
//0表示无锁,1表示拿到锁
@Override
protected boolean tryRelease(int arg) {
if(getState()==0){
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);//把当前持有线程置空
setState(0);//线程状态标记为无锁状态
return true;
}
//获取锁是否成功
@Override
protected boolean isHeldExclusively() {
return getState()==1;
}
}
//上锁
@Override
public void lock() {
sync.acquire(1);//将锁状态标记为1,表示上锁
}
//获取锁的过程能够响应中断
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,time);
}
@Override
public void unlock() {
sync.release(1);
}
@NotNull
@Override
public Condition newCondition() {
return null;
}
}
public class Test {
public static void main(String[] args){
Lock lock=new Mutex1();
for(int i=0;i<5;i++){
Thread thread=new Thread(()->{
try {
lock.lock();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
});
thread.start();
}
}
}
运行结果:线程获取锁(其他线程等待),执行5秒钟之后,释放锁,然后再次竞争锁。我们可以进入debug模式观察它的动态执行结果。
深入理解AQS
AQS的模板方法
独占锁:
1. void acquire(int arg) : 独占式获取同步状态,如果获取失败则插入同步队列进行等待
2. void acquireInterruptibly(int arg) : 与acquire方法相同,但在同步队列中等待时可以响应中断
3. boolean tryAcquireNanos(int arg,long nanosTimeout) : 在2的基础上增加了超时等待功能,在超时时间内没有获得同步状态返回false
4. 4. boolean tryAcquire(int arg) : 获取锁成功返回true,否则返回false
5. boolean release(int arg) : 释放同步状态,该方法会唤醒在同步队列中的下一个节点。
共享式锁:
1. void acquireShared(int arg) : 共享式获取同步状态,与独占锁的区别在于同一时刻有多个线程获取同步状态
2. void acquireSharedInterruptibly(int arg) : 增加了响应中断的功能
3. boolean tryAcquireSharedNanos(int arg,lone nanosTimeout) : 在2的基础上增加了超时等待功能
4. boolean releaseShared(int arg) : 共享锁释放同步状态
同步队列
在AQS有一个静态内部类Node,这是我们同步队列的每个具体节点。
在这个类中有如下属性:
1. volatile int waitStatus; // 节点状态
2. volatile Node prev; // 当前节点的前驱节点
3. volatile Node next; // 当前节点的后继节点
4. volatile Thread thread; // 当前节点所包装的线程对象
5. Node nextWaiter; // 等待队列中的下一个节点
节点的状态如下:
1. int INITIAL = 0; // 初始状态
2. int CANCELLED = 1; // 当前节点从同步队列中取消
3. int SIGNAL = -1; // 后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,使得后继节点的线程继续运行。
4. int CONDITION = -2; // 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中。
5. int PROPAGATE = -3; // 表示下一次共享式同步状态获取将会无条件地被传播下去。
同步队列的底层数据结构是带有头结点的双向链表。
独占锁
独占锁的获取
调用lock方法是获取独占锁。
来看ReentrantLock源码:
final void lock() {
//lock方法使用CAS来尝试将同步状态改为1(即获取锁),如果成功则将同步状态持有线程置为当前线程。
acquire()方法
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);//获取失败,将调用AQS提供的acquire()
}
acquire():尝试获取同步状态
public final void acquire(int arg) {
// 再次尝试获取同步状态,如果成功则方法直接返回
// 如果失败则调用addWaiter()方法
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter(Node.EXCLUSIVE), arg):将当前线程以指定模式(独占式、共享式)封装成Node节点后置入同步队列。
addWaiter()方法的实现源码:
private Node addWaiter(Node mode) {
//将线程以指定模式封装为Node节点
Node node = new Node(Thread.currentThread(), mode);
// 获得当前队列的尾节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
//使用CAS将当前结点插入到同步队列中
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//初始化队列,将节点插入同步队列,直到插入成功为止
enq(node);
return node;
}
结点入队列后排队获取同步状态:acquireQueued(Node node,int arg)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋
for (;;) {
//获取当前结点的前驱节点
final Node p = node.predecessor();
//获取同步状态成功的条件
//前驱节点为头结点并且获取同步状态成功
if (p == head && tryAcquire(arg)) {
//将当前节点设置为头结点
setHead(node);
//删除原来头结点
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(Node pred, Node node) :是否要阻塞线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前屈结点的结点状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
//前驱结点已经从队列中取消
if (ws > 0) {
//不断重试直到找到一个前驱节点状态不为取消状态
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前驱结点状态不是取消状态时,将前驱结点状态置为-1
//表示后继结点应该处于等待状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
流程图
独占锁的释放
我们用unlock()方法来释放锁。看源码:
public void unlock() {
sync.release(1);
}
unlock()实际调用AQS提供的release()模板方法,看源码:
public final boolean release(int arg) {
//释放状态成功后
if (tryRelease(arg)) {
//获取当前同步队列的头结点
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒后继结点
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor(Node node):唤醒后继结点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);//将当前同步状态置为初始状态
Node s = node.next;//拿到当前结点的后继结点
//如果后记结点为空或者同步状态为初始状态就从队尾开始找,找一个距离当前结点最近的不为空的等待唤醒的结点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒线程
}
因此,每一次释放后就会唤醒队列中该节点的后继结点所包装的线程。
总结
在获取同步状态时,AQS维护一个同步队列,获取同步状态失败的线程会加入到队列中进行自旋;移除队列(或停止自旋)的条件是前驱节点是头结点并且成功获得了同步状态。在释放同步状态时,同步器会调用unparkSuccessor()方法唤醒后继节点。
可中断式获取锁
可响应中断式锁调用方法lock.lockInterruptibly();而该方法其底层会调用AQS的acquireInterruptibly()方法,源码为:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
//线程获取失败
doAcquireInterruptibly(arg);
}
在获取同步状态失败后就会调用doAcquireInterruptibly方法:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//将节点插入到同步队列中
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 获取锁出队
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//线程中断异常
throw new InterruptedException();
}
} finally {
//删除当前结点
if (failed)
cancelAcquire(node);
}
}
超时等待式获取锁
通过调用lock.tryLock(timeout,TimeUnit)方式达到超时等待获取锁的效果,该方法会在三种情况下才会返回:
1. 在超时时间内,当前线程成功获取了锁;
2. 当前线程在超时时间内被中断;
3. 超时时间结束,仍未获得锁返回false。
tryLock(long timeout, TimeUnit unit)源码:
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
我们发现tryLock()的实现主要依赖于tryAcquireNanos()的实现,下面我们再来看看tryAcquireNanos(int arg, long nanosTimeout)的源码实现:
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//如果线程中断,抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
// 实现超时等待的效果
doAcquireNanos(arg, nanosTimeout);
}
最终是靠doAcquireNanos方法实现超时等待的效果,该方法源码如下:
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 根据超时时间和当前时间计算出截止时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 当前线程获得锁出队列
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 重新计算超时时间
nanosTimeout = deadline - System.nanoTime();
// 已经超时返回false
if (nanosTimeout <= 0L)
return false;
// 线程阻塞等待
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 线程被中断抛出被中断异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}