AbstractQueue简介: http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析: http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义: http://donald-draper.iteye.com/blog/2363942
LinkedBlockingQueue解析: http://donald-draper.iteye.com/blog/2364007
package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; /** * A bounded {@linkplain BlockingQueue blocking queue} backed by an * array. This queue orders elements FIFO (first-in-first-out). The * [i]head[/i] of the queue is that element that has been on the * queue the longest time. The [i]tail[/i] of the queue is that * element that has been on the queue the shortest time. New elements * are inserted at the tail of the queue, and the queue retrieval * operations obtain elements at the head of the queue. * 基于数组的有界阻塞FIFO队列。head是待在队列中,最久的元素; tail则是最短的元素。新元素插入时放在队尾,消费时,则从head获取。 * <p>This is a classic "bounded buffer", in which a * fixed-sized array holds elements inserted by producers and * extracted by consumers. Once created, the capacity cannot be * changed. Attempts to {@code put} an element into a full queue * will result in the operation blocking; attempts to {@code take} an * element from an empty queue will similarly block. * 这一个是个节点的有界缓存队列,生产者生产消息,消费者消费消息。 队列创建后,容量不能被修改。尝试向一个已满队列生产元素,则会被阻塞, 尝试从一个空队列消费元素,同样会被阻塞。 * <p>This class supports an optional fairness policy for ordering * waiting producer and consumer threads. By default, this ordering * is not guaranteed. However, a queue constructed with fairness set * to {@code true} grants threads access in FIFO order. Fairness * generally decreases throughput but reduces variability and avoids * starvation. * ArrayBlockingQueue支持消费和生产线程等待条件时公平性与非公平性,默认为非公平锁。 这个我们可以通过构造公平性参数来设置。公平锁一般会降低吞吐量,但是可以减少不确定性, 避免锁饥饿情况的发生。 * <p>This class and its iterator implement all of the * [i]optional[/i] methods of the {@link Collection} and {@link * Iterator} interfaces. * 实现了所有Collection和Iterator接口 * <p>This class is a member of the * <a href="{@docRoot}/../technotes/guides/collections/index.html"> * Java Collections Framework</a>. * * @since 1.5 * @author Doug Lea * @param <E> the type of elements held in this collection */ public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * Serialization ID. This class relies on default serialization * even for the items array, which is default-serialized, even if * it is empty. Otherwise it could not be declared final, which is * necessary here. */ private static final long serialVersionUID = -817911632652898426L; //存放元素的数组 /** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ //下一个take,poll,peek,remove元素的数组index int takeIndex; /** items index for next put, offer, or add */ //下一个put, offer, or add元素的数组index int putIndex; /** Number of elements in the queue */ //当前队列的元素数量 int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ //可重入锁 /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes ,队列非空条件*/ private final Condition notEmpty; /** Condition for waiting puts 队列非满条件*/ private final Condition notFull; /** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and default access policy. * 待容量参数的构造 * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity < 1} */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * 待容量和公平性参数的构造 * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @throws IllegalArgumentException if {@code capacity < 1} */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /** * Circularly increment i. */ final int inc(int i) { return (++i == items.length) ? 0 : i; } /** * Circularly decrement i. */ final int dec(int i) { return ((i == 0) ? items.length : i) - 1; } @SuppressWarnings("unchecked") static <E> E cast(Object item) { return (E) item; } }
从上面来看,ArrayBlockingQueue是一个有界的线程安全FIFO队列,队列元素放在
一个元素数组中,一个takeIndex,表示下一个take,poll,peek,remove元素的数组index。
一个putIndex,表示下一个put, offer, or add元素的数组index,一个int Count,表示当前
队列元素数量,一把锁ReentrantLock用于访问控制,一个队列非空条件notEmpty,一个队列非满条件notFull,这两条件都是由ReentrantLock创建。
来看put操作:
/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { //检查元素是否为null checkNotNull(e); final ReentrantLock lock = this.lock; //以可中断方式,获取锁 lock.lockInterruptibly(); try { while (count == items.length) //如果队列已满,等待notFull条件 notFull.await(); //否则插入元素 insert(e); } finally { lock.unlock(); } }
put操作首先检查元素是否为null,然后以可中断方式,获取锁,
如果队列已满,等待notFull条件,否则插入元素;
来看put操作的几个要点:
1.
//检查元素是否为null checkNotNull(e);
/** * Throws NullPointerException if argument is null. * * @param v the element */ private static void checkNotNull(Object v) { //为null,则抛出null指针异常 if (v == null) throw new NullPointerException(); }
2.
//否则插入元素 insert(e);
/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void insert(E x) { items[putIndex] = x; //put索引自增1 putIndex = inc(putIndex); //容量自增 ++count; //唤醒等待消费的线程 notEmpty.signal(); }
来看offer操作
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } }
offer操作与put操作的区别,当队列满时,返回false;
再看超时offer:
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } insert(e); return true; } finally { lock.unlock(); } }
超时offer与put操作没有太大的区别,区别是当队列满时,
超时等待notFull条件,插入成功,则返回true。
再看add操作:
public boolean add(E e) { return super.add(e); }
//AbstractQueue
public boolean add(E e) { //委托给offer if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
add操作实际上调用的offer操作。
再看take操作
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //以可中断方式获取锁 lock.lockInterruptibly(); try { while (count == 0) //当队列为空,等待非空条件 notEmpty.await(); //返回队列中takeIndex,索引对应的元素 return extract(); } finally { lock.unlock(); } } /** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E extract() { final Object[] items = this.items; //获取队列中takeIndex,索引对应的元素 E x = this.<E>cast(items[takeIndex]); //设置takeIndex索引对应的元素为null items[takeIndex] = null; //takeIndex索引自增 takeIndex = inc(takeIndex); //容量count自减 --count; //唤醒等待notFull条件的线程 notFull.signal(); return x; }
从上来看,take操作首先以可中断方式获取锁,当队列为空,等待非空条件,
否则返回队列中takeIndex,索引对应的元素,akeIndex索引自增,容量count自减,
唤醒等待notFull条件的线程。
再看超时poll
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return extract(); } finally { lock.unlock(); } }
超时poll的唯一区别是当当队列为空,超时等待非空条件。
再看poll
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : extract(); } finally { lock.unlock(); } }
poll操作与take操作的最大的区别为当队列为空,返回null。
再看peek操作:
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : itemAt(takeIndex); } finally { lock.unlock(); } }
peek操作首先获取锁,如果队列为空,则返回null,否则,返回takeIndex所对应的元素。
来看peek操作关键点itemAt,
/** * Returns item at index i. */ final E itemAt(int i) { return this.<E>cast(items[i]); }
再看remove操作:
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; //获取锁 lock.lock(); try { //遍历队列,找到元素相等,移除 for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { if (o.equals(items[i])) { removeAt(i); return true; } } return false; } finally { lock.unlock(); } }
remove操作,首先获取锁,遍历队列,找到元素相等,移除。
contains操作:
public boolean contains(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) if (o.equals(items[i])) return true; return false; } finally { lock.unlock(); } }
clear操作:
/** * Atomically removes all of the elements from this queue. * The queue will be empty after this call returns. */ public void clear() { final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { //遍历队列,将队列清空 for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) items[i] = null; count = 0; putIndex = 0; takeIndex = 0; //通知所有等待put的线程 notFull.signalAll(); } finally { lock.unlock(); } }
再来看drainTo
drainTo(Collection<? super E> c)和drainTo(Collection<? super E> c, int maxElements) 基本上相同,我们来看
drainTo(Collection<? super E> c, int maxElements)
/** * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c, int maxElements) { checkNotNull(c); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int i = takeIndex; int n = 0; int max = (maxElements < count) ? maxElements : count; //取出所有可用元素,添加集合 while (n < max) { c.add(this.<E>cast(items[i])); items[i] = null; i = inc(i); ++n; } if (n > 0) { count -= n; takeIndex = i; //唤醒所有等待put的线程 notFull.signalAll(); } //返回取出的元素数量 return n; } finally { lock.unlock(); } }
//获取队列中的元素数量
public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } }
//获取队列剩余空间
public int remainingCapacity() { final ReentrantLock lock = this.lock; lock.lock(); try { return items.length - count; } finally { lock.unlock(); } }
总结:
ArrayBlockingQueue是一个有界的线程安全FIFO队列,队列元素放在
一个元素数组中,一个takeIndex,表示下一个take,poll,peek,remove元素的数组index。
一个putIndex,表示下一个put, offer, or add元素的数组index,一个int Count,表示当前
队列元素数量,一把锁ReentrantLock用于访问控制,一个队列非空条件notEmpty,一个队列非满条件notFull,这两个条件都是由ReentrantLock创建。
put操作首先检查元素是否为null,然后以可中断方式,获取锁,
如果队列已满,等待notFull条件,否则插入元素;putIndex索引自增,容量count自增,唤醒等待消费的线程。offer操作与put操作的区别,当队列满时,返回false;超时offer与put操作没有太大的区别,区别是当队列满时超时等待notFull条件,插入成功,则返回true。
具体选择何中操作,视具体的场景需求。
take操作首先以可中断方式获取锁,当队列为空,等待非空条件,否则返回队列中takeIndex,索引对应的元素,akeIndex索引自增,容量count自减,唤醒等待notFull条件的线程。超时poll的唯一区别是当当队列为空,超时等待非空条件。poll操作与take操作的区别为当队列为空,返回null。具体选择何中操作,视具体的场景需求。
peek操作首先获取锁,如果队列为空,则返回null,否则,返回takeIndex所对应的元素。
remove操作,首先获取锁,遍历队列,找到元素相等,移除。
ArrayBlockingQueue与LinkedBlockingQueue区别是LinkedBlockingQueue中元素以节点链表来存储,而ArrayBlockingQueue是放在数组中;LinkedBlockingQueue中有两把锁分别为put和take锁,读写分离,而ArrayBlockingQueue只有一把锁控制take和put。