Juc04_阻塞队列概述、方法、实现类、Linked和Array区别、注意事项

①. 什么是阻塞队列

  • ①.阻塞队列:从名字可以看出,它也是队列的一种,那么它肯定是一个先进先出FIFO的数据结构。与普通队列不同的是,他支持两个附加操作,即阻塞添加和阻塞删除方法

  • ②. 线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素。而在这一系列操作必须符合以下规定:

  1. 阻塞添加:当阻塞队列是满时,往队列里添加元素的操作将被阻塞
  2. 阻塞移除:当阻塞队列是空时,从队列中获取元素/删除元素的操作将被阻塞

在这里插入图片描述

  • ③. 现有三个角色:顾客,休息区,银行办理窗口。Thread1为顾客,BlockingQueue为休息区,Thread2为银行办理窗口
  1. 正常情况下,一个银行办理窗口同一时间只能对接一个顾客
  2. 恰巧今天办理的顾客有3个人,另外2个顾客怎么办,你总不至于给人家说不办了,快回家吧
  3. 而正确的做法是你可以让这两个人在休息区等候,等银行窗口空闲了,然后去办理
  • ④. 阻塞队列的好处:阻塞队列不用手动控制什么时候该被阻塞,什么时候该被唤醒,简化了操作

②. BlockingQueue的主要方法

  • ①. BlockingQueue提供的部分方法:
public interface BlockingQueue<E> extends Queue<E> {
    
    
   
    boolean add(E e);

    boolean offer(E e);

    void put(E e) throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;
        
    E take() throws InterruptedException;

    E poll(long timeout, TimeUnit unit)throws InterruptedException;
        
    int remainingCapacity();

    boolean remove(Object o);

    public boolean contains(Object o);

    int drainTo(Collection<? super E> c);

    int drainTo(Collection<? super E> c, int maxElements);
}
  • ②. 根据插入和取出两种类型的操作,具体分为下面一些类型:
  1. 抛出异常是指当队列满时,再次插入会抛出异常如果队列未满,插入返回值未true
  2. 返回布尔是指当队列满时,再次插入会返回false
  3. 阻塞是指当队列满时,再次插入会被阻塞,直到队列取出一个元素,才能插入
  4. 超时是指当一个时限过后,才会插入或者取出

在这里插入图片描述

  • ③. 生产 - add、offer、put这3个方法都是往队列尾部添加元素,区别如下:
  1. add:不会阻塞,添加成功时返回true,不响应中断,当队列已满导致添加失败时抛出IllegalStateException
  2. offer:不会阻塞,添加成功时返回true,因队列已满导致添加失败时返回false,不响应中断
  3. put:会阻塞会响应中断
  • ④. 消费 - take、poll方法能获取队列头部第1个元素,区别如下:
  1. 会响应中断,会一直阻塞直到取得元素或当前线程中断
  2. 会响应中断,会阻塞,阻塞时间参照方法里参数timeout.timeUnit,当阻塞时间到了还没取得元素会返回null
  • ⑤. add方法代码以及结果
public class BlockingQueueTest {
    
    
    public static void main(String[] args) {
    
    
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
        System.out.println("--------以下为add的相关操作---------");
        addRemoveTest(blockingQueue);
    }

    public static void addRemoveTest(BlockingQueue<String> blockingQueue) {
    
    
        System.out.println("添加状态+\t"+blockingQueue.add("1"));
        System.out.println("添加状态+\t"+blockingQueue.add("2"));
        System.out.println("添加状态+\t"+blockingQueue.add("3"));
//        System.out.println("添加状态+\t"+blockingQueue.add("4"));
        System.out.println("队首元素+\t"+blockingQueue.element());
        System.out.println("删除元素+\t"+blockingQueue.remove());
        System.out.println("队首元素+\t"+blockingQueue.element());
        System.out.println("删除元素+\t"+blockingQueue.remove());
        System.out.println("队首元素+\t"+blockingQueue.element());
        System.out.println("删除元素+\t"+blockingQueue.remove());
//        System.out.println("队首元素+\t"+blockingQueue.element());
//        System.out.println("删除元素+\t"+blockingQueue.remove(blockingQueue.element()));
    }
}
// 未打开注释代码输出如下:
--------以下为add的相关操作---------
添加状态+	true
添加状态+	true
添加状态+	true
队首元素+	1
删除元素+	1
队首元素+	2
删除元素+	2
队首元素+	3
删除元素+	3
// 当队列已满,继续添加元素时打开注释代码,输出如下:
--------以下为add的相关操作---------
添加状态+	true
添加状态+	true
添加状态+	true
Exception in thread "main" java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
  • ⑥. offer方法代码
public class BlockingQueueTest {
    
    
    public static void main(String[] args) {
    
    
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
        System.out.println("--------以下为offer的相关操作---------");
       offerPollTest(blockingQueue);
    }

    private static void offerPollTest(BlockingQueue<String> blockingQueue) {
    
    
        System.out.println("添加状态+\t"+blockingQueue.offer("1"));
        System.out.println("添加状态+\t"+blockingQueue.offer("2"));
        System.out.println("添加状态+\t"+blockingQueue.offer("3"));
        System.out.println("添加状态+\t"+blockingQueue.offer("4"));
        System.out.println("队首元素+\t"+blockingQueue.peek());
        System.out.println("删除元素+\t"+blockingQueue.poll());
        System.out.println("队首元素+\t"+blockingQueue.peek());
        System.out.println("删除元素+\t"+blockingQueue.poll());
        System.out.println("队首元素+\t"+blockingQueue.peek());
        System.out.println("删除元素+\t"+blockingQueue.poll());
        System.out.println("删除元素+\t"+blockingQueue.poll());
    }
}
// 输出如下:
--------以下为offer的相关操作---------
添加状态+	true
添加状态+	true
添加状态+	true
添加状态+	false
队首元素+	1
删除元素+	1
队首元素+	2
删除元素+	2
队首元素+	3
删除元素+	3
删除元素+	null
// 注意:当队列没有元素的时候使用poll,返回null
  • ⑦. put方法代码
public class BlockingQueueTest {
    
    
    public static void main(String[] args) {
    
    
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
        System.out.println("--------以下为put的相关操作---------");
        putTakeTest(blockingQueue);
    }

     private static void putTakeTest(BlockingQueue<String> blockingQueue) {
    
    
        try{
    
    
            blockingQueue.put("1");
            blockingQueue.put("2");
            blockingQueue.put("3");
//            blockingQueue.put("4");
            System.out.println("删除元素+\t"+blockingQueue.take());
            blockingQueue.put("4");
            System.out.println("删除元素+\t"+blockingQueue.take());
            System.out.println("删除元素+\t"+blockingQueue.take());
            System.out.println("删除元素+\t"+blockingQueue.take());
        }catch(Exception e){
    
    
            e.getStackTrace();
        }
    }
}
// 打开注释代码输出如下:(程序未停止)
--------以下为put的相关操作---------
// 未打开注释代码输出如下:
--------以下为put的相关操作---------
删除元素+	1
删除元素+	2
删除元素+	3
删除元素+	4

③. BlockingQueue的实现类

  • ①. 从整体架构图上来看,BlockingQueue是实现了Queue接口,而Queue是属于Collection接口下的派生类

在这里插入图片描述

  • ②. ArrayBlockingQueue: 由数组结构组成的有界阻塞队列

  • ③. LinkedBlockingQueue: 由链表结构组成的有界(但大小默认值Integer>MAX_VAL UE)阻塞队列
    需要注意的是LinkedBlockingQueue虽然是有界的,但有个巨坑,其默认大小是IntegerMAX_VALUE,高达21亿,一般情况下内存早爆了在线程池的ThreadPoolExecutor有体现

  • ④. SynchronousQueue:不存储元素的阻塞队列,也即是单个元素的队列

  1. SynchronousQueue没有容量,与其他BlcokingQueue不同,SynchronousQueue是一个不存储元素的BlcokingQueue
  2. 每个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然
public class SynchronusQueueTest {
    
    
    public static void main(String[] args) {
    
    
        BlockingQueue<String> synchronusQueue = new SynchronousQueue<>();

        new Thread(() ->{
    
    
            try{
    
    
                System.out.println(Thread.currentThread().getName()+"\t put 1");
                synchronusQueue.put("1");
                System.out.println(Thread.currentThread().getName()+"\t put 2");
                synchronusQueue.put("2");
                System.out.println(Thread.currentThread().getName()+"\t put 3");
                synchronusQueue.put("3");
            }catch(Exception e){
    
    
                e.getStackTrace();
            }
        },"Prod").start();

        new Thread(() ->{
    
    
            try {
    
    
                try{
    
     TimeUnit.SECONDS.sleep(3); }catch (InterruptedException e){
    
     e.printStackTrace(); }
                System.out.println(Thread.currentThread().getName()+"\t take "+synchronusQueue.take());
                try{
    
     TimeUnit.SECONDS.sleep(3); }catch (InterruptedException e){
    
     e.printStackTrace(); }
                System.out.println(Thread.currentThread().getName()+"\t take "+synchronusQueue.take());
                try{
    
     TimeUnit.SECONDS.sleep(3); }catch (InterruptedException e){
    
     e.printStackTrace(); }
                System.out.println(Thread.currentThread().getName()+"\t take"+synchronusQueue.take());
            } catch (Exception e) {
    
    
                e.printStackTrace();
            }
        },"Cons").start();
    }
// 输出如下:有时间间隔
Prod	 put 1
Cons	 take 1
Prod	 put 2
Cons	 take 2
Prod	 put 3
Cons	 take3

  • ⑤. PriorityBlockingQueue:支持优先级排序的无界阻塞队列

  • ⑥. LinkedTransferQueue:由链表结构组成的无界阻塞队列

  • ⑦. LinkedBlockingDeque:由了解结构组成的双向阻塞队列

④. Linked和Array区别

  • ①. 队列大小不同:
  1. arrayBlockingQueue在初始化的时候,必须指定队列的大小
  2. 而LinkedBlockingQueue在初始化的时候,如果你没有指定大小,则会默认Integer.MAX_VALUE,是一个很大的值,这样就会导致数据在一个不可控范围,一旦添加速度远大于移除的速度时,可能会有内存泄漏的风险
  • ②. 底层实现不同
  1. arrayBlockingQueue的底层是一个数组
  2. LinkedBlockingQueue底层是一个链表结构
  3. 官方文档介绍中,LinkedBlockingQueue的吞吐行是高于arrayBlockingQueue;但是在添加或移除元素中,LinkedBlockingQueue则会生成一个额外的Node对象,对GC可能存在影响
  • ④. 为什么说LinkedBlockingQueue的吞吐性是高于arrayBlockingQueue?
    吞吐性能强是因为有两个锁,试想一下,Array里面使用的是一个锁,不管put还是take行为,都可能被这个锁卡住,而Linked里面put和take是两个锁,put只会被put行为卡住,而不会被take卡住,因此吞吐性能自然强于Array。 而“less predictable performance”这个也是显而易见的,Array采用的时固定内存,而Linked采用的时动态内存,无论是分配内存还是释放内存甚至GC动态内存的性能自然都会比固定内存要差

  • ⑤. 锁机制不一样
    arrayBlockingQueue使用的一个锁来控制,LinkedBlockingQueue使用了2个锁来控制,一个名为putLock,另一个是takeLock,但是锁的本质都是ReentrantLock

⑤. 不推荐使用快捷的线程池

  • ①. 我们需要根据自己的场景、并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合需求,一般都需要设置有界的工作队列和可控的线程数

  • ②. 任何时候,都应该为自定义线程池指定有意义的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量 CPU、线程执行出现异常等问题时,我们往往会抓取线程栈。此时,有意义的线程名称,就可以方便我们定位问题

  • ③. newFixedThreadPool方法的源码不难发现,线程池的工作队列直接new了一个LinkedBlockingQueue,而默认构造方法的 LinkedBlockingQueue是一个Integer.MAX_VALUE 长度的队列,可以认为是无界的

public static ExecutorService newFixedThreadPool(int nThreads) {
    
    
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    
    
    ...


    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
    
    
        this(Integer.MAX_VALUE);
    }
...
}
  • ④. 翻看newCachedThreadPool的源码可以看到,这种线程池的最大线程数是Integer.MAX_VALUE,可以认为是没有上限的,而其工作队列 SynchronousQueue是一个没有存储空间的阻塞队列。这意味着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的
public static ExecutorService newCachedThreadPool() {
    
    
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
  • ⑤. 线程池的OOM问题,可能是队列满造成的(LinkedBlockingQueue),也可能是线程太多造成的(SynchronousQueue)

猜你喜欢

转载自blog.csdn.net/TZ845195485/article/details/131392997