之前做reactive streaming的调研,在回压的场景中,都会用到Queue。
周末的时候又有人问到Queue的实现原理,但是印象比较模糊,现在来具体的分析一下,Project Reactor对传统Queue的优化。
当然RxJava也是用到的同样的Queue的实现。
类说明与注释
final class SpscLinkedArrayQueue<T> extends AbstractQueue<T>
implements BiPredicate<T, T>
一个无限制的,由阵列支持的单生成器,单用户队列,具有固定的链接大小。
此实现基于JCTools的SPSC算法:SpscUnboundedArrayQueue和SpscUnboundedAtomicArrayQueue,其中SpscUnboundedAtomicArrayQueue由此库的作者之一提供。 值得注意的是,这个类没有填充,也没有涉及前瞻性缓存; 填充对短期使用或突发性使用造成影响,并且前瞻与小队列无关。
数据结构
在分析SpscLinkedArrayQueue队列之前,我们先来了解一下SpscLinkedArrayQueue的第一个神奇之处,它神奇的数据结构。
SpscLinkedArrayQueue的数据结构主要神奇在它既不是传统的数组,又不是传统的链表,而是数组+链表。我说的好像过于玄乎了,还是具体来看看吧。
在SpscLinkedArrayQueue内部维持着类似于上面的数据结构,链表的每个节点是一个数组,而每个节点数组,最后两位不是用来存储数据,而是倒数第二位用来存储一个标记对象,倒数第一位用来存储下一个节点引用。
在整个数据结构中,SpscLinkedArrayQueue是不会遍历链表的,而是用一个producerBuffer或者consumerBuffer对象用来指向当前的节点。所以这里存在一个问题,一旦前一个节点被填充满了,producerBuffer就指向了下一个节点,同时一旦前一个节点被消费完毕,consumerBuffer也指向下一个节点,此时前一个节点不会被SpscLinkedArrayQueue复用,而是安安静静的等待自己被GC回收。
实际上,上面图中的HASH_NEXT不是在固定的位置,也就是说,它不一定在倒数第二位,这种情况待会我们在下面分析时,会详细的解释。但是next指针绝对在该数组的最后一位,这个是毋庸置疑的。
成员变量
了解了SpscLinkedArrayQueue的数据结构,我们开始正式来分析SpscLinkedArrayQueue,当然,我们还是从它的成员变量开始,来看看它成员变量有哪些,分别表示什么含义。
变量名 | 类型 | 类型 |
---|---|---|
producerIndex | AtomicLong | 这个用来表示当前生产者生成数据的index,实际上这个变量不是指生成数据的index,而是要跟相应的mask计算才是,此变量只增不减。 |
producerLookAheadStep | int | 这个变量用来表示生产者可以往前看的数量,默认为容量的1/4,最大为4096。 |
producerLookAhead | long | 这个变量用来表示index最大的值,也就是说在扩容之前,index能达到的最大值。 |
producerMask | int | 这个变量用跟index计算offset,这个offset才是真正的位置。默认值二进制全为1,也就是2^n - 1。 |
producerBuffer | AtomicReferenceArray | 表示生产者生成的数据放入的节点。这个变量是链表的一个节点。 |
consumerMask | int | 消费者的mask,用来计算当前消费需要消费的数据的位置。默认跟producerMask一样。 |
consumerBuffer | AtomicReferenceArray | 表示消费者当前需要消费的那个数组节点。意义跟producerBuffer差不多 |
consumerIndex | AtomicLong | 表示当前消费者需要消费的数据的index,意义跟producerIndex差不多。 |
HAS_NEXT | Object | 用来表示当前数组节点有下一个节点。 |
构造方法
我们先来看看SpscLinkedArrayQueue的构造方法,看看它为我们做了哪些初始化。
SpscLinkedArrayQueue(int linkSize) {
int c = Queues.ceilingNextPowerOfTwo(Math.max(8, linkSize));
this.producerArray = this.consumerArray = new AtomicReferenceArray<>(c + 1);
this.mask = c - 1;
}
根据传入的大小创建bufferArray的大小。这里RxJava的大小都会扩展为2^n。
offer方法
@Override
public boolean offer(T e) {
Objects.requireNonNull(e);
long pi = producerIndex;
AtomicReferenceArray<Object> a = producerArray;
int m = mask;
int offset = (int) (pi + 1) & m;
if (a.get(offset) != null) {
//如果不为空,说明buffer满了,要扩容
offset = (int) pi & m;
//新建AtomicReferenceArray
AtomicReferenceArray<Object> b = new AtomicReferenceArray<>(m + 2);
producerArray = b;
//通过unsafe工具类去设置value,unsafa提供安全的native方法,如:unsafe.putOrderedObject(array, checkedByteOffset(i), newValue);
b.lazySet(offset, e);
//通过unsafe工具类去设置新创建的AtomicReferenceArray,unsafa提供安全的native方法,如:unsafe.putOrderedObject(array, checkedByteOffset(i), newValue);
a.lazySet(m + 1, b);
a.lazySet(offset, NEXT);
PRODUCER_INDEX.lazySet(this, pi + 1);
}
else {
offset = (int) pi & m;
//通过unsafe工具类去设置value,unsafa提供安全的native方法,如:unsafe.putOrderedObject(array, checkedByteOffset(i), newValue);
a.lazySet(offset, e);
PRODUCER_INDEX.lazySet(this, pi + 1);
}
return true;
}
poll方法
看完了offer方法,现在我们再来看看poll方法。
public T poll() {
long ci = consumerIndex;
AtomicReferenceArray<Object> a = consumerArray;
int m = mask;
int offset = (int) ci & m;
Object o = a.get(offset);
if (o == null) {
return null;
}
if (o == NEXT) {
//如果是NEXT指针,则取链表的下一个节点的AtomicReferenceArray
AtomicReferenceArray<Object> b = (AtomicReferenceArray<Object>) a.get(m + 1);
a.lazySet(m + 1, null);
//从下一个节点的AtomicReferenceArray获取元素
o = b.get(offset);
a = b;
consumerArray = b;
}
a.lazySet(offset, null);
CONSUMER_INDEX.lazySet(this, ci + 1);
return (T) o;
}
总结
总的来说,SpscLinkedArrayQueue涉及过于神奇。这里我来做一个简单的总结。
1、SpscLinkedArrayQueue的数据结构为数组+链表,其中SpscLinkedArrayQueue不会遍历数组,这个是SpscLinkedArrayQueue涉及的神奇之处。
2、利用producerIndex和consumerIndex快速索引。
3、扩容不需要重新rehash或者锁,而是创建新的AtomicReferenceArray作为旧的AtomicReferenceArray的下一个新的节点。