【Java】 之 Condition

如果想编写一个带有多个条件谓词的并发对象,就可以使用显式的LockCondition 而不是 内置锁(Sychronizer) 和 条件队列(Object)

每个 Java 对象都可以作为一个锁,每个对象同样可以作为一个条件队列,并且Objectwait() notify() notifyAll()就构成了内部条件队列的 API



一、Condition 简介


Condition 是一个多线程间协调通信的工具类,


(1)方法图

如图:
在这里插入图片描述


二、Condition 使用

public class ConditionTest {

    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    public static void main(String[] args) {

        ConditionTest conditionTest = new ConditionTest();

        Producer producer = conditionTest.new Producer();
        Consumer consumer = conditionTest.new Consumer();

        consumer.start();
        producer.start();
    }

    class Consumer extends Thread {
        @Override
        public void run() {

            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName() + " 我在等一个新信号");
                condition.await();
            } catch (InterruptedException e) {

                e.printStackTrace();
            } finally {

                System.out.println(Thread.currentThread().getName() +"  拿到一个新信号");

                lock.unlock();
            }
        }
    }

    class Producer extends Thread {

        @Override
        public void run() {

            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName() + " 发出一个信号");
                condition.signal();
            } finally {
                lock.unlock();
            }
        }
    }
}

运行结果:
在这里插入图片描述

可以看到:

  1. consumer获取锁后,再调用condition.wait()producer能再获取锁
  2. producer调用condition.signal()后,consumer能继续往下执行

过程:

  1. consumer线程 调用 lock.lock()后,consumer线程被加入到 AQS 队列中

  2. consumer线程 调用 condition.await(),该线程从 AQS 队列中移除,再是锁释放

    扫描二维码关注公众号,回复: 8769286 查看本文章
  3. consumer线程 被加入到 Condition队列中,等待被唤醒signal信号

  4. producer线程 因为 consumer线程释放锁后,被唤醒,获得锁后,并加入 AQS 队列

  5. producer线程从 AQS 队列出队,执行

  6. producer线程调用condition.signal()

  7. consumer线程从condition队列中出队,并加入 AQS 队列

  8. condition.signal()调用完毕后,producer线程释放锁

  9. AQS 队列 出队 consumer线程,并执行



三、AQSCondition实现


Tips:这里有两个队列

  1. AQS队列:等待处理的线程
  2. Condition队列:等待唤醒的线程

在这里插入图片描述


(1)wait() 等待

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 创建一个新的节点,并添加在 Condition 队列
    Node node = addConditionWaiter();
    // 释放这个节点所持有的锁,并唤醒 AQS 队列中一个线程
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 判断这个节点是否在 AQS 队列上,第一次判断总是返回 false
    while (!isOnSyncQueue(node)) {
        // 挂起这个线程
        LockSupport.park(this);
        // 线程判断自己在等待过程中是否被中断了,如果没有中断,则再次循环,会在 isOnSyncQueue 中判断自己是否在队列上.
        //  isOnSyncQueue 判断当前 node 状态,如果是 CONDITION 状态,或者不在队列上了(JDK 注释说,由于 CAS 操作队列上的节点可能会失败),就继续阻塞.
        //  isOnSyncQueue 判断当前 node 还在队列上且不是 CONDITION 状态了,就结束循环和阻塞.
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    // 被唤醒后,重新开始竞争锁,如果竞争不到继续挂起,等待唤醒
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点. 
    // 如果是 null ,就没有什么好清理的了.
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 如果线程被中断了,需要抛出异常.或者什么都不做
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

过程:

  1. 加入 Condition等待队列
  2. 释放锁
  3. 挂起这个线程

(2)signal() 通知

public final void signal() {

    // 如果当前线程不是持有该锁的线程,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 拿到 Condition 队列上第一个节点
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // 如果修改这个 node 状态为0失败了(也就是唤醒失败), 并且 firstWaiter 不是 null, 就重新循环.
        // 通过从 First 向后找节点,直到唤醒或者没有节点为止.
    } while (!transferForSignal(first) &&
            (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    
    // 如果不能改变状态,就返回
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 将这个 node 放进 AQS 队列中
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
       // 唤醒节点
       LockSupport.unpark(node.thread);
    return true;
}

过程:

  1. 获取 Condition队列头节点
  2. 释放该头节点
  3. 并加此头节点放入 AQS 队列中
  4. 等待出队处理
发布了404 篇原创文章 · 获赞 270 · 访问量 42万+

猜你喜欢

转载自blog.csdn.net/fanfan4569/article/details/100862417