在jdk中,自带了一个优先级队列,scheduledThreadPoolExecutor自己内部也实现了一个优先级队列
我们先不考虑jdk是如何实现的,其实让我们自己去实现一个优先级队列的实现的话,完全可以这样来做:
在入队的时候,根据当前key对应的hash值,或者是根据自己的特定的比较规则,从第一个元素开始对比,如果当前要入队的元素优先级小于队列中的第一个key,就插入到前面,反之,继续往后去对比,这样就可以实现一个简单的优先级队列,在每次取出来第一个元素的时候,就是优先级最高的
所以:对于优先级队列来说,最为重要的就是:优先级的比较规则
PriorityQueue
那来看下PriorityQueue在入队的时候,是怎么做的
// 这是存储元素的数据结构
transient Object[] queue; // non-private to simplify nested class access
// 这是规则比较器,其实就是来比较哪个优先级高,哪个优先级低
private final Comparator<? super E> comparator;
入队方法
我们来看下offer入队方法
public boolean offer(E e) {
// 1.如果入队的元素为null,抛出异常
if (e == null)
throw new NullPointerException();
modCount++;
int i = size;
// 2.如果当前元素个数大于等于数组长度,就扩容
if (i >= queue.length)
grow(i + 1);
size = i + 1;
// 3.如果数组为空,是第一个入队的元素,就直接放到第一个位置即可,无需比较
if (i == 0)
queue[0] = e;
else
// 4.否则,就需要进行优先级的比较
siftUp(i, e);
return true;
}
和其他队列唯一的区别是:这里多了一个优先级比较的步骤
接着来看下是如何进行优先级的排序的
优先级比较
private void siftUp(int k, E x) {
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}
/**
* 这里是对要插入的元素进行判断
* @param k:元素X所占用的位置
* @param x:要插入的元素
*/
@SuppressWarnings("unchecked")
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
/**
* 获取到k位置的父元素,用要插入的元素和其父元素相比
* 返回值为-1,表示要插入的元素早于parent执行,就需要继续向前遍历
* 返回值为1,表示要插入的元素晚于parent执行,就不遍历了,直接放在该位置即可
*
* 右移一位表示原值的0.5
* 如果k-1是10,那么parent就是5
*/
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (key.compareTo((E) e) >= 0)
break;
/**
* 代码执行到这里,表示要插入的元素x比其父元素 e先执行,那怎么处理呢?将e放到x原来要存放的位置,可以理解为父子元素互换位置
* 互换位置之后,继续向上查找父元素,去比较
*/
queue[k] = e;
k = parent;
}
queue[k] = key;
}
这里优先级比较的逻辑,大致就是这样子的:
1.拿当前元素x和其父元素进行优先级比较,如果x的优先级小于父元素,那就无需在比较了,直接放入到该位置即可
2.如果优先级大于父元素,那就拿x和父元素互换位置
3.互换位置之后,再查找x的父元素,进行比较,执行第一步所做的时候
需要注意的是:优先级队列,内部使用的是二叉堆结构,所以,并不是我们设想的就是一个普通的队列,按照0、1、2、3… 这样去排序
二叉堆的话,可以分为小顶堆和大顶堆,小顶堆的话是值最小的是root节点。大顶堆是值最大的是root节点
其实整个看下来,优先级队列的核心就是这个优先级比较的逻辑
scheduledThreadPoolExecutor优先级队列
那scheduledThreadPoolExecutor内部自己所维护的优先级队列,和PriorityQueue基本上是一致的,只是scheduledThreadPool自己定义了一个优先级规则比较器
在前面的博客中,有说到过,在调用scheduledThreadPoolExecutor的方法的时候,会把我们定义的runnable方法转换成ScheduledFutureTask对象,那其实在入队的时候,入队的是这个对象的实现类
接着来看下入队的方法
/**
* 入队方法,将待执行的任务插入到队列中
* 在入队的时候,会进行优先级的判断
* @param x
* @return
*/
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
/**
* 1.加锁
*/
lock.lock();
try {
int i = size;
/**
* 2.判断是否需要进行扩容
* 这里的扩容,和ArrayList扩容的方法类型:
* 先扩容50%,然后通过Arrays.copy将数组扩容之后的数据,再复制到queue中
*/
if (i >= queue.length)
grow();
size = i + 1;
/**
* 3.如果当前插入的是第一个任务
* 就将e设置为头结点
*
* 否则的话,就进行优先级的处理
*/
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
/**
* 4.这里是如果插入了第一个元素,去通知
* take方法,这里的available是一个condition对象
*/
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
这里的逻辑,和jdk自带的优先级队列的逻辑几乎上是一致的,来看下优先级比较的代码
/**
* Sifts element added at bottom up to its heap-ordered spot.
* Call only when holding lock.
* 这是DelayedWorkQueue自己实现的,在入队时,进行优先级判断的逻辑
* k:当前待插入元素要入队的位置
* key:就是要入队的任务
*/
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
/**
* 1.获取到k对应的父节点元素
*/
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
/**
* 2.如果任务k执行的时间晚于e父节点的,就无需再遍历处理
* 如果k的执行时间早于e,那就需要交换位置,然后再次遍历判断父节点和交换之后的优先级
*/
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
/**
* 设置待插入元素的实际位置
*/
queue[k] = key;
setIndex(key, k);
}
这里的compareTo方法就比较有意思了
因为要插入的元素k是RunnableScheduledFuture类型的,所以会调用RunnableScheduledFuture的compareTo方法
/**
* 用来比较优先级,这里的other是插入元素要对比的元素
* @param other
* @return
*/
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
/**
* 如果当前要插入的元素对应的时间 早于X节点执行,那就返回-1
* 如果要插入的元素对应的执行时间 晚于X节点执行,那就返回1
* 举例:X要在5S之后执行,但是当前插入的元素在2S之后执行,那这里的diff就小于0,返回-1
* 如果X要5S之后执行,但是待插入元素是10S之后执行,那这里的diff就大于0,返回1
* 至于下面的sequenceNumber应该是在任务是同时执行的情况下,再进行的优先级判断吧
* sequenceNumber是根据AtomicLong生成的,所以理论上不会重复,即使并发请求
*/
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
/**
* 如果要比较的任务不是ScheduledFutureTask,那就直接获取到每个任务还有多少毫秒要执行,进行优先级判断
*/
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
前面博客中ScheduledThreadPoolExecutor部分源码学习
有说过,这个线程池中,优先级就是根据time时间来比较的,这个代码也可以印证这个观点
结论
所以,整体看下来,所谓的优先级队列,内部采用了二叉堆结构,和普通的队列的唯一区别就是,多了一步优先级的比较
而优先级的比较规则,可以自己定义,也可以采用jdk自带的
我们如果要扩展,只需要提供优先级比较规则即可,可以根据到期时间、值的大小等的