带有过期特性的队列DelayQueue

DelayQueue是一个无界阻塞的队列,队列中的每个元素都有一个过期时间,当要从队列中取出数据时,只有过期元素才会出队。

DelayQueue内部使用PriorityQueue存放元素,又用ReentrantLock实现线程同步。因为DelayQueue内部要获取元素的剩余时间,所以我们的数据类需要继承Delayed接口,Delayed又继承Comparable接口,实现排序,而自身只有一个getDelay()方法,用来获取元素的剩余时间,如果getDelay()返回<=0的值,则表示这个元素过期,通过take()方法即可取出他,如果没有过期值,则take()会一直阻塞。

DelayQueue类涉及到TimeUnit,是在java.util.concurrent包下的一个枚举类,可以非常方便实现时间单位的转化。

    //秒转分
    System.out.println(TimeUnit.SECONDS.toMinutes(2400));
    //分转小时
    System.out.println(TimeUnit.MINUTES.toHours(120));
    //分转小时
    System.out.println(TimeUnit.HOURS.convert(120,TimeUnit.MINUTES));
40
2
2

还可以线程sleep

 TimeUnit.MINUTES.sleep(1);

以上就是TimeUnit的一些用法。
在看DelayQueue的简单示例。
首先定义数据类,MyData被实例化后,过期时间是当前时间+转换成毫秒的seconds。在getDelay中返回剩余时间(剩余时间=到期时间-当前时间)。toString()只是方便查看从实例话后到被取出打印花了多长时间。

public class MyData implements Delayed {
    private  long expire;
    private int id;
    private long start;

    public MyData(int  seconds) {
        this.expire = System.currentTimeMillis()+TimeUnit.SECONDS.toMillis(seconds);
        this.start=System.currentTimeMillis();
        this.id = seconds;
    }


    @Override
    public long getDelay(TimeUnit unit) {
        long convert = unit.convert(expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        return  convert;
    }

    @Override
    public int compareTo(Delayed o) {
        return (int)(this.getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()-this.start) +"秒";
    }
}

测试类,这里也就是入队5个元素,每个元素从被new出后开始算,当前时间+i秒后相继会过期。

 public static void main(String[] args) throws IOException, InterruptedException {
     DelayQueue<MyData> queue =new DelayQueue<>();
     for (int i = 1; i <=5 ; i++) {
         queue.offer(new MyData(i));
     }
     for (;;){
         System.out.println(queue.take());
     }
 }

输出结果,也正是想要的。

12345

offer()

这个方法用来将元素添加到队列,如果元素为null,则抛异常,q是PriorityQueue队列,PriorityQueue会更具自然顺序或Comparator接口进行排序。也就是说下面这段,q.peek方法并不一定是当前添加的元素,如果是当前插入元素,则重置leader线程为null,然后激活avaliable变量条件队列的一个线程,告诉他队列里面有元素了。

 public boolean offer(E e) {
 	//获取到独占锁
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
     	//向PriorityQueue队列添加数据
         q.offer(e);
         if (q.peek() == e) {
             leader = null;
             available.signal();
         }
         return true;
     } finally {
         lock.unlock();
     }
 }

take()

获取并移除队列中过期元素,没有则等待。

首先也是获取独占锁,然后使用peek获取一个元素(peek不移除元素)。
如果first为null,则把当前线程放入available中阻塞等待(available是Condition)。
如果first不为null,则调用这个元素的getDelay获取还有多长时间要过期(参数是NANOSECONDS,纳秒),如果<=0,则表示已经过期,直接出队返回,否则查看leader是否为null,不为null则说明其他线程也在执行take(),则把线程放入条件队列。如果leader为null,则让leader为当前线程,然后执行等待,剩余过期时间到达后,然后重置leader线程为null,重新进入循环,重新进入后就可以发现队列中的头部元素已经过期,然后返回他。在最后的finally块中,如果判断结果为true,则说明当前线程从队列移除过期元素后,又有其他线程执行了入队,就会调用条件变量的signal方法,激活条件队列里面的等待线程。

 public E take() throws InterruptedException {
     final ReentrantLock lock = this.lock;
     lock.lockInterruptibly();
     try {
         for (;;) {
         	//获取元素
             E first = q.peek();
             if (first == null)
             	//等待
                 available.await();
             else {
             	//获取first元素要过期的时间。
                 long delay = first.getDelay(NANOSECONDS);
                 //如果小于等于0,则直接返回
                 if (delay <= 0)
                     return q.poll();
                 first = null; 
                 if (leader != null)
                     available.await();
                 else {
                 	//设置leader为当前线程。
                     Thread thisThread = Thread.currentThread();
                     leader = thisThread;
                     try {
                     	//等待
                         available.awaitNanos(delay);
                     } finally {
                         if (leader == thisThread)
                             leader = null;
                     }
                 }
             }
         }
     } finally {
         if (leader == null && q.peek() != null)
             available.signal();
         lock.unlock();
     }
 }

poll()

获取并移除队头过去元素,如果没有过期元素则返回null。
较简单,只有一个if判断队列是否为空,不为空的话如果队头元素没有过期则返回null。

 public E poll() {
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
         E first = q.peek();
         if (first == null || first.getDelay(NANOSECONDS) > 0)
             return null;
         else
             return q.poll();
     } finally {
         lock.unlock();
     }
 }

size()

返回队列元素个数

发布了42 篇原创文章 · 获赞 7 · 访问量 7704

猜你喜欢

转载自blog.csdn.net/HouXinLin_CSDN/article/details/104630294