基本使用
-
放入DelayQueue中的元素,需要实现Delayed接口
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * An element for a {@code DelayQueue}: it records the wall-clock instant at which it
 * is due and reports the remaining delay relative to {@link System#currentTimeMillis()}.
 */
public class DelayTask implements Delayed {

    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Epoch milliseconds at which this task is scheduled to run. */
    private final long scheduleTimestamp;

    /**
     * Creates a task that is due {@code delay} units of {@code timeUnit} from now.
     *
     * @param delay    how long to wait before the task is due
     * @param timeUnit unit of {@code delay}
     */
    public DelayTask(long delay, TimeUnit timeUnit) {
        long delayMillis = TimeUnit.MILLISECONDS.convert(delay, timeUnit);
        this.scheduleTimestamp = System.currentTimeMillis() + delayMillis;
    }

    /**
     * Returns the remaining delay, converted to {@code unit}.
     *
     * <p>A thread taking from the DelayQueue calls this method: once the value is
     * {@code <= 0} the element can be taken, otherwise the taking thread blocks.
     *
     * @param unit unit of the returned value
     * @return remaining delay; zero or negative once the scheduled time has passed
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long expire = scheduleTimestamp - System.currentTimeMillis();
        return unit.convert(expire, TimeUnit.MILLISECONDS);
    }

    /**
     * Orders tasks by scheduled time, earliest first. The DelayQueue calls this when
     * an element is added in order to decide the new element's position.
     *
     * @param o the other delayed element
     * @return negative, zero or positive when this task is due before, at the same
     *         time as, or after {@code o}
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        if (o instanceof DelayTask) {
            // Long.compare avoids the sign flip that narrowing a long difference to int
            // produces when the two timestamps are more than Integer.MAX_VALUE ms apart.
            return Long.compare(scheduleTimestamp, ((DelayTask) o).scheduleTimestamp);
        }
        // Foreign Delayed implementations are compared by remaining delay instead of
        // being cast blindly to DelayTask.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
    }

    /**
     * Prints the scheduled time, the actual execution time, and how many milliseconds
     * after the scheduled time the task is being executed.
     */
    public void exec() {
        long nowTimestamp = System.currentTimeMillis();
        Instant nowInstant = Instant.ofEpochMilli(nowTimestamp);
        Instant scheduleInstant = Instant.ofEpochMilli(scheduleTimestamp);
        LocalDateTime nowTime = LocalDateTime.ofInstant(nowInstant, ZoneId.systemDefault());
        LocalDateTime scheduleTime = LocalDateTime.ofInstant(scheduleInstant, ZoneId.systemDefault());
        long mills = nowTimestamp - scheduleTimestamp;
        System.out.println("executing task..scheduledTime=" + dtf.format(scheduleTime)
                + ", execTime=" + dtf.format(nowTime) + ",delay=" + mills + "ms");
    }
}
-
定义一个生产者,负责往DelayQueue里扔元素
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

/**
 * Producer that repeatedly sleeps for a random number of seconds and then adds a
 * {@link DelayTask} with a random delay to the shared queue.
 */
public class DelayTaskProducer implements Runnable {

    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    private final DelayQueue<DelayTask> queue;

    /**
     * @param queue the queue new tasks are put into
     */
    public DelayTaskProducer(DelayQueue<DelayTask> queue) {
        this.queue = queue;
    }

    /**
     * Produces tasks until the running thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            // Pause 0-19 seconds between submissions; each task is due 0-9 seconds later.
            long sleep = (long) (Math.random() * 20);
            long delay = (long) (Math.random() * 10);
            try {
                TimeUnit.SECONDS.sleep(sleep);
                queue.put(new DelayTask(delay, TimeUnit.SECONDS));
                LocalDateTime time = LocalDateTime.now();
                String timeFormat = dtf.format(time);
                System.out.println(timeFormat + " - 生产者[" + hashCode() + "],添加了一个" + delay + "秒后执行的任务...");
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop: swallowing the exception inside an
                // endless loop would make this thread impossible to shut down.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
-
定义一个消费者,负责消费DelayQueue中的元素
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

/**
 * Consumer that repeatedly takes a {@link DelayTask} from the shared queue, executes it,
 * and then sleeps for two seconds before taking the next one.
 */
public class DelayTaskConsumer implements Runnable {

    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    private final DelayQueue<DelayTask> queue;

    /**
     * @param queue the queue tasks are taken from
     */
    public DelayTaskConsumer(DelayQueue<DelayTask> queue) {
        this.queue = queue;
    }

    /**
     * Consumes tasks until the running thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                DelayTask task = queue.take();
                LocalDateTime now = LocalDateTime.now();
                String timeFormat = dtf.format(now);
                System.out.println(timeFormat + " - 消费者[" + hashCode() + "],拿到任务,准备执行...");
                task.exec();
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop: swallowing the exception inside an
                // endless loop would make this thread impossible to shut down.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
-
跑起来
import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.DelayQueue; public class DelayedQueueTest { private static final DelayQueue<DelayTask> queue = new DelayQueue<>(); private static final int producerNum = 5; private static final int consumerNum = 2; public static void main(String[] args) throws IOException { ThreadGroup producerGroup = new ThreadGroup("producerGroup"); ThreadGroup consumerGroup = new ThreadGroup("consumerGroup"); List<Thread> producerThreads = createProducer(producerNum,producerGroup); List<Thread> consumerThreads = createConsumer(consumerNum,consumerGroup); startThreads(producerThreads); startThreads(consumerThreads); int read = System.in.read(); //阻塞 } private static void startThreads(List<Thread> list) { for (Thread t : list) { t.start(); } } private static List<Thread> createProducer(int num,ThreadGroup group) { List<Thread> list = new ArrayList<>(num); for (int i = 0; i < num; i++) { list.add(new Thread(group,new DelayTaskProducer(queue))); } return list; } private static List<Thread> createConsumer(int num,ThreadGroup group) { List<Thread> list = new ArrayList<>(num); for (int i = 0; i < num; i++) { list.add(new Thread(group,new DelayTaskConsumer(queue))); } return list; } }
结果:
基本原理
变量
ReentrantLock lock;
PriorityQueue<E> q; // 其中 E extends Delayed
Thread leader;
Condition available = lock.newCondition();
操作
-
put
-
- `lock.lock();` 先上锁
- `q.offer(e);` 将e直接塞到内部的优先队列中
- 偷偷看一眼队列中第一个元素是不是刚才塞进去的那个元素,如果是,将leader设为null,并唤醒在available上等待的线程
  `if (q.peek() == e) { leader = null; available.signal(); }`
- `lock.unlock();` 释放锁
-
-
take
lock.lockInterruptibly();
先上锁
for(;;)
开启死循环
- 偷偷看一眼队列中的第一个元素
E first = q.peek();
first
如果是null,则当前线程在available条件上等待- 否则,
long delay = first.getDelay(NANOSECONDS);
看first
这个任务是否到了执行时间- 如果
delay <= 0
,则说明到了执行时间,直接return q.poll();
相当于取出first
- 否则,说明任务还未到执行时间,然后让当前线程在available条件上等待delay这么长的时间
available.awaitNanos(delay);
- 如果