AbstractQueue简介: http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析: http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义: http://donald-draper.iteye.com/blog/2363942
LinkedBlockingQueue解析: http://donald-draper.iteye.com/blog/2364007
ArrayBlockingQueue解析: http://donald-draper.iteye.com/blog/2364034
PriorityBlockingQueue解析: http://donald-draper.iteye.com/blog/2364100
SynchronousQueue解析上-TransferStack: http://donald-draper.iteye.com/blog/2364622
SynchronousQueue解析下-TransferQueue: http://donald-draper.iteye.com/blog/2364842
package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; /** * An unbounded {@linkplain BlockingQueue blocking queue} of * <tt>Delayed</tt> elements, in which an element can only be taken * when its delay has expired. The [i]head[/i] of the queue is that * <tt>Delayed</tt> element whose delay expired furthest in the * past. If no delay has expired there is no head and <tt>poll</tt> * will return <tt>null</tt>. Expiration occurs when an element's * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less * than or equal to zero. Even though unexpired elements cannot be * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise * treated as normal elements. For example, the <tt>size</tt> method * returns the count of both expired and unexpired elements. * This queue does not permit null elements. * DelayQueue是一个延时元素无界非阻塞队列,在延时队列中,如果有一个延时时间耗尽, 则将会被消费take。队列的头部是延时时间即将过期或过期最早的元素。如果队列中没有 过期元素,那么poll操作将会返回null。判断一个元素是否过期的标准是,调用元素的 getDelay方法,如果返回的延时时间等于或小于零,即过期。未过期的元素不能被take或poll操作从队列中移除, 将被做为队列普通元素看待。size返回的队列中的过期与未过期元素。队列不允许为null元素的存在。 * <p>This class and its iterator implement all of the * [i]optional[/i] methods of the {@link Collection} and {@link * Iterator} interfaces. * * <p>This class is a member of the * <a href="{@docRoot}/../technotes/guides/collections/index.html"> * Java Collections Framework</a>. * * @since 1.5 * @author Doug Lea * @param <E> the type of elements held in this collection */ public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { //可重入锁 private transient final ReentrantLock lock = new ReentrantLock(); //优先级队列 private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * Thread designated to wait for the element at the head of * the queue. This variant of the Leader-Follower pattern * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to * minimize unnecessary timed waiting. When a thread becomes * the leader, it waits only for the next delay to elapse, but * other threads await indefinitely. The leader thread must * signal some other thread before returning from take() or * poll(...), unless some other thread becomes leader in the * interim. Whenever the head of the queue is replaced with * an element with an earlier expiration time, the leader * field is invalidated by being reset to null, and some * waiting thread, but not necessarily the current leader, is * signalled. So waiting threads must be prepared to acquire * and lose leadership while waiting. 这个线程用于等待队列头元素是否过期。本设计是 Leader-Follower模式的 变种,用为最小化必须要时间的等待。当一个线程成为leader时,则将会等待 下一个过期的元素,而其他线程等待是不确定性的。leader线程在take或poll操作 返回后,必须唤醒其他线程,除非其他线程在过渡期成为了leader。无论任何时候, 当一个更早过期的元素成为队头时,当前leader将会由于无效被设置为null,其他等待线程 将会被唤醒,而不是当前leader。 */ private Thread leader = null; /** * Condition signalled when a newer element becomes available * at the head of the queue or a new thread may need to * become leader. 当一个队列头的元素可以用,或一个新线程成为leader,条件将会触发 */ private final Condition available = lock.newCondition(); /** * Creates a new <tt>DelayQueue</tt> that is initially empty. 构造为空队列 */ public DelayQueue() {} }
从上面可以看出DelayQueue内在一个优先级队列,一个available条件(ReentrantLock)用于通知一个队列头的元素可以用,或一个新线程成为leader事件。
再来看相关操作:
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return <tt>true</tt> (as specified by {@link Collection#add}) * @throws NullPointerException if the specified element is null */ public boolean add(E e) { return offer(e); } /** * Inserts the specified element into this delay queue. As the queue is * unbounded this method will never block. * * @param e the element to add * @throws NullPointerException {@inheritDoc} */ public void put(E e) { offer(e); } /** * Inserts the specified element into this delay queue. As the queue is * unbounded this method will never block. * * @param e the element to add * @param timeout This parameter is ignored as the method never blocks * @param unit This parameter is ignored as the method never blocks * @return <tt>true</tt> * @throws NullPointerException {@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); }
add,put,超时offer操作都是委托给offer操作,来看offer操作:
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return <tt>true</tt> * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; //加锁 lock.lock(); try { //委托给内部优先级队列的offer操作 q.offer(e); if (q.peek() == e) { //检查队头元素,如果队头元素为当前先添加的元素,则设置leader为null,唤醒等待available条件的线程 leader = null; available.signal(); } return true; } finally { lock.unlock(); }
}
从方法来看:
offer操作首先加锁,入队列操作委托给内部优先级队列的offer操作;
入队列后,检查队头元素,如果队头元素为当前先添加的元素,
则设置leader为null,唤醒等待available条件的线程。
再来看take操作:
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * 过期元素出队列,如果需要线程等待直到队列中有一个过期元素可以利用 * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //以可中断方式获取锁 lock.lockInterruptibly(); try { //自旋等待,直到有过期元素可以用 for (;;) { E first = q.peek();//获取队头元素 if (first == null) available.await();//如果队头为null,则等待available条件 else { //如果队头元素不为空,则获取元素延时时间 long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) //如果过期,则出队列 return q.poll(); else if (leader != null) //如果未过期,且leader不为null,则等待available条件 available.await(); else { //如果未过期,且leader为null,则选举当前线程为leader Thread thisThread = Thread.currentThread(); leader = thisThread; try { //以delay为超时时间,超时等待触发available条件 available.awaitNanos(delay); } finally { if (leader == thisThread) //当前线程成为leader,超时等待后,take成功,则重置leader为null leader = null; } } } } } finally { if (leader == null && q.peek() != null) //自旋结束后,如果leader为null,则队头元素不为null,触发available,唤醒等待available条件的线程 available.signal(); lock.unlock(); } }
从take方法,可以看出,首先以可中断方式获取锁,自旋等待,直到有过期元素可以用;
自旋的过程为获取队头元素,如果队头为null,则等待available条件,
如果队头元素不为空,则获取元素延时时间,如果过期,则出队列,
如果未过期,且leader不为null,则等待available条件,
如果未过期,且leader为null,则选举当前线程为leader,
以元素的delay为超时时间,超时等待触发available条件,
超时时间过后触发available条件,最后判断当前线程是否为leader
如果当前线程成为leader,超时等待后,take成功,则重置leader为null;
在自旋结束后,如果leader为null,则队头元素不为null,触发available,唤醒等待available条件的线程。
再看超时poll:
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue, * or the specified wait time expires. * * @return the head of this queue, or <tt>null</tt> if the * specified waiting time elapses before an element with * an expired delay becomes available * @throws InterruptedException {@inheritDoc} */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0) return null; else //这一点与take不同,take为等待,poll为超时等待 nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); if (nanos <= 0) return null; if (nanos < delay || leader != null) //这一点与take不同,take为等待,poll为超时等待,如果nanos小于元素的延时时间,等待时间为超时等待时间nanos nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { //如果nanos大于元素的延时时间,在等待元素时间过期后,再等待nanos-delay+timeLeft(元素剩余等待时间) long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
超时poll与take逻辑上基本一致,不同的是在等待available条件上,take为等待,
而超时poll为超时等待。
再看poll操作:
/** * Retrieves and removes the head of this queue, or returns <tt>null</tt> * if this queue has no elements with an expired delay. * * @return the head of this queue, or <tt>null</tt> if this * queue has no elements with an expired delay */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); //如果队列为空,或队列头元素为过期,则返回null,否则返回队头的过期元素 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } }
再看peek操作:
/** * Retrieves, but does not remove, the head of this queue, or * returns <tt>null</tt> if this queue is empty. Unlike * <tt>poll</tt>, if no expired elements are available in the queue, * this method returns the element that will expire next, * if one exists. * 查看队列头元素,不会移除元素,如果队列为空,返回null,如果没有元素过期, 则将会返回下一个即将过期的元素 * @return the head of this queue, or <tt>null</tt> if this * queue is empty. */ public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { //委托给内部优先级队列,返回队头元素 return q.peek(); } finally { lock.unlock(); } }
再看remove操作:
/** * Removes a single instance of the specified element from this * queue, if it is present, whether or not it has expired. 委托给内部优先级队列,无论元素是否过期,只要元素相等,则移除 */ public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.remove(o); } finally { lock.unlock(); } }
peek和remove操作直接委托给内部优先级队列。
drainTo操作:
/** * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; for (;;) { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) break; c.add(q.poll()); ++n; } return n; } finally { lock.unlock(); } } /** * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; while (n < maxElements) { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) break; c.add(q.poll()); ++n; } return n; } finally { lock.unlock(); } }
drainTo操作是有peek和poll操作协作完成。
clear:
/** * Atomically removes all of the elements from this delay queue. * The queue will be empty after this call returns. * Elements with an unexpired delay are not waited for; they are * simply discarded from the queue. */ public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { q.clear(); } finally { lock.unlock(); } }
remainingCapacity:
/** * Always returns <tt>Integer.MAX_VALUE</tt> because * a <tt>DelayQueue</tt> is not capacity constrained. * 为整数最大值 * @return <tt>Integer.MAX_VALUE</tt> */ public int remainingCapacity() { return Integer.MAX_VALUE; }
size:
public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.size(); } finally { lock.unlock(); } }
总结:
DelayQueue内在一个优先级队列PriorityQueue,用于存放元素,一个available条件(ReentrantLock)用于通知一个队列头的元素可以用,或一个新线程成为leader事件。
offer操作首先加锁,入队列操作委托给内部优先级队列的offer操作;入队列后,检查队头元素,如果队头元素为当前先添加的元素,则设置leader为null,唤醒等待available条件的线程。add,put,超时offer操作都是委托给offer操作。
take操作首先以可中断方式获取锁,自旋等待,直到有过期元素可以用;自旋的过程为获取队头元素,如果队头为null,则等待available条件,如果队头元素不为空,则获取元素延时时间,如果过期,则出队列,如果未过期,且leader不为null,则等待available条件,
如果未过期,且leader为null,则选举当前线程为leader,以元素的delay为超时时间,超时等待触发available条件,超时时间过后触发available条件,最后判断当前线程是否为leader,如果当前线程成为leader,超时等待后,take成功,则重置leader为null;在自旋结束后,如果leader为null,则队头元素不为null,触发available,唤醒等待available条件的线程。超时poll与take逻辑上基本一致,不同的是在等待available条件上,take为等待,而超时poll为超时等待。poll操作为如果队列为空,或队列头元素为过期,则返回null,否则返回队头的过期元素。
peek和remove,clear,size操作直接委托给内部优先级队列。drainTo操作是有peek和poll操作协作完成。
附:
//Delayed
package java.util.concurrent; import java.util.*; /** * A mix-in style interface for marking objects that should be * acted upon after a given delay. * Delayed是一个用于标记一个线程或动作在多少延时后,被执行的迷你接口。 * <p>An implementation of this interface must define a * <tt>compareTo</tt> method that provides an ordering consistent with * its <tt>getDelay</tt> method. * * @since 1.5 * @author Doug Lea */ public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
有了前面队列文章的分析,LinkedBlockingDeque就没有什么值得看的,贴出源码,一看定义,就知道什么意思了,LinkedBlockingDeque就不在说了。
//LinkedBlockingDeque
package java.util.concurrent; import java.util.AbstractQueue; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on * linked nodes. * * <p> The optional capacity bound constructor argument serves as a * way to prevent excessive expansion. The capacity, if unspecified, * is equal to {@link Integer#MAX_VALUE}. Linked nodes are * dynamically created upon each insertion unless this would bring the * deque above capacity. * * <p>Most operations run in constant time (ignoring time spent * blocking). Exceptions include {@link #remove(Object) remove}, * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link * #removeLastOccurrence removeLastOccurrence}, {@link #contains * contains}, {@link #iterator iterator.remove()}, and the bulk * operations, all of which run in linear time. * * <p>This class and its iterator implement all of the * <em>optional</em> methods of the {@link Collection} and {@link * Iterator} interfaces. * * <p>This class is a member of the * <a href="{@docRoot}/../technotes/guides/collections/index.html"> * Java Collections Framework</a>. * * @since 1.6 * @author Doug Lea * @param <E> the type of elements held in this collection */ public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { /* * Implemented as a simple doubly-linked list protected by a * single lock and using conditions to manage blocking. * * To implement weakly consistent iterators, it appears we need to * keep all Nodes GC-reachable from a predecessor dequeued Node. * That would cause two problems: * - allow a rogue Iterator to cause unbounded memory retention * - cause cross-generational linking of old Nodes to new Nodes if * a Node was tenured while live, which generational GCs have a * hard time dealing with, causing repeated major collections. * However, only non-deleted Nodes need to be reachable from * dequeued Nodes, and reachability does not necessarily have to * be of the kind understood by the GC. We use the trick of * linking a Node that has just been dequeued to itself. Such a * self-link implicitly means to jump to "first" (for next links) * or "last" (for prev links). */ /* * We have "diamond" multiple interface/abstract class inheritance * here, and that introduces ambiguities. Often we want the * BlockingDeque javadoc combined with the AbstractQueue * implementation, so a lot of method specs are duplicated here. */ private static final long serialVersionUID = -387911632671998426L; /** Doubly-linked list node class */ static final class Node<E> { /** * The item, or null if this node has been removed. */ E item; /** * One of: * - the real predecessor Node * - this Node, meaning the predecessor is tail * - null, meaning there is no predecessor */ Node<E> prev; /** * One of: * - the real successor Node * - this Node, meaning the successor is head * - null, meaning there is no successor */ Node<E> next; Node(E x) { item = x; } } /** * Pointer to first node. * Invariant: (first == null && last == null) || * (first.prev == null && first.item != null) */ transient Node<E> first; /** * Pointer to last node. * Invariant: (first == null && last == null) || * (last.next == null && last.item != null) */ transient Node<E> last; /** Number of items in the deque */ private transient int count; /** Maximum number of items in the deque */ private final int capacity; /** Main lock guarding all access */ final ReentrantLock lock = new ReentrantLock(); /** Condition for waiting takes */ private final Condition notEmpty = lock.newCondition(); /** Condition for waiting puts */ private final Condition notFull = lock.newCondition(); /** * Creates a {@code LinkedBlockingDeque} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } /** * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity. * * @param capacity the capacity of this deque * @throws IllegalArgumentException if {@code capacity} is less than 1 */ public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } }
Exchanger为双向队列模式同步队列的,这个我们抽时间在研究:
我们把Exchanger的JavaDoc放在这里
package java.util.concurrent; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.LockSupport; /** * A synchronization point at which threads can pair and swap elements * within pairs. Each thread presents some object on entry to the * {@link #exchange exchange} method, matches with a partner thread, * and receives its partner's object on return. An Exchanger may be * viewed as a bidirectional form of a {@link SynchronousQueue}. * Exchangers may be useful in applications such as genetic algorithms * and pipeline designs. * * <p><b>Sample Usage:</b> * Here are the highlights of a class that uses an {@code Exchanger} * to swap buffers between threads so that the thread filling the * buffer gets a freshly emptied one when it needs it, handing off the * filled one to the thread emptying the buffer. * <pre>{@code * class FillAndEmpty { * Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>(); * DataBuffer initialEmptyBuffer = ... a made-up type * DataBuffer initialFullBuffer = ... * * class FillingLoop implements Runnable { * public void run() { * DataBuffer currentBuffer = initialEmptyBuffer; * try { * while (currentBuffer != null) { * addToBuffer(currentBuffer); * if (currentBuffer.isFull()) * currentBuffer = exchanger.exchange(currentBuffer); * } * } catch (InterruptedException ex) { ... handle ... } * } * } * * class EmptyingLoop implements Runnable { * public void run() { * DataBuffer currentBuffer = initialFullBuffer; * try { * while (currentBuffer != null) { * takeFromBuffer(currentBuffer); * if (currentBuffer.isEmpty()) * currentBuffer = exchanger.exchange(currentBuffer); * } * } catch (InterruptedException ex) { ... handle ...} * } * } * * void start() { * new Thread(new FillingLoop()).start(); * new Thread(new EmptyingLoop()).start(); * } * } * }</pre> * * <p>Memory consistency effects: For each pair of threads that * successfully exchange objects via an {@code Exchanger}, actions * prior to the {@code exchange()} in each thread * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> * those subsequent to a return from the corresponding {@code exchange()} * in the other thread. * * @since 1.5 * @author Doug Lea and Bill Scherer and Michael Scott * @param <V> The type of objects that may be exchanged */ public class Exchanger<V>