什么是Lock锁?
在Java中锁是出现是为了防止多个线程访问同一资源,锁有内建锁(synchronized),synchronized关键字实现锁是隐式加解锁,在JDK5后,java.util.concurrent包中增加了lock接口,它提供了和synchronized一样可以防止多个线程访问同一资源,但是lock锁需要手动加锁解锁,也提供了synchronized没有的同步特性(后文会叙述)。
既然lock锁是一个接口,那么就需要有子类实现Lock接口。
//Lock接口是实现子类
ReentrantLock() 可重入锁
ReadWriteLock 读写锁
ReentrantReadWriteLock 可重入读写锁
StampedLock 读写锁中读不仅不阻塞读,同时也不应该阻塞写
Lock接口中抽象方法:
public interface Lock {
1. void lock( ); 获取锁
2. void lockInterruptibly() throws InterruptedException; 获取锁的过程能响应中断
3. boolean tryLock(); 获取锁返回true ,否则返回false
4. boolean tryLock(long time,TimeUnit unit); 超时获取锁,在规定时间未获取到锁返回false
5. Condition newCondition(); 获取与lock绑定的等待通知组件,当前线程必须先获得了锁才能等待,等待会释
放锁,再次获取到锁才能从等待中返回。
6.void unlock(); 释放锁
}
在Lock接口中我们可以发现Lock体系拥有可中断的获取锁及超时获取锁等内建锁不具备的特性。
一般会用可重入锁(ReentrantLock)向上转型实例化一个Lock锁:
Lock lock=new ReentrantLock( );
try
{
lock.lock( );
//以下代码只有一个先从可以运行
...
}
finally
{
lock.unlock( );
}
lock必须调用unlock()方法释放锁,因
此在finally块中释放锁。
Reentrantlock(可重入锁)如何实现一个锁?
public ReentrantLock() {
sync = new NonfairSync();
}
public class ReentrantLock implements Lock, java.io.Serializable
{
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer
{}
}
static final class NonfairSync extends Sync{}
Reentrantlock(可重入锁)中所有方法实际上都是调用了其静态内部类Sync中的方法,而Sync继承了AbstractQueuedSynchronizer(AQS --简称同步器)
什么是AQS?
同步器是用来构建锁以及其他同步组件的基础框架,它的实现主要是依赖一个int状态变量以及通过一个FIFO队列共同构成同步对列。
子类必须重写AQS的用proteted 修饰的用来改变同步状态的方法,其他方法主要实现了排队与阻塞机制。int状态的更新使用getState( )/setSate以及compareAndSetState( )/
子类推荐使用静态内部类来继承AQS实现自己的同步语义,同步器既支持独占锁,也支持共享锁。
首先看下AQS的部分源码:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
{
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
AQS的子类需要重新覆写AQS的protected方法(如上protected方法)。
AQS的设计是使用模板方法设计模式,它将一些方法开放给子类进行重写,而同步器给同步组件所提供模板方法又会重新调用被子类所重写的方法。
比如在上面的AQS源码中的acquire方法调用过程:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
AQS的子类NonfairSync将该protected方法进行覆写:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
AQS中acquire方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果调AQS中的 acquire( ),该方法被final修饰,即只能被子类使用,但是在该方法中调用了tryAcquire( )方法,就会调子类NonfairSync的tryAcquire方法。
Lock锁与AQS的关系:
Lock面向使用者,定义了使用者与锁交互的接口;
AQS—面向锁的实现者,简化了锁的实现, 屏蔽了同步状态的管理、线程排队、线程等待与唤醒等底层操作。
了解到AQS和Lock锁的基本知识后我们可以自己写一个类Mutex锁来实现Lock接口,在Mutex中有静态内部类继承AQS类:
package CODE.多线程;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
class Mutex implements Lock
{
private Sync sync=new Sync(); //实例化一个对象为了调静态内部类的方法
static class Sync extends AbstractQueuedSynchronizer
{
/*
获取锁成功返回true,否则返回false
*/
@Override
protected boolean tryAcquire(int arg) //独占式获取同步状态
{
if(arg!=1)
{
throw new RuntimeException("信号量不为1,无法获取锁");
}
if(compareAndSetState(0,1)) //预期值是0,即没有线程获取锁,更新值为1
{
//此时线程成功获取同步状态
setExclusiveOwnerThread(Thread.currentThread()); //将该线程设置为当前锁的持有者
return true;
}
return false;
}
/*
释放锁
*/
@Override
protected boolean tryRelease(int arg) {
if(arg!=1) //只有当该线程持有该锁,才有释放锁
{
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null); //将锁持有者设为空
setState(0); //没有线程获取锁
return true;
}
/*
判断该线程是否持有该锁
*/
@Override
protected boolean isHeldExclusively() {
return getState()==1;
}
}
//接口需要覆写的方法
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void lock() {
sync.acquire(1); //获取锁
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return null;
}
}
public class Lock2
{
private static Mutex mutex = new Mutex();
public static void main(String[] args){
for (int i = 0; i < 10;i++){
Thread thread = new Thread(()-> {
mutex.lock();
try {
Thread.sleep(3000);
}catch (Exception e) {
e.printStackTrace();
}finally {
mutex.unlock();
}
});
thread.start();
}
}
}
深入理解AQS
在同步组件的实现中,AQS是核心部分,AQS面向锁的实现,同步组件的实现者通过使用AQS提供的模板方法实现同步组件语义,AQS则实现了对同步状态的管理,以及对阻塞线程进行排队,等待通知等一些底层的实现处理。AQS的核心包括了:同步队列,独占式锁的获取和释放,共享锁的获取和释放,超时等待锁以及可中断锁的获取,这一系列功能的实现。这些实现依靠的是AQS提供的模板方法:
独占式锁:
1.void acquire(int arg):
独占式获取同步状态,如果获取失败插入同步对列进行等待。
2.void acquireInterruptibly(int arg):
在1的基础上,此方法可以在同步队列中响应中断。
3.boolean tryAcquireNanos(int arg,long nanosTimeOut)
在2 的基础上增加了超时等待功能,到了预计时间还未获取到锁直接返回。
4.boolean tryAcquire(int arg) :获取锁成功返回true,否则返回false
5.boolean release(int arg) :释放同步状态,该方法会唤醒在同步队列的下一个结点。
共享式锁:
1.void acquireShared(int arg):
共享式获取同步状态,同一时刻多个线程获取同步状态。
2.void aquireSharedInterruptibly(int arg):在1的基础上增加响应中断。
3.boolean tryAcquireSharedNanos(int arg,long nanosTimeOut)
在2的基础上增加超时等待。
4.boolean releaseShared(int arg):共享锁释放同步状态
AQS的模板方法基于同步队列,那么什么是同步队列呢?
同步队列
当多个线程竞争共享资源时,一个线程竞争到共享资源后,其他请求资源的线程会被阻塞,进入同步队列,也就是说同步队列中存放的被阻塞的线程,这些线程等待cpu调度再次竞争共享资源。
同步队列是一种队列,队列的实现可以通过数组也可通过链表,在AQS中同步队列的数据结构是链表。那是什么链表呢:是有头结点吗?是单向还是双向呢?
可以先看下AQS源码:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
通过源码可以发现同步队列是带头尾结点的双向链表。(注意:不带头结点和带头结点区别:头插时,不带头结点需要频繁改变头指针)并且在添加元素是通过尾插。
在AQS中有一个静态内部类Node
static final class Node {
}
Node类中的属性有:
int waitStatus:结点状态
Node prev:同步队列中前驱结点
Node next:同步队列中后继结点
Thread thread:当前结点包装的线程对象
Node nextWaiter :等待队列中下一个结点
而结点状态分为:
/* 初始状态*/
int INITIAL =0 ;
/** waitStatus value to indicate thread has cancelled */
/* 当前结点从同步队列中取消*/
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
/*后继结点处于等待状态,如果当前结点释放同步状态后会通知后继结点,使后继结点继续运行。*/
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
/*结点处于等待队列中,当其他线程对Condition调用signal()方法后,该结点会从等待队列中移到同步队列*/
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
/*共享式同步状态会无条件的传播*/
static final int PROPAGATE = -3;