Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒。
如果只有一个线程在某种条件下等待,那么就可以准确的唤醒这个线程,如果有多个线程在某种条件下等待,那么 条件.signalAll就会同时唤醒多条线程,signal就会把这种条件下等待的第一个线程给唤醒。
具体的实现看看一下源码吧:
await方法
Reentrantlock lock=new Reentrantlock();
//创建了两种条件,consumer和producer
//虽然都是用newCondition();得到的,但是在不同的线程中调用await和signal方法时,会针对条件调度线程。
Condition consumer=lock.newCondition();
Condition producer=lock.newCondition();
//new的结果是产生一个ConditionObject实例
public class ConditionObject implements Condition//实现了condition接口
//condition接口的主要方法就是await()和signal()方法,用来代替Object类的wait和notify方法
看一下await方法的思路:
(1)addConditionWaiter(),把当前线程加入等待队列Condition Queue
(2)fullyRelease(node),释放当前线程
(3)while循环阻塞线程
(4)跳出while循环,判断是该线程是被signal唤醒还是被cancel中断了
(5)如果被signal了但是没有抢到锁,线程再次中断
下面单个的方法剖析:
addConditionWaiter()
//addConditionWaiter()是保存当前线程的状态,并加入到ConditionQueue的尾部
private Node addConditionWaiter() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Node.CONDITION);
//保存当前线程的状态,并加入到CONDITION QUREUE的尾部
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
//上文用到node的构造函数是这个
Node(int waitStatus) {
//也就是用当前线程和当前线程的状态构造一个node
WAITSTATUS.set(this, waitStatus);
THREAD.set(this, Thread.currentThread());
//上述构造方法就是对下面两个属性赋值的过程,用到了VarHandle
private static final VarHandle WAITSTATUS;
private static final VarHandle THREAD;
//VarHandle类用来支持一些变量的原子性操作,在并行中应用广泛
//https://blog.csdn.net/sench_z/article/details/79793741
}
fullRelease方法
//fullRelease方法经过多层调用,最底层的方法是Reentrantlock重写的 tryRelease方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);//这一步原本是设定哪个线程可以独占锁,置为null,代表释放锁
}
setState(c);
return free;
}
循环阻塞的方法直接在await方法里面看:
关键点一:Node 从 Condtion Queue 里面转移到 Sync Queue 里面有两种可能 (1) 其他线程调用 signal 进行转移 (2) 当前线程被中断而进行Node的转移(就在checkInterruptWhileWaiting里面进行转移)
关键点二:判断node是否由于其他线程的signal等操作而转移到sync同步队列。如果这个线程没有在同步队列,就一直while循环阻塞这个线程,等待触发中断或者等待其他线程调用signal方法,从CONDITION移到SYNC里面。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//这里考虑关键点二,如上
LockSupport.park(this);//park就是阻塞的意思。底层是native
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//见下面checkInterruptWhileWaiting源码
break;
}
//跳出while循环的时候,考虑关键点一,如上
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//acquireQueued为true表示在等待的过程中被interrupted。
interruptMode = REINTERRUPT;
//第一个if代表,我是被signal转移过来的,但是我没有抢到锁,我应该继续阻塞,所以退出等待的时候会REINTERRUPT(一定要看下面acquireQueued的解析)
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//第二个if代表如果是signal引起的,那么在CONDITION queue里面就node.nextWaiter == null;如果是被cancel的,应该从等待队列中清除
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
//第三个if提供唤醒后的处理机制:抛出异常或自己触发中断Thread.currentThread().interrupt();
}
checkInterruptWhileWaiting判断方法如下:
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;
}//在signal之前被cancel引起的中断返回THROW_IE, signal引起的返回REINTERRUPT,thread没有被中断返回0,继续while循环
重点acquireQueued方法的实现
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {//这里通过死循环的方式,让首结点得到锁
final Node p = node.predecessor();
//同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后续节点,而后续节点将会在获取同步状态成功时将自己设置为首节点。
//可以看到节点和节点之间在循环检查的过程中基本不相互通信,而是简单地判断自己的前驱是否为头节点,这样就使得节点的释放规则符合FIFO。
//只有前驱元是Head并且获得同步状态的线程才能设置Head首节点,所以设置首节点是单线程在操作,不需要保证原子性,就能满足线程安全。
if (p == head && tryAcquire(arg)) {//tryAcquire方法解析在下面
setHead(node); //前驱元是Head并且获得同步状态的线程设定为Head
p.next = null;//并且断开原首节点的next引用,方便GC
return interrupted;//如果return false,说明当前线程拿到锁了。
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
//如果当前线程没有拿到锁,而acquireQueued返回true,是因为parkAndCheckInterrupt()返回true
//所以说如果return true,代表signal过来了,但是没有拿到锁。
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
tryAcqure方法也是调用了Reentrantlock重写的方法,如下
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&//如果没有前任,就用current获取当前锁。
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
signal方法
signal的本质意义,是把Condition Queue里面的线程Node,转移到SYNC Queue,让线程获得锁。
因为上文await方法总是将刚刚释放锁的线程添加到ConditionQueue的尾巴上,而signal总是从ConditionQueue的firstwaiter开始转移,所以await和signal是遵循FIFO(FirstIInFirstOut)的。
以signalAll为例
public final void signalAll(){
if(!isHeldExclusively()){
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if(first != null){
doSignalAll(first);//用firstWaiter调用doSignalAll方法
}
}
private void doSignalAll(Node first){
lastWaiter = firstWaiter = null; // 1. 将 lastWaiter, firstWaiter 置空
do{
Node next = first.nextWaiter; // 2. 初始化下个换新的节点
first.nextWaiter = null; // 3. first.nextWaiter == null 是判断 Node 从 Condition queue 转移到 Sync Queue 里面是通过 signal 还是 timeout/interrupt
transferForSignal(first); // 4. 调用 transferForSignal的enq方法将 first 转移到 Sync Queue 里面
first = next; // 5. 开始换新 next 节点
}while(first != null);
}
最关键的还是enq方法,如下:
//Inserts node into queue
private Node enq(Node node) {
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
//如果oldTail != null,就把oldTail作为PREV
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {//Atomically方法
//原子性的方法保证,当前线程认为的为节点和用getVolatile获取的结点一致,保证没有其他线程干扰的情况下,将node设置为当前尾结点的下一个才是有效的。
oldTail.next = node;
return oldTail;
}
} else {
initializeSyncQueue();
}
}
}
在enq(final Node node)方法中,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。可以看出,enq(final Node node)方法将并发添加节点的请求通过CAS(CompareAndSet)变得“串行化”了。
结论:一个await沉睡一条线程,一个线程节点加到ConditionQueue里面,一个signaAll唤醒所有等待池中的线程,需要按照顺序将节点进行添加,总是让先睡的先醒,先醒的叫醒后等的。