谈谈阻塞队列

队列 + 阻塞队列

阻塞队列,顾名思义,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下图所示:
在这里插入图片描述
当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。
当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。

试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。
同样
试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程从列中移除一个或者多个元素或者完全清空队列后使队列重新变得空闲起来并后续新增。

为什么用?有什么好处?

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒

为什么需要 BlockingQueue
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了

在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

BlockingQueue 的核心方法

在这里插入图片描述

抛出异常 当阻塞队列满时,再往队列里 add 插入元素会抛 IllegalStateException:Queue full
当阻塞队列空时,再往队列里 remove 移除元素会抛 NoSuchElementException
特殊值 插入方法,成功 true 失败 false
移除方法,成功返回出队列的元素,队列里面没有就返回 null
一直阻塞 当阻塞队列满时,生产者线程继续往队列里 put 元素,队列会一直阻塞生产线程直到 put 数据 or 响应中断推出。
当阻塞队列空时,消费者线程试图从队列里 take 元素,队列会一直阻塞消费者线程直到队列可用。
超时推出 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出。

架构梳理 + 种类分析

架构介绍

在这里插入图片描述

种类分析

ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列。
PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。
理论

SynchronousQueue 没有容量

与其他 BlockingQueue 不同, SynchronousQueue 是一个不存储元素的 BlockingQueue。

每一个 put 操作必须要等待一个 take 操作, 否则不能继续添加元素, 反之亦然。

代码演示
package com.brian.interview.study.thread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * Copyright (c) 2020 ZJU All Rights Reserved
 * <p>
 * Project: JavaSomeDemo
 * Package: com.brian.interview.study.thread
 * Version: 1.0
 * <p>
 * Created by Brian on 2020/2/12 21:39
 */

/**
 * 阻塞队列 SynchronousQueue 演示
 */
public class SynchronousQueueDemo {
    public static void main(String[] args) {

        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "\t put 1");
                blockingQueue.put("1");

                System.out.println(Thread.currentThread().getName() + "\t put 2");
                blockingQueue.put("2");

                System.out.println(Thread.currentThread().getName() + "\t put 3");
                blockingQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "AAA").start();

        new Thread(() -> {
            try {
                // 暂时一会儿线程
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "BBB").start();
    }
}

LinkedTransferQueue:由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:由链表结构组成的双向阻塞队列。

用在哪里

生产者消费者模式

传统版
package com.brian.interview.study.thread;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Copyright (c) 2020 ZJU All Rights Reserved
 * <p>
 * Project: JavaSomeDemo
 * Package: com.brian.interview.study.thread
 * Version: 1.0
 * <p>
 * Created by Brian on 2020/2/12 22:08
 */

class ShareData {  // 资源类

    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() throws Exception {
        lock.lock();
        try {
            // 1、判断
            while (number != 0) {
                // 等待, 不能干活
                condition.await();
            }
            // 2、干活
            number++;
            System.out.println(Thread.currentThread().getName() + "\t" + number);

            // 3、通知唤醒
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void decrement() throws Exception {
        lock.lock();
        try {
            // 1、判断
            while (number == 0) {
                // 等待, 不能干活
                condition.await();
            }
            // 2、干活
            number--;
            System.out.println(Thread.currentThread().getName() + "\t" + number);

            // 3、通知唤醒
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

/**
 * 题目:一个初始值为零的变量, 两个线程对其交替操作, 一个加1一个减1, 来5轮
 * <p>
 * 1、  线程     操作    资源类
 * 2、  判断     干活    通知
 * 3、  防止虚假唤醒机制
 */
public class ProdConsumer_TraditionDemo {
    public static void main(String[] args) {
        ShareData shareData = new ShareData();

        new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "AAA").start();

        new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "BBB").start();
    }
}
阻塞队列版
package com.brian.interview.study.thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Copyright (c) 2020 ZJU All Rights Reserved
 * <p>
 * Project: JavaSomeDemo
 * Package: com.brian.interview.study.thread
 * Version: 1.0
 * <p>
 * Created by Brian on 2020/2/13 12:10
 */

class MyResource {
    private volatile boolean FLAG = true;  // 默认开启, 进行生产+消费
    private AtomicInteger atomicInteger = new AtomicInteger();

    BlockingQueue<String> blockingQueue = null;

    public MyResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName());
    }

    public void myProd() throws Exception {
        String data = null;
        boolean retValue;
        while (FLAG) {
            data = atomicInteger.incrementAndGet() + "";
            retValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
            if (retValue) {
                System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "成功");
            } else {
                System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "失败");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + "\t 大老板叫停了, 表示 FLAG=false, 生产动作结束");
    }

    public void myConsumer() throws Exception {
        String result = null;
        while (FLAG) {
            result = blockingQueue.poll(2, TimeUnit.SECONDS);
            if (null == result || result.equalsIgnoreCase("")) {
                FLAG = false;
                System.out.println(Thread.currentThread().getName() + "\t 超过2秒钟没有取到蛋糕, 消费退出");
                System.out.println();
                System.out.println();
                return;
            }
            System.out.println(Thread.currentThread().getName() + "\t 消费队列" + result + "成功");
        }
    }

    public void stop() throws Exception {
        this.FLAG = false;
    }
}

/**
 * volatile/CAS/atomicInteger/BlockQueue/线程交互/原子引用
 */
public class ProdConsumer_BlockQueueDemo {
    public static void main(String[] args) throws Exception {
        MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t 生产线程启动");
            try {
                myResource.myProd();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "Product").start();

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");
            try {
                myResource.myConsumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "Consumer").start();

        // 暂停一会儿线程
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println();
        System.out.println();
        System.out.println();

        System.out.println("5秒钟时间到, 大老板main线程叫停, 活动结束");

        myResource.stop();
    }
}

线程池

消息中间件

发布了80 篇原创文章 · 获赞 81 · 访问量 7160

猜你喜欢

转载自blog.csdn.net/qq_35340189/article/details/104573626