文章目录
java.util.concurrent包中的Lock包中主要时AQS框架和LockSupport
Locks 锁
此包中实现的最基本的锁,阻塞线程的LockSupport。核心是AQS框架(AbstractQueuedSynchronizer),是J U C(java util concurrent) 最复杂的一个类。
Lock 和Synchronized
J U C 中的Lock和synchronized具有同样的语义和功能。不同的是,synchronized 锁在退出块时自动释放。而Lock 需要手动释放,且Lock更加灵活。Syschronizd 是 java 语言层面的,是系统关键字;Lock则是java 1.5以来提供的一个类。
Synchronized 具有以下缺陷,它无法中断一个正在等候获得锁的线程;也无法通过投票得到锁,如果不想等下去,也就没法得到锁;同步还要求锁的释放只能在与获得锁所在的堆栈帧相同的堆栈帧中进行。
而Lock(如ReentrantLock )除了与Synchronized 具有相同的语义外,还支持锁投票、定时锁等候和可中断锁等候(就是说在等待锁的过程中,可以被中断)的一些特性。
除了ReentrantLock是外部类,其他的3个都是内部类,Segment是ConcurrentHashMap的内部类,ReadLock和WriteLock都是ReentrantReadWriteLock的内部类
Lock接口中主要定义了加锁和解锁操作。
Lock. lockInterruptibly ,调用后,或者获得锁,或者被中断后抛出异常。优先响应异常。
LockSupport 和java内置锁
在LockSupport出现之前,如果要block/unblock某个Thread,除了使用Java语言内置的monitor机制之外,只能通过Thread.suspend()和Thread.resume()。然而Thread.suspend()和Thread.resume()基本上不可用,除了可能导致死锁之外,它们还存在一个无法解决的竞争条件:如果在调用Thread.suspend()之前调用了Thread.resume(),那么该Thread.resume()调用没有任何效果。LockSupport最主要的作用,便是通过一个许可(permit)状态,解决了这个问题。LockSupport 只能阻塞当前线程,但是可以唤醒任意线程。
那么LockSupport和Java语言内置的monitor机制有什么区别呢?它们的语义是不同的。LockSupport是针对特定Thread来进行block/unblock操作的;wait()/notify()/notifyAll()是用来操作特定对象的等待集合的。正如每个Object都有一个锁,每个Object也有一个等待集合(wait set),它有wait、notify、notifyAll和Thread.interrupt方法来操作。Java语言内置的monitor机制:同时拥有锁和等待集合的实体,通常被成为监视器(monitor)。每个Object的等待集合是由JVM维护的。等待集合一直存放着那些因为调用对象的wait方法而被阻塞的线程。由于等待集合和锁之间的交互机制,只有获得目标对象的同步锁时,才可以调用它的wait、notify和notifyAll方法。这种要求通常无法靠编译来检查,如果条件不能满足,那么在运行的时候调用以上方法就会导致其抛出IllegalMonitorStateException。
· 如果当前线程已经被中断,那么该方法立刻退出,然后抛出一个InterruptedException异常。否则线程会被阻塞。
· JVM把该线程放入目标对象内部且无法访问的等待集合中。
· 目标对象的同步锁被释放,但是这个线程锁拥有的其他锁依然会被这个线程保留着。当线程重新恢复质执行时,它会重新获得目标对象的同步锁。
notify()方法被调用后,会执行如下操作:
· 如果存在的话,JVM会从目标对象内部的等待集合中任意移除一个线程T。如果等待集合中的线程数大于1,那么哪个线程被选中完全是随机的。
· T必须重新获得目标对象的同步锁,这必然导致它将会被阻塞到调用Thead.notify()的线程释放该同步锁。如果其他线程在T获得此锁之前就获得它,那么T就要一直被阻塞下去。
· T从执行wait()的那点恢复执行。
notifyAll()方法被调用后的操作和notify()类似,不同的只是等待集合中所有的线程(同时)都要执行那些操作。然而等待集合中的线程必须要在竞争到目标对象的同步锁之后,才能继续执行。
在标准的Sun jdk 中,Locksupport的实现基于Unsafe,都是本地代码,采用park和unpark进行加锁和解锁的操作。JDK并发中的AQS框架使用的就是LockSupport中的park/unpark操作。
一个线程调用park阻塞之后,如果被其他线程调用interrupt(),那么他它会响应中断,解除阻塞,但是不会抛出interruption 异常。这点在构造可中断获取锁的时候用到了。
AbstractQueuedSynchronizer
AQS框架是 J U C包的核心。是构建同步、锁、信号量和自定义锁的基础。也是构建高级工具的基础。
锁,信号量的实现内部都有两个内部类,都继承AQS。
**由于AQS的构建上采用模板模式(Template mode),即 AQS定义一些框架,而它的实现延迟到子类。如tryAcquire()方法。由于这个模式,我们如果直接看AQS源码会比较抽象。所以从某个具体的实现切入简单易懂。这里选泽ReentrantLock ,**它和Synchronized具有同样的语义。
简单说来,AbstractQueuedSynchronizer会把所有的请求线程构成一个CLH队列,当一个线程执行完毕(lock.unlock())时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全 部处于阻塞状态,经过调查线程的显式阻塞是通过调用LockSupport.park()完成,而LockSupport.park()则调用 sun.misc.Unsafe.park()本地方法,再进一步,HotSpot在Linux中中通过调用pthread_mutex_lock函数把 线程交给系统内核进行阻塞。
ReentrantLock
从ReentrantLock(可重入锁)开始,分析AQS。首先需要知道这个锁和java 内置的同步Synchronized具有同样的语义。如下代码解释重入的意思
Lock lock = new ReentrantLock();
public void test() {
lock.lock();
System.out.print("I am test1");
test(); // 递归调用 ……………………………1 递归调用不会阻塞,因为已经获得了锁,这就是重入的含义
// test2();// 调用test2 ………………………2
lock.unlock();// 这里应该放在finally 块中,这里简单省略,以后一样。
}
public void test2() {
lock.lock();
System.out.println("I am test1");
test2();//
lock.unlock();
}
重入的意思就是,如果已经获得了锁,如果执行期间还需要获得这个锁的话,会直接获得锁,不会被阻塞,获得锁的次数加1;每执行一次unlock,持有锁的次数减1,当为0时释放锁。这点,Synchronized 具有同样语义。
lock
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
从源码中可以看出,ReentrantLock 对Lock接口的实现,把所有的操作都委派给一个叫Sync的类。Sync类继承了AQS接口,通过ReentrantLock可以看出Sync有存在两个子类
final static class NonfairSync extends Sync
final static class FairSync extends Sync
从名称可以看出是为了支持公平锁和非公平锁而定义,默认情况下为非公平锁,因此先从默认的非公平锁的lock()方法开始看。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//AQS中的模板方法,tryAcquire()操作延迟到子类中实现
public final void acquire(int arg) {
//如果获得了锁,就不用执行后面的操作
//如果没有获得锁,则尝试将其加入到等待队列,如果加入队列成功,则中断当前线程
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg);
selfInterrupt();
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//采用CAS设置同步状态为,如果当前线程已经获得锁则
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//调用AQS的模板方法,然后调用到自己实现的tryAcquire方法
acquire(1);
}
//实现AQS的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
//非公平尝试获得锁的方式
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//判断线程是否获得锁,如果没有获得锁,则修改同步状态位,尝去获得锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果获得了锁,并且是已经获得了锁的线程,则将状态位增加,即重入锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
compareAndSetState(intexpect, int des) 为AQS的方法,设置同步状态,NonfairSync 通过修改同步状态获得锁,锁定不成功才执行acquire(1),此方法也在AQS中定义。而 FairSync.lock 直接执行acquire(1)。AbstractQueuedSynchronizer中抽象了绝大多数Lock的功能,而只把tryAcquire方法延迟到子类中实现。 tryAcquire方法的语义在于用具体子类判断请求线程是否可以获得锁,无论成功与否AbstractQueuedSynchronizer都将处理后面的流程。
static final class FairSync extends Sync {
//公平锁尝试获得锁的方式
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果当前没有获得锁,则尝试获得锁
if (c == 0) {
//判断等待队列中是否存在其他线程,如果不存在则尝试获得锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前线程本身就持有锁,那么叠加状态值,持续获得锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
从源代码中可以看出,公平锁在未获得锁的情况下,会判断等待队列中是否存在其他线程,按照FIFO的顺序获得锁,但是非公平锁在为获得锁的情况下,会直接尝试获得锁,如果未获得锁才会采用公平锁的方式。
unlock
public void unlock() {
sync.release(1);
}
默认采用非公平锁
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//AQS中的模板方法,tryRelease操作延迟到子类中实现
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
//实现AQS的tryRelease方法
protected final boolean tryRelease(int releases) {
//将状态值减少,当状态值为0时,该锁才释放
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
Condition
Condition 实现了与java中monitor 类似的功能。提供 await,signal,signalAll 等操作,与Object中wait,notify,notifyAll等一系列操作对应。不同的是一个condition 可以有多个条件队列。这点内置monitor 是做不到的。另外还支持超时、取消等更加灵活的方式。
和内置的Monitor一样,调用 Condition.await 等操作,需要获得锁,也就是 Condition 是和一个锁绑定在一起的。它的实现是在AQS中的ConditionObject中,基本思想如下:
await
public final void await() throws InterruptedException {
// 1.如果当前线程被中断,则抛出中断异常
if (Thread.interrupted())
throw newInterruptedException();
// 2.将节点加入到Condition队列中去,这里如果lastWaiter是cancel状态,那么会把它踢出Condition队列。
Node node = addConditionWaiter();
// 3.调用tryRelease,释放当前线程的锁
long savedState =fullyRelease(node);
int interruptMode = 0;
// 4.为什么会有在AQS的等待队列的判断?
// 解答:signal作会将Node从Condition队列中拿出并且放入到等待队列中去,在不在AQS等待队列就看signal是否执行了
// 如果不在AQS等待队列中,就park当前线程,如果在,就退出循环,这个时候如果被中断,那么就退出循环
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode =checkInterruptWhileWaiting(node)) != 0)
break;
}
// 5.这个时候线程已经被signal()或者signalAll()作给唤醒了,退出了4中的while循环
// 自旋等待尝试再次获取锁,调用acquireQueued方法
if (acquireQueued(node,savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
整个await的过程如下:
1.将当前线程加入Condition锁队列。特别说明的是,这里不同于AQS的队列,这里进入的是Condition的FIFO队列。进行2。
2.释放锁。这里可以看到将锁释放了,否则别的线程就无法拿到锁而发生死锁。进行3。
3.自旋(while)挂起,直到被唤醒或者超时或者CACELLED等。进行4。
4.获取锁(acquireQueued)。并将自己从Condition的FIFO队列中释放,表明自己不再需要锁(我已经拿到锁了)。
可以看到,这个await的*作过程和Object.wait()方法是一样,只不过await()采用了Condition队列的方式实现了Object.wait()的功能。
signal、signalAll
await*()清楚了,现在再来看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要将Condition.await*()中FIFO队列中第一个Node唤醒(或者全部Node)唤醒。尽管所有Node可能都被唤醒,但是要知道的是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待,就像上面提到的第4步(acquireQueued)。
public final void signal() {
//判断当前线程是否持有锁,如果未持有锁则抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//尝试唤醒队列中的一个线程
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//线程被取消
static final int CANCELLED = 1;
//表明线程需要被signal
static final int SIGNAL = -1;
//
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
总结
从上面的源码看出,跟Object的wait,notify,notifyAll方法一样,await,signal,signalAll方法使用前也需要先获得锁,否则会抛出异常,因此在设计上,Condition的实现类ConditionObject实际上是AQS的一个内部类,同时newCondition方法是定义在ReentrantLock锁的内部的。
从上面分析的ReentrantLock中得到AQS的锁中会维护一个同步队列,用来保证想要获得资源的线程,在ReentrantLock中它是一个双向队列。
private transient volatile Node head;
private transient volatile Node tail;
在Condition中也维护了一个等待队列,在Condition中它是一个单向队列。
private transient Node firstWaiter;
private transient Node lastWaiter;
下面是他们的关系图
总结:
-
线程1调用reentrantLock.lock时,线程被加入到AQS的同步队列中。
-
线程1的condition的await方法被调用时,该线程从AQS中移除,对应操作是锁的释放。
-
接着马上被加入到Condition的等待队列中,阻塞住等待 signal 信号。
-
线程2,因为线程1释放锁的关系,被唤醒,并判断可以获取锁,于是线程2获取锁,并被加入到AQS的同步队列中。
-
线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。 注意,这个时候,线程1并没有被唤醒。
-
signal方法执行完毕,线程2调用reentrantLock.unLock()方法,释放锁。这个时候因为AQS中只有线程1,于是,AQS释放锁后按从头到尾的顺序唤醒线程时,线程1被唤醒,于是线程1回复执行。
-
直到释放所整个过程执行完毕。
Semaphore和Exchanger
Semaphore
主要作用:信号量的实现,限制线程的并发数量,如果不限制线程并发的数量,则CPU的资源很快被耗尽,每个线程执行的任务是相当缓慢,因为CPU要把时间片分配给不同的线程对象,而且上下文切换也要耗时,最终造成系统运行效率大幅降低。它具有synchronized所不具有的强大功能,比如等待获得许可的同时可以加入等待时间,还有尝试是是否可以持有锁等这类扩展功能。
Semaphore类发放许可的计算方式是“减法”操作。
与ReentrantLock一样,它内部也是由一个继承AQS的Sync类,同时也具有公平锁和非公平锁的实现,与Lock锁不一样的是,Lock锁是排它锁(EXCLUSIVE),Semaphore信号量是共享锁(SHARED)
Exchanger
主要作用:在两个线程之间传递任意数据类型的数据
类Exchanger的**exchange()**方法具有阻塞的特色,也就是调用这个方法后等待其他线程来取数据,如果没有其他线程来取数据,则一直阻塞等待。
CountDownLatch和CyclicBarrier
这两个工具将同步与线程“组团”的方式一起执行任务。
CountDownLatch
类CountDownLatch是一个同步功能的辅助类,使用效果是给定一个计数,当使用这个CountDown Latch类的线程判断计数不为0时,则呈wait状态,如果为0时则继续运行。
CountdownLatch对计数的操作是减法操作。
CyclicBarrier
类CyclicBarrier不仅有CountDownLatch所具有的功能,还可以实现屏障等待的功能,也就是阶段性同步,它在使用上的意义在于可以循环地实现线程要一起做任务的目标,而不是像CountDownLatch一样,仅仅支持一次线程与同步点阻塞的操作。通过ReentrantLock和Condition实现。
CyclicBarrier对计数的操作是加法操作。
区别:
CountDownLatch作用:一个线程或者多个线程,等待另外一个线程或多个线程完成某个事情之后才能继续执行。
CyclicBarrier作用:多个线程之间相互等待,任何一个线程完成之前,所有的线程都必须等待。
Phaser
CyclicBarrier解决了CountDownLatch的一些问题,Phaser用来解决CyclicBarrier的问题,是Java 7新增的功能。主要是增加了对parties和屏障点更灵活的操作。
类Phaser提供了动态增减parties计数,这点比CyclicBarrier类操作parties更加方便,通过若干个方法来控制多个线程之间同步运行的效果,还可以实现针对某一个线程取消同步运行的效果,而且支持在制定屏障处等待,在等待时还支持中单或非终端等功能。
Phaser对计数的操作是加法操作。