本文基于jdk1.8
1.简介
LinkedTransferQueue 底层使用一个双向队列。
双向队列是指队列中存在两种节点,一种是数据节点,一种是非数据节点。
取数据时,如果当前是数据节点则匹配,出队。否则向队列插入一个非数据节点。
存数据时,如果当前是非数据节点则匹配,出队。否则向队列插入一个数据节点。
其继承关系如下
从继承关系可以看出,LinkedTransferQueue是一个堵塞队列。
二.源码解析
1.基本属性
// 队列头
transient volatile Node head;
// 队列为
private transient volatile Node tail;
// 立即,用于poll, tryTransfer这类方法
private static final int NOW = 0;
// 异步,用于offer, put, add这类方法
private static final int ASYNC = 1;
// 同步,用于transfer, take这类方法
private static final int SYNC = 2; // for transfer, take
// 超时,用于超时的 poll, tryTransfer这类方法
private static final int TIMED = 3;
static final class Node {
// 表示当前是否为数据节点
final boolean isData;
// 存放的元素
volatile Object item;
// 下一个节点
volatile Node next;
// 等待线程
volatile Thread waiter;
}
2.构造函数
构造没有设置容量,可以看出其是无界队列
public LinkedTransferQueue() {
}
public LinkedTransferQueue(Collection<? extends E> c) {
this();
addAll(c);
}
3.xfer方法
LinkedTransferQueue不管新增还是移除最终都是调用xfer方法。
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
public E poll() {
return xfer(null, false, NOW, 0);
}
private E xfer(E e, boolean haveData, int how, long nanos) {
// 插入元素不能为null
if (haveData && (e == null))
throw new NullPointerException();
Node s = null;
retry:
for (;;) {
// 队列 初始化过,进行匹配节点
for (Node h = head, p = h; p != null;) {
boolean isData = p.isData;
Object item = p.item;
// 检查是否被其他线程先一步匹配了
if (item != p && (item != null) == isData) {
// 模式一致退出循环,插入队列
if (isData == haveData)
break;
// 模式匹配成功
if (p.casItem(item, e)) {
// 当前节点匹配成功成功,且不是头结点,说明头结点已经先一步被其他线程匹配了
for (Node q = p; q != h;) {
Node n = q.next;
// 设置当前节点的下一个节点为新的头结点
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
}
// 如果头结点为空或下一个节点为空 ,则进行重试
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 唤醒p中等待的线程
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
Node n = p.next;
// p匹配失败,此时可能p.next 指向自己则从head遍历否则从p.next遍历
p = (p != n) ? n : (h = head); // Use head if p offlist
}
if (how != NOW) {
// 没有匹配元素则创建新节点
if (s == null)
s = new Node(e, haveData);
Node pred = tryAppend(s, haveData);
// 返回null 表示插入的模式和队头模式不一致,则重新尝试进行匹配
if (pred == null)
continue retry;
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e;
}
}
private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t;;) {
Node n, u;
// 头尾节点都为null,则初始化头节点
if (p == null && (p = head) == null) {
// CAS更新成功,返回新节点
if (casHead(null, s))
return s; // initialize
}
// 判断当前节点是否能追加,true不能追加,保证队列中的节点都是同一种节点
else if (p.cannotPrecede(haveData))
return null;
// p 节点不是为节点,则可能其他线程操作了,从指向尾节点
else if ((n = p.next) != null)
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
// CAS 更新失败,则其他线程先一步修改了,指向下一个节点
else if (!p.casNext(null, s))
p = p.next;
else {
// 插入节点成功,p不是尾节点,则更新尾节点
if (p != t) {
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
// 超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 自旋次数
int spins = -1;
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
// 匹配到了
if (item != e) { // matched
// 把s.item设置成s,便于GC
s.forgetContents();
return LinkedTransferQueue.<E>cast(item);
}
// 当前线程中断,或者超时了,则把s.item设置成s
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) {
// 删除s与前一节点的关系
unsplice(pred, s);
return e;
}
// 计算自旋次数
if (spins < 0) {
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
// 自旋大于0 ,则减1
else if (spins > 0) {
--spins;
// 随机让出CPU
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
}
// 设置节点对应的等待线程为当前线程
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
}
// 计算是否超时,堵塞线程
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else {
// 堵塞线程
LockSupport.park(this);
}
}
}