DelayQueue原理与应用
前面刚写了一篇关于 PriorityQueue 原理与应用 的文章,其实是为分析DelayQueue服务的。
DelayQueue是一个延迟队列,用一句话来说就是:DelayQueue是一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed 元素。
本文主要围绕以下几点分析DelayQueue:
1) DelayQueue的类继承关系与核心实现接口
2)DelayQueue的核心数据结构
3)DelayQueue的核心方法以及实现原理
4)DelayQueue应用场景以及测试案例
DelayQueue的类继承关系与核心实现接口
DelayQueue首先是一个本质上是一个BlockingQueue阻塞队列。其泛型元素制定了必须实现Delayed接口。
类继承图如下:
有两个主要的接口:
BlockingQueue: 表示DelayQueue是一个阻塞队列, 当队列为空时,take()等操作会阻塞。
Delayed:表示该队列里面的元素是基于延时的,元素的排列顺序是基于超过延时大小从小到大。
这里单独列出Delayed接口函数:
public interface Delayed extends Comparable<Delayed> {
/**使用给定的时间单位返回还需要的剩余延时,如果为零或负值表示延迟已经过去。*/
long getDelay(TimeUnit unit);
}
DelayQueue的核心数据结构
DelayQueue的实现中,核心的成员变量就是:
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
从上面的四个核心成员变量可知:
- DelayQueue实际是基于优先级队列PriorityQueue实现的,优先级队列里面的保存元素都实现了Delayed接口,也就是基于delay时间排序的优先级队列,以此来达到先过期的元素会排在队首,每次从队列里取出来都是最先要过期的元素。
- 可重入锁,
- 用于优化阻塞通知的线程元素leader
- 用于实现阻塞和通知的Condition对象
DelayQueue的核心方法以及实现原理
这里主要分析 offer() 和 take() 方法。
offer()方法
源码如下:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
主要是以下几个步骤:
- 执行加锁操作
- 把元素添加到优先级队列中
- 查看元素是否为队首
- 如果是队首的话,设置leader为空,唤醒所有等待的队列
- 释放锁
take()方法
检索并删除此队列的头部,如有必要,等待该队列中有过期延迟的元素。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
主要是以下几个步骤:
1)执行加锁操作,然后进入死循环。
2)取出优先级队列中的队首元素first。
3)如果优先队列队首/队列为空,阻塞请求。
4)如果队首(first)不为空,获得这个元素的delay时间值。
5)如果first的延迟delay时间值小于等于0的话,说明该元素已经到了可以使用的时间,调用poll方法弹出该元素并return 就OK。
6)如果first的延迟delay时间值大于0的话,表示延时时间还没到,就释放元素first的引用,避免内存泄露。
7)判断leader元素是否为空,不为空的话阻塞当前线程。
8)如果leader元素为空的话,把当前线程赋值给leader元素,然后阻塞delay的时间,即等待队首到达可以出队的时间,在finally块中释放leader元素的引用。
9)循环执行从1~8的步骤。
10)循环结束后,如果leader为空并且优先级队列不为空的情况下(判断还有没有其他后续节点),调用signal通知其他的线程。
11)执行解锁操作。
上面的源码还是比较好理解的,有一个比较突出的亮点是关于leader变量的使用。
leader元素的使用
关于这个元素的作用,看Java doc的解释翻译过来主要是说:用leader来减少不必要的等待时间,那么这里我们的DelayQueue是怎么利用leader来做到这一点的呢?
这里我们想象着我们有个多个消费者线程用take()方法去取,内部先加锁,然后每个线程都去peek第一个节点。如果leader不为空说明已经有线程在取了,设置当前线程等待:
if (leader != null)
available.await();
如果为空说明没有其他线程去取这个节点,设置leader并等待delay延时到期,直到poll后结束循环:
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
take方法中为什么释放first元素:
first = null; // don't retain ref while waiting
我们可以看到后面写的注释,那么这段代码有什么用呢?
想想假设现在延迟队列里面有三个对象。
线程A进来获取first,然后进入 else 的else ,设置了leader为当前线程A
线程B进来获取first,进入else的阻塞操作,然后无限期等待
这时在JDK8下面他是持有first引用的
如果线程A阻塞完毕,获取对象成功,出队,这个对象理应被GC回收,但是他还被线程B持有着,GC链可达,所以不能回收这个first.
假设还有线程C 、D、E.. 持有对象1引用,那么无限期的不能回收该对象1引用了,那么就会造成内存泄露。
简单点说,对于可重入锁,可能存在多个线程里面并发持有同一个对象,当该对象不再使用时必须手动释放这个对象,否则可能存在内存泄漏。
DelayQueue应用场景以及测试案例
下面是一个测试用例,延迟队列里面的元素为Elem,实现了Delayed接口。
1)延迟队列底层的优先级队列是按照Elem.liveTime 来排序的,从小到大;(根据Elem里面的比较器实现排序)
2)根据getDelay()函数,如果还没到超时时间,getDelay()就会返回一个正值,表示还需要等待的延迟,这时DelayQueue.take()就会阻塞,直到优先级队列的peek元素延时结束。
代码如下:
public class DelayQueueTest {
private static DelayQueue<Elem> queue = new DelayQueue<Elem>();
public static void main(String[] args) throws InterruptedException {
queue.offer(new Elem<>(12, 12000000000l));//12s
queue.offer(new Elem<>(11, 11000000000l));//11s
queue.offer(new Elem<>(10, 10000000000l));//10s
queue.offer(new Elem<>(1, 70*1000*1000));//70ms
queue.offer(new Elem<>(2, 60000000l));
queue.offer(new Elem<>(3, 50000000l));
queue.offer(new Elem<>(4, 40000000l));
queue.offer(new Elem<>(5, 30000000l));
queue.offer(new Elem<>(6, 20000000l));
queue.offer(new Elem<>(7, 10000000l));//10ms
/**
* 7 这个对象,存活时间最小,所以在优先队列中处于队列头部。
* 只要超过了其存活时间,就可以被poll
*/
TimeUnit.SECONDS.sleep(1);
while (!queue.isEmpty()){
System.out.println(queue.take().getT());
}
}
public static class Elem<T> implements Delayed{
/**
* delay对象
*/
private T t;
/**
* 存活时间,优先级队列里面按照存活时间的大小来排序,存活时间最小的在队列最上面。
*/
private long liveTime ;
/**
* 移除时间点,纳秒级别
*/
private long removeTime;
public Elem(T t,long liveTime){
this.setT(t);
this.liveTime = liveTime;
this.removeTime = TimeUnit.NANOSECONDS.convert(liveTime, TimeUnit.NANOSECONDS) + System.nanoTime();
}
@Override
public int compareTo(Delayed o) {
if (o == null) return 1;
if (o == this) return 0;
if (o instanceof DelayedItem){
Elem<T> tmpDelayedItem = (Elem<T>)o;
if (liveTime > tmpDelayedItem.liveTime ) {
return 1;
}else if (liveTime == tmpDelayedItem.liveTime) {
return 0;
}else {
return -1;
}
}
long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
return diff > 0 ? 1:diff == 0? 0:-1;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(removeTime - System.nanoTime(), unit);
}
public T getT() {
return t;
}
public void setT(T t) {
this.t = t;
}
@Override
public int hashCode(){
return t.hashCode();
}
@Override
public boolean equals(Object object){
if (object instanceof DelayedItem) {
return object.hashCode() == hashCode() ?true:false;
}
return false;
}
}
}
上图的测试用例输出为:
7
6
5
4
3
2
1
10
11
12
需要注意的是:延时时间不能设置的太小,比如ms级别以下的,因为这时的