版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/Pengjx2014/article/details/82914474
Netty源码分析—MpscLinkedQueue
MpscLinkedQueue是netty自己实现的线程安全的队列,与JDK通过锁实现的LinkedBlockingQueue不同,MpscLinkedQueue是一种针对Netty中NIO任务设计的一种队列,允许有多个生产者,只有一个消费者的队列。MpscLinkedQueue源码注释如下:
1. 一个无锁的支持单消费者多生产者的并发队列;
2. 允许多个生产者同时进行以下操作:offer(Object),add(Object),addAll(Collection);
3. 只允许一个消费者进行以下操作:poll(),remove(),remove(Object),clear();
4. 以下方法不支持:remove(Object o),removeAll(Collection),retainAll(Collection);
生产者生产数据
多个生产者生产数据元素的方法如下:
public boolean add(E e) {
if (offer(e)) {
return true;
}
throw new IllegalStateException("queue full");
}
public boolean offer(E value) {
if (value == null) {
throw new NullPointerException("value");
}
// 如果传入的是node则直接使用,否则实例化一个newTail
final MpscLinkedQueueNode<E> newTail;
if (value instanceof MpscLinkedQueueNode) {
newTail = (MpscLinkedQueueNode<E>) value;
newTail.setNext(null);
} else {
newTail = new DefaultNode<E>(value);
}
MpscLinkedQueueNode<E> oldTail = replaceTail(newTail);
oldTail.setNext(newTail);
return true;
}
private MpscLinkedQueueNode<E> replaceTail(MpscLinkedQueueNode<E> node) {
return getAndSet(node);
}
//采用原子更新的方式来添加节点
public final V getAndSet(V newValue) {
return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
}
消费者消费数据
单个消费者消费数据的方法如下
public E poll() {
//获取链表中的第一个元素
final MpscLinkedQueueNode<E> next = peekNode();
if (next == null) {
return null;
}
// 下一个节点变成新的头结点
MpscLinkedQueueNode<E> oldHead = headRef.get();
// 直接将此次获取到的数据修改成头结点
headRef.lazySet(next);
// 将原头结点的next置为null,去除oldHead与新头结点之间的关联
oldHead.setNext(null);
// 获取节点中的数据,并将value置为null,去除节点与数据直接的关联
return next.clearMaybe();
}
//获取链表中的第一个元素
private MpscLinkedQueueNode<E> peekNode() {
for (;;) {
final MpscLinkedQueueNode<E> head = headRef.get();
final MpscLinkedQueueNode<E> next = head.next();
// 当头结点与尾节点不同时,说明肯定已经有数据插入了;
if (next != null) {
return next;
}
// 头结点与尾节点相同,说明还在初始化状态,直接返回null
if (head == getTail()) {
return null;
}
}
}
总结
MpscLinkedQueue通过使用链表存储数据以及巧妙的CAS操作,实现单消费者多生产者队列,代码简洁高效,适合netty中的无锁化串行设计。