在并发编程中,有时候需要使用线程安全的队列。对于线程安全的队列,有两种实现方式:一种是阻塞(加锁),一种是非阻塞(无锁)。对于无锁化线程安全队列,实现要基于两个方面:原子性操作和内存访问控制。说的浅显一些,就是在JDK中,需要用到Unsafe类中的CAS操作,结合volatile关键字,来实现无锁化线程安全队列。具有代表性的就是ConcurrentLinkedQueue,当然还有其他的队列,诸如ConcurrentSkipListSet、ConcurrentSkipListMap。下面就着重说一下ConcurrentLinkedQueue的实现。
ConcurrentLinkedQueue是一个基于链表的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素的时候,它会返回头部的元素。这是采用“wait-free”算法(CAS)来实现的。
源码中的CAS操作和volatile关键字:
ConcurrentLinkedQueue的类图如下:
从类图中可以看出,ConcurrentLinkedQueue是由head节点和tail节点组成,每个节点(Node)由节点元素和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next连接在一起,从而形成一张链表结构的队列。
要了解ConcurrentLinkedQueue是无锁的线程安全队列,要从入队和出队来分析。
入队:
入队就是将入队的节点添加到队列的尾部,ConcurrentLinkedQueue的入队操作是方法offer(E e),该方法源码如下:
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
从源码中我们可以看出,入队过程主要做两件事情:第一件事情就是定位出尾节点,第二件事情就是使用CAS算法将入队节点设置成尾节点的next节点,如不成功则重试。其中 casTail(t, newNode)方法就是CAS操作:
出队:
出队列就是从队列中返回一个节点元素,并且清空该节点对元素的引用。ConcurrentLinkedQueue的出队操作是方法poll(),该方法源码如下:
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
从源码中可以看出,出队操作首先要获取头节点的元素,判断该元素是否为空,如果为空,则说明另外一个线程已经进行了一次出队操作并且取走该元素,如果不为空,则使用CAS的方式将头节点的引用设置为null,如果CAS成功,则返回头节点的元素,否则表示另外一个线程先于本线程做了CAS,导致该队列发生了变化,需要重新获取头节点。其中casItem(item, null)方法和updateHead(h, ((q = p.next) != null) ? q : p)都属于CAS操作。