简介
LinkedTransferQueue 是一个高效阻塞无界链表队列。和SynchronousQueue.TransferQueue(公平模式)相比,它是可以统计长度,可以进行查询的;和LinkedBlockingQueue相比,它拥有更高的性能(使用CAS自旋);和ConcurrentLinkedQueue相比,它拥有阻塞功能。
LinkedTransferQueue 类
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable
LinkedTransferQueue 继承AbstractQueue抽象类,实现TransferQueue接口
TransferQueue 接口
public interface TransferQueue<E> extends BlockingQueue<E>
TransferQueue 继承BlockingQueue接口,所以它具有阻塞队列功能
TransferQueue 方法
// 尝试交换指定的元素,否则返回false
boolean tryTransfer(E e);
// 交换指定的元素,没有消费者就阻塞
void transfer(E e) throws InterruptedException;
// 带超时时间的尝试交换
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 是否有消费者在等待
boolean hasWaitingConsumer();
// 获取正在等待的消费线程数量
int getWaitingConsumerCount();
重要内部类Node
static final class Node
Node 属性
// 是否是DATA模式
final boolean isData;
// 元素
volatile Object item;
// 下一个节点
volatile Node next;
// 等待线程
volatile Thread waiter;
// 内存操作不安全类
private static final sun.misc.Unsafe UNSAFE;
// 下面三个为偏移量
private static final long itemOffset;
private static final long nextOffset;
private static final long waiterOffset;
Node 静态加载初始化
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
waiterOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiter"));
} catch (Exception e) {
throw new Error(e);
}
}
Node 构造函数
Node(Object item, boolean isData) {
// 设置item属性
UNSAFE.putObject(this, itemOffset, item);
// 设置模式
this.isData = isData;
}
Node 方法
// 修改下一个节点
final boolean casNext(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// 修改元素
final boolean casItem(Object cmp, Object val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 将next指向自己
final void forgetNext() {
UNSAFE.putObject(this, nextOffset, this);
}
// 节点被取消,元素指向自己,清空等待线程
final void forgetContents() {
UNSAFE.putObject(this, itemOffset, this);
UNSAFE.putObject(this, waiterOffset, null);
}
// 是否匹配过了
final boolean isMatched() {
Object x = item;
return (x == this) || ((x == null) == isData);
}
// 是否是一个未匹配的请求节点
final boolean isUnmatchedRequest() {
//
return !isData && item == null;
}
// 如果给定节点不能连接在当前节点后则返回true
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
Object x;
return d != haveData && (x = item) != this && (x != null) == d;
}
// 匹配节点
final boolean tryMatchData() {
Object x = item;
if (x != null && x != this && casItem(x, null)) {
LockSupport.unpark(waiter);
return true;
}
return false;
}
LinkedTransferQueue 属性
// 是否为多核
private static final boolean MP =
Runtime.getRuntime().availableProcessors() > 1;
// 自旋次数
private static final int FRONT_SPINS = 1 << 7;
// 前驱节点正在处理,当前节点需要自旋的次数
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
// // 容忍清除节点失败次数的阈值
static final int SWEEP_THRESHOLD = 32;
// 头节点
transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 断开被删除节点失败的次数
private transient volatile int sweepVotes;
// xfer方法的入参;立即,不需要等待
private static final int NOW = 0;
// xfer方法的入参;异步,不会阻塞
private static final int ASYNC = 1;
// xfer方法的入参;同步,会阻塞
private static final int SYNC = 2;
// xfer方法的入参;带超时时间
private static final int TIMED = 3;
// 不安全类
private static final sun.misc.Unsafe UNSAFE;
// 偏移量
private static final long headOffset;
private static final long tailOffset;
private static final long sweepVotesOffset;
LinkedTransferQueue 静态加载初始化偏移量
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = LinkedTransferQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
sweepVotesOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("sweepVotes"));
} catch (Exception e) {
throw new Error(e);
}
}
LinkedTransferQueue 构造函数
// 无参构造函数
public LinkedTransferQueue() {
}
// 线性集合初始化
public LinkedTransferQueue(Collection<? extends E> c) {
this();
addAll(c);
}
LinkedTransferQueue 核心方法
private E xfer(E e, boolean haveData, int how, long nanos) {
// 数据模式,数据为空抛异常
if (haveData && (e == null))
throw new NullPointerException();
// 新节点
Node s = null;
retry:
// 自旋
for (;;) {
// 从头节点开始遍历
for (Node h = head, p = h; p != null;) {
// 获取当前节点是否是Data模式
boolean isData = p.isData;
// 获取当前节点元素
Object item = p.item;
// 判断p模式,并且不是节点取消item=p
if (item != p && (item != null) == isData) {
// 判断p模式跟当前模式是否一样
if (isData == haveData)
// 模式一样说明要么都是put要么都poll
break;
// 模式不同开始匹配
if (p.casItem(item, e)) {
// 能到这儿,说明修改p的item成功,
// 需要把p设置为已匹配
for (Node q = p; q != h;) {
// 取p下级节点
Node n = q.next;
// 头节点是否变化,没变化就修改头节点
if (head == h && casHead(h, n == null ? q : n)) {
// 原头节点next设置为自己
h.forgetNext();
break;
}
// 能到这,修改失败或头节点改变
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
// 新头节点为空或者next为空,
// 或者当前节点已经匹配过,重新自旋
break;
}
// 匹配成功,取消对方等待
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
// 没有匹配到,节点后移继续匹配
Node n = p.next;
p = (p != n) ? n : (h = head);
}
// 能到这儿,说明和队列中存储的节点模式一样
// 或者队列中没有元素了
// 先是否为不需要等待
if (how != NOW) {
// 创建新节点
if (s == null)
s = new Node(e, haveData);
// 尝试入队
Node pred = tryAppend(s, haveData);
// 入队失败,重试
if (pred == null)
continue retry;
// 如果不是异步(同步或者有超时)
// 就等待被匹配
if (how != ASYNC)
// 进入等待
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e;
}
}
入队,队尾拼接元素
private Node tryAppend(Node s, boolean haveData) {
// 判断尾节点
for (Node t = tail, p = t;;) {
Node n, u;
// 队列为空
if (p == null && (p = head) == null) {
// 修改s为头节点(s入队)
if (casHead(null, s))
return s;
}
// 判断s是否可以接在队尾
else if (p.cannotPrecede(haveData))
return null;
// 判断队尾是否已经有节点了
else if ((n = p.next) != null)
p = p != t && t != (u = tail) ? (t = u) :
(p != n) ? n : null;
// 尝试s加入队尾
else if (!p.casNext(null, s))
p = p.next;
// s已经入队
else {
if (p != t) {
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null &&
(s = s.next) != null && s != t);
}
return p;
}
}
}
LinkedTransferQueue 基础方法
// 修改尾节点
private boolean casTail(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
// 修改头节点
private boolean casHead(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
// 断开被删除节点失败的次数
private boolean casSweepVotes(int cmp, int val) {
return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
}
// 获取节点模式
private static int spinsFor(Node pred, boolean haveData) {
if (MP && pred != null) {
if (pred.isData != haveData)
return FRONT_SPINS + CHAINED_SPINS;
if (pred.isMatched())
return FRONT_SPINS;
if (pred.waiter == null)
return CHAINED_SPINS;
}
return 0;
}
// 获取下级节点(自旋遍历)
final Node succ(Node p) {
Node next = p.next;
return (p == next) ? head : next;
}
// 获取正确的头节点
private Node firstOfMode(boolean isData) {
// 从原来头节点开始匹配
for (Node p = head; p != null; p = succ(p)) {
// 当前不是已匹配就返回
if (!p.isMatched())
return (p.isData == isData) ? p : null;
}
return null;
}
// 获取第一个数据节点
final Node firstDataNode() {
// 从原来头节点开始遍历
for (Node p = head; p != null;) {
Object item = p.item;
// 判断节点模式
if (p.isData) {
// 判断是否为已取消节点
if (item != null && item != p)
return p;
}
// 不是数据节点,并且元素为空,
// 说明队列里都是消费者
else if (item == null)
break;
// 节点后移
if (p == (p = p.next))
p = head;
}
return null;
}
// 获取第一个数据节点元素
private E firstDataItem() {
// 从原来头节点开始遍历
for (Node p = head; p != null; p = succ(p)) {
Object item = p.item;
// 是否为数据节点
if (p.isData) {
// 是否为已取消
if (item != null && item != p)
return LinkedTransferQueue.<E>cast(item);
}
// 数据为空,队列都是消费者
else if (item == null)
return null;
}
return null;
}
// 统计节点数量
private int countOfMode(boolean data) {
int count = 0;
// 从头节点开始遍历
for (Node p = head; p != null; ) {
// 不是已匹配模式
if (!p.isMatched()) {
// 模式不同
if (p.isData != data)
return 0;
// 统计元素数量
if (++count == Integer.MAX_VALUE) // saturated
break;
}
// 节点后移
Node n = p.next;
if (n != p)
p = n;
else {
count = 0;
p = head;
}
}
return count;
}
LinkedTransferQueue 入队
// 阻塞入队
public void put(E e) {
xfer(e, true, ASYNC, 0);
}
// 超时入队
public boolean offer(E e, long timeout, TimeUnit unit) {
xfer(e, true, ASYNC, 0);
return true;
}
// 入队
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
// 入队(无界队列)
public boolean add(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
LinkedTransferQueue 移交元素的方法
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
LinkedTransferQueue 出队
// 阻塞出队
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
// 超时出队
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
// 出队
public E poll() {
return xfer(null, false, NOW, 0);
}
入队、出队、移交元素都调用xfer
LinkedTransferQueue 查询
// 查询头节点元素
public E peek() {
return firstDataItem();
}
// 统计长度
public int size() {
return countOfMode(true);
}
// 是否为空
public boolean isEmpty() {
for (Node p = head; p != null; p = succ(p)) {
if (!p.isMatched())
return !p.isData;
}
return true;
}