先看一下其实现的接口:
public interface Condition {
/**
* 暂停此线程直至一下四种情况发生
* 1.此Condition被signal()
* 2.此Condition被signalAll()
* 3.Thread.interrupt()
* 4.伪wakeup
* 以上情况.在能恢复方法执行时,当前线程必须要能获得锁
*/
void await() throws InterruptedException;
//跟上面类似,不过不响应中断
void awaitUninterruptibly();
//带超时时间的await()
long awaitNanos(long nanosTimeout) throws InterruptedException;
//带超时时间的await()
boolean await(long time, TimeUnit unit) throws InterruptedException;
//带deadline的await()
boolean awaitUntil(Date deadline) throws InterruptedException;
//唤醒某个等待在此condition的线程
void signal();
//唤醒所有等待在此condition的所有线程
void signalAll();
}
整体讲下这个ConditionObject的实现,其实其维护两个队列,
1、Condition队列,表示等待的队列,其waitStatus=Node.Condition,由firstWaiter和lastWaiter两个属性操控.
2、Sync队列,表示可以竞争锁的队列,这个跟AQS一致,waitStatus=0;
3、await()方法呢就是把当前线程创建一个Node加入Condition队列,接着就一致循环查其在不在Sync队列,如果当前节点在Sync队列里了,就可以竞争锁,恢复运行了.
4、signal()方法就是把某个节点的nextWaiter设为null,再把其从Condition队列转到Sync队列.
看一下头部构造方法:
/**
* ConditionObject是AbstractQueuedSynchronizer中的一部分,它是用来实现通知/等待机制的,和Object的wait/notify一样,
* Codition分为两个部分,分别为await和signal,前者应用为等待,后者用于唤醒。
*/
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* 默认的实力化创建
*/
public ConditionObject() { }
/**
* 添加节点到等待队列里边
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
/**
* 根据当前线程创建一个节点
*/
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
await()方法
public final void await() throws InterruptedException {
if (Thread.interrupted())//工作线程不能是已中断的
throw new InterruptedException();
Node node = addConditionWaiter();//新建工作线程对应的条件节点,并插入到条件队列中
int savedState = fullyRelease(node);//释放工作线程占有的锁,但是记下工作线程锁重入的次数
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//无限循环,直到工作线程对应的节点不再被条件队列包含为止
LockSupport.park(this);//自我等待
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//线程从Condition中退出了,并且被转移到AQS的等待队里,排队并尝试获取锁
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* 添加一个等待Node到队列中
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾节点被取消了,清理掉
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 新建一个Condition状态的节点,并将其加在尾部
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/**
* 释放当前的state,最终还是调用tryRelease方法
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
/**
* 是否还需要等待
*/
final boolean isOnSyncQueue(Node node) {
//如果状态为Node.CONDITION,即还在Condition队列中,还得再循环中等待
//如果其waitStatus不为Node.CONDITION,其前置节点,即Sync的前置节点为null,为啥也继续等着呢???谁来解答我的疑问
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果状态不为Node.CONDITION,即不在Condition队列了,有前置节点也有后置节点,那么其一定在Sync队列
if (node.next != null)
return true;
/*
* 其前置节点为非null,但是也不在Sync也是可能的,因为CAS将其加入队列失败.所以我们需要从尾部开始遍历确保其在队列
*/
return findNodeFromTail(node);
}
/
/**
* 从尾部查找node节点
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
/**
* 暂停此线程,直至许可满足
* 只看没有中断和超时的await方法,其它处理中断和超时的逻辑不care
*/
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
//释放当前线程的锁
int savedState = fullyRelease(node);
boolean interrupted = false;
//看自己在不在等待队列,在
while (!isOnSyncQueue(node)) {
//为了线程调度,禁用当前的线程,直到许可可用
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
//获取刚才释放的锁,参考AQS中acquire的讲解吧
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
signal()方法
/**
* 释放信号
*/
public final void signal() {
//如果不是排它模式,则抛出IllegalMonitorStateException异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//将等待队列的第一个节点出队列,并将其加入AQS的锁队列
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* 真正的释放信号
*/
private void doSignal(Node first) {
do {
//讲first的nextWaiter设为null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
}
// 如果转换队列不成功且等待队列不为null,继续do
while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* 将一个节点从condition队列转换到Sync队列
* 这段代码直接加入到AQS队列中,如果该节点的状态为cancel或者CAS操作失败则直接唤醒该线程
* 将当前节点添加到Sync队列尾部,并设置前置节点的waitStatus为SIGNAL,表明后继有节点(可能)将被唤醒,如果取消或者设置waitStatus失败,会唤醒重新同步操作,这时候waitStatus是瞬时的,出现错误也是无妨的
* LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。LockSupport 提供park()和unpark()方法实现阻塞线程和解除线程阻塞,
* LockSupport和每个使用它的线程都与一个许可(permit)关联。permit相当于1,0的开关,默认是0,调用一次unpark就加1变成1,调用一次park会消费permit, 也就是将1变成0
* ,同时park立即返回。再次调用park会变成block(因为permit为0了,会阻塞在这里,直到permit变为1), 这时调用unpark会把permit置为1。
* 每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累。park()和unpark()不会有 “Thread.suspend和Thread.resume所可能引发的死锁” 问题,
* 由于许可的存在,调用 park 的线程和另一个试图将其 unpark 的线程之间的竞争将保持活性。
* 如果调用线程被中断,则park方法会返回。同时park也拥有可以设置超时时间的版本。
*/
final boolean transferForSignal(Node node) {
//所谓的Condition队列和Sync队列就在于waitStatus值
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
/**
* 唤醒所有等待在此condition的所有线程
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/**
* 遍历所有节点,使其加入到Sync队列
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}