简述:
《Java编程思想》 第四版 P726 ~ P727
DelayQueue, 是一个无界的BlockingQueue, 用于放置实现了Delay接口的对象,其中的对象只能在器到期时才能从队列中取走。这种队列是有序的,即对头对象的延迟到期时间最长。如果没有任何延迟到期,那么就不会有任何头元素
代码:
DelayedTask.java
package com.anialy.test.concurrency.delayqueue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = counter++;
private int delta;
private long trigger;
protected static List<DelayedTask> sequence =
new ArrayList<DelayedTask>();
public DelayedTask(int delayInMilliseconds){
delta = delayInMilliseconds;
// 设置触发的延时时间
trigger = System.nanoTime()
+ TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS);
sequence.add(this);
}
public int compareTo(Delayed arg) {
DelayedTask that = (DelayedTask)arg;
if(trigger < that.trigger)
return -1;
if(trigger > that.trigger)
return 1;
return 0;
}
public long getDelay(TimeUnit unit) {
return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);
}
public void run() {
System.out.printf(this + " \n");
}
@Override
public String toString() {
return String.format("[%1$-4d]", delta)
+ " Task " + id;
}
public String summary() {
return "(" + id + ":" + delta + ")";
}
}
DelayedTaskConsumer.java
package com.anialy.test.concurrency.delayqueue;
import java.util.concurrent.DelayQueue;
public class DelayedTaskConsumer implements Runnable {
private DelayQueue<DelayedTask> q;
public DelayedTaskConsumer(DelayQueue<DelayedTask> q){
this.q = q;
}
public void run() {
try {
while(!Thread.interrupted()){
q.take().run();
}
} catch (InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished DelayedTaskConsumer");
}
}
最后一个关闭executorService的线程
EndSentinel.java
package com.anialy.test.concurrency.delayqueue;
import java.util.concurrent.ExecutorService;
public class EndSentinel extends DelayedTask {
private ExecutorService exec;
public EndSentinel(int delay, ExecutorService e){
super(delay);
exec = e;
}
@Override
public void run() {
for(DelayedTask pt : sequence){
System.out.printf(pt.summary() + " \n");
}
System.out.printf("\n" + this + " Calling shutdownNow()\n");
exec.shutdownNow();
}
}
DelayQueueDemo.java
package com.anialy.test.concurrency.delayqueue;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DelayQueueDemo {
public static void main(String[] args) {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
for(int i=0; i<20; i++){
queue.put(new DelayedTask(rand.nextInt(5000)));
}
queue.add(new EndSentinel(5000, exec));
exec.execute(new DelayedTaskConsumer(queue));
}
}
输出: