最近一段时间在学习Java并发编程,研究了下AbstractQueuedSynchronizer、ConcurrentHashMap及本文涉及的ConcurrentLinkedQueue的实现。网上关于ConcurrentHashMap的源码分析很多,但关于ConcurrentLinkedQueue的源码分析较少。且随着时间推移,JDK对ConcurrentLinkedQueue的实现方式也进行了一些调整,这也无形中加大了ConcurrentLinkedQueue的分析难度。本文在参考《Java并发编程的艺术》一书中相关章节的基础上,针对JDK1.7中ConcurrentLinkedQueue的offer和poll方法进行分析。因本人能力有限,分析过程肯定会有谬误,希望大家能及时指出。
一、Node
Node是ConcurrentLinkedQueue定义的内部对象,其内部定义了item变量用来包裹实际入队元素及next变量用来保存当前节点的下一节点引用。且上述变量都被volatile关键字修饰,这意味着对item变量和next变量的读写都会被立刻刷入主存,可以被其他线程及时看到。Node还定义了其他一系列可以更改item及next变量的方法。这些方法底层都是通过CAS来实现,CAS会使用现代处理器上提供的高效机器级别的原子指令,也就是说这些方法涉及的操作都是原子操作。循环CAS更新volatile变量,这是JDK实现非阻塞类库的主要方式。以下是Node的实现:
private static class Node<E> {
volatile E item; volatile Node<E> next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}}
二、offer
我们先看看offer的实现。ConcurrentLinkedQueue维护了一个head节点,一个tail节点,分别用来指向队列的头部和尾部。我们假设目前队列中只有一个节点,此时head和tail节点都指向此节点。以下是offer过程中tail节点变化示意图:
tail节点变化示意图
以下是JDK中offer方法的实现:
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null */
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
我们假设以下分析在单线程环境下进行,入队操作步骤如下:
- 添加元素1到队列中,需要先构造一个包含入队元素的Node节点,并且Node节点的next引用为空。
- 进入循环体,将局部变量p和t都设置为tail。
- 判断tail的next节点q是否为空,因当前假设在单线程环境中,所以next节点必定为空,此时通过CAS操作将tail的next变量指向为新节点(单线程环境CAS操作必定成功),如上图中添加元素1步骤所示。
- 进入条件分支判断p != t条件是否成立,确定是否进行更新tail操作。此时p和t都是指向tail,因此p == t,将略过casTail操作,进入步骤5。
if (p != t)
// hop two nodes at a time
casTail(t, newNode); // Failure is OK.
- 返回true,节点添加成功,offer方法退出,入队操作成功。此处读者可能会有疑问:为什么tail还未更新就认为入队已经成功了?在下面的poll方法分析中我们将会看到:出队操作对队列遍历的循环终止条件不是当前节点是否为tail,而是当前节点的next引用是否为空。这就可以保证即使在tail未指向最后节点的情况下,依然可以获取到成功入队的所有节点(有什么副作用呢?)。
那何时更新tail呢?我们先考虑节点2入队的情况: - 参考tail变化示意图添加元素2的情况,此时的tail仍指向head节点,即p和t仍然指向tail。因为元素1已经入队,此时q不为空。
- 进入下一个条件分支判断p == q 是否成立。笔者看到此处的时候也是百思不得其解,q是p的下一个节点,怎么会指向自己呢?即使指向自己,为什么在tail未更新的情况下,要把p指向head节点呢?我们暂且忽略上述疑问,目前这种情况不会出现,进入步骤8。
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
- 进入最后一个分支,这一分支内的操作很简单:判断tail引用是否已经更改,若已经更改,则将p指向为最新的tail;否则,将p指向为q,也就是将p指向为节点1。
接着进入下一次循环,此时p的next引用再次为空,会重复1-5的步骤,节点2也成功添加到队列尾部。但要注意的是,步骤5中此时p不等于t条件成立,tail引用更改为会指向最后一个节点2。
下面我们考虑在多线程环境下的情况:
在上面的分析之后,多线程环境的分析就简单多了。多线程情况下所有操作与单线程情况下一致,但是要考虑多线程情况下CAS操作失败的情况。上述步骤中有两个CAS操作:
- 通过CAS更改next引用。此CAS操作失败说明已经有线程先入队成功,此时只需将p的引用更改为最新的tail或next节点,进行下一次循环,重复上述步骤直到成功为止
- 通过CAS操作将tail更新为新节点。此CAS操作失败说明在进行CAS操作之前,已经至少有一个线程进行了节点入队操作,并在入队成功后已经把tail节点更新。因此更新tail操作失败可以说明已经有其他节点做了此操作,可以忽略。
offer方法分析完毕,我们可以做出如下总结:tail不是时刻指向最后一个节点,至少间隔1个节点,才会更新一次tail。
三、poll
poll的实现思路与offer的实现类似,只不过把tail节点换成head节点。以下是poll过程中head节点变化示意图:
head节点变化示意图
以下是JDK中poll的实现:
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
首先我们仍然先考虑单线程场景,假设队列中有4个节点,head节点变化示意图如上所示:
- 首先进入循环,定义局部变量p和h,皆指向head节点
- head节点的item为null,进入下一个条件分支,判断head节点的next节点是否为空。p节点的next节点不为空,进入步骤3。
- 判断 p == q 的是否成立,此处我们有同offer方法分析中步骤7一样的疑问,我们仍然先假设p不等于q,进入步骤4。
- 将p更改为head节点的next节点,进入下一次循环。
- 此时p节点指向节点1,并且节点1的item不为空,通过CAS操作将item设置为空。因我们假设方法是在单线程环境中执行,因此CAS操作总能成功。接下来是一个很关键的步骤:
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
此时h仍指向head并未更新,而p节点已经指向Node1,因此p不等于h的条件成立,接下来执行updateHead(h, p)操作。我们再来看看updateHead方法:
/**
* Try to CAS head to p. If successful, repoint old head to itself
* as sentinel for succ(), below.
*/
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
此方法是通过CAS操作将head指向p,并且将h节点,即之前的head节点指向自己。相信读者应该还记得在分析offer方法中步骤7及上述步骤3中我们提出的两个疑问:
- q是p的next节点,为什么 p 会等于q呢?这个问题updateHead方法已经给出了解释。
- 为什么要在tail未更新的情况下,把p指向head节点呢?考虑下面的情况,如果入队元素较少导致tail节点更新较慢,同时出队操作较快导致head已经指向tail之后的节点。这种情况下需要将p要么指向最新的tail节点(若tail节点已经更改),要么指向head节点,才能不落后于队列。
另外,在阅读源码过程中,笔者主要参考了并发编程网中ConcurrentLinkedQueue的实现原理分析一文,但在笔者看来文章作者对succ方法的解读应该有误,原文中写到: “获取tail节点的next节点需要注意的是p节点等于p的next节点的情况,只有一种可能就是p节点和p的next节点都等于空,表示这个队列刚初始化,正准备添加第一次节点,所以需要返回head节点。” 下面给出的是succ方法实现:
/**
* Returns the successor of p, or the head node if p.next has been
* linked to self, which will only be true if traversing with a
* stale pointer that is now off the list.
*/
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
}
从源码中可以判断p不可能为空,若p为空,那么当调用p.next的时候一定会抛空指针异常。另外大家可以看看ConcurrentLinkedQueue中所有调用succ方法的代码,都直接或间接隐含了p不为空的条件。所以p节点不可能为空,上述论断一定错误。那么什么时候会出现p == next情况呢?如果认真看了前面的分析,相信大家自己已经有了答案。
poll方法在多线程环境中执行情况和offer的类似,请读者自行分析。
四、总结
ConcurrentLinkedQueue是并发大师Doug Lea(如果看了jdk的concurrent包的源码,相信读者对此人不会陌生)根据Michael-Scott提出的非阻塞链接队列算法的基础上修改而来,它是一个基于链表的无界线程安全队列,它采用先入先出的规则对节点进行排序,当我们添加一个节点的时候,它会添加到队列的尾部;当我们获取一个元素的时,它会返回队列头部的元素。它通过使用head和tail引用延迟更改的方式,减少CAS操作,在满足线程安全的前提下,提高了队列的操作效率。
五、参考资料:
聊聊并发(六)ConcurrentLinkedQueue的实现原理分析
《Java并发编程的艺术》
《Java并发编程实战》
ConcurrentLinkedQueue作为Doug Lea在JDK5.0推出的又一个并发容器,使用的是基于FIFO的队列结构,在队尾入队,在队首出队,但使用的基于CAS的”wait-free”进行的,也就是线程竞争锁失败后不会挂起,这在一定程度使得线程的等待时间减少,但CAS并不是一个算法,它是一个CPU直接支持的硬件指令,这也就在一定程度上决定了它的平台相关性
当前常用的多线程同步机制可以分为下面三种类型:
- volatile 变量:轻量级多线程同步机制,不会引起上下文切换和线程调度。仅提供内存可见性保证,不提供原子性。
- CAS 原子指令:轻量级多线程同步机制,不会引起上下文切换和线程调度。它同时提供内存可见性和原子化更新保证。
- 互斥锁:重量级多线程同步机制,可能会引起上下文切换和线程调度,它同时提供内存可见性和原子性。
注:上下文切换就是CPU在不同线程中分配时间片,在不同线程任务中切换
ConcurrentLinkedQueue 的非阻塞算法实现主要可概括为下面几点: