一:阻塞队列的概念
阻塞队列是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
阻塞插入:put方法,当队列满时,队列会阻塞插入元素的线程,直到队列不满。
阻塞移除:take方法,当队列为空时,获取元素的线程会等待队列变为非空。
BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。
|
抛出异常 |
特殊值 |
阻塞 |
超时 |
插入 |
||||
移除 |
||||
检查 |
不可用 |
不可用 |
二:阻塞队列的特点
①不允许插入空值
BlockingQueue 不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。
②线程安全
BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的
③定容(有容量大小)
BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 附加元素。没有任何内部容量约束的 BlockingQueue 总是报告 Integer.MAX_VALUE 的剩余容量。
三:阻塞队列的实现类
①ArrayBlockingQueue
ArrayBlockingQueue是一个用数组实现的有界阻塞队列。次队列按照先进先出(FIFO)的原则对元素进行排序。
此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。
ArrayBlockingQueue(int capacity) |
ArrayBlockingQueue(int capacity, boolean fair) |
访问者的公平性是使用可重入锁实现的。
②LinedBlockingQueue
LinedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
③PriorityBlockinQueue
PriorityBlockinQueue进行的操作不保证具有同等优先级的元素的顺序。如果需要实施某一排序,那么可以定义自定义类实现compareTo()方法或者比较器(Comparator),比较器可使用修改键断开主优先级值之间的联系。
④DelayQueue
DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口。创建元素时可以指定多久才可以从队列中获取当前元素。
DelayQueue可以应用的场景:缓存系统的设计、定时任务的调度。
实现Delayed接口:
long |
第一步:创建对象的时候,初始化基本数据。使用time记录当前对象延迟到什么时候可以使用,使用sequenceNumber来标识元素在队列中的先后顺序。
第二步:实现getDelay方法
第三步:实现compareTo方法指定元素的顺序。
⑤SynchronousQueue
SynchronousQueue是一个存储元素的阻塞队列每一个put操作必须等待一个take操作。
⑥LinkedBlockingDeque
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移除元素。
四:阻塞队列的实现原理
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
我们查看源代码,可以看出ArrayBlockingQueue使用了Condition来实现通知模式的。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}