ArrayBlockingQueue,一个由数组实现的有界阻塞队列。该队列采用FIFO的原则对元素进行排序添加的。
ArrayBlockingQueue为有界且固定,其大小在构造时由构造函数来决定,确认之后就不能再改变了。ArrayBlockingQueue支持对等待的生产者线程和使用者线程进行排序的可选公平策略,但是在默认情况下不保证线程公平的访问,在构造时可以选择公平策略(fair = true)。公平性通常会降低吞吐量,但是减少了可变性和避免了不平衡性。
ArrayBlockingQueue主要属性
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {
private static final long serialVersionUID = -817911632652898426L;
final Object[] items; //一个定长数组,维护ArrayBlockingQueue的元素
int takeIndex; //队首位置
int putIndex; //队尾位置
int count;
// 重入锁
final ReentrantLock lock; //锁,ArrayBlockingQueue出列入列都必须获取该锁,两个步骤公用一个锁
// notEmpty condition
private final Condition notEmpty;//出列条件
// notFull condition
private final Condition notFull;//入列条件
transient ArrayBlockingQueue.Itrs itrs;
}
ArrayBlockingQueue内部使用可重入锁ReentrantLock + Condition来完成多线程环境的并发操作。
ArrayBlockingQueue内部是采用数组进行数据存储的(属性items),为了保证线程安全,采用的是ReentrantLock lock,为了保证可阻塞式的插入删除数据利用的是Condition,当获取数据的消费者线程被阻塞时会将该线程放置到notEmpty等待队列中,当插入数据的生产者线程被阻塞时,会将该线程放置到notFull等待队列中。而 notEmpty和notFull等中要属性在构造方法中进行创建:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
入队
ArrayBlockingQueue提供了诸多方法,可以将元素加入队列尾部。
1、add(E e) :将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则抛出 IllegalStateException;
2、offer(E e) :将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false;
3、offer(E e, long timeout, TimeUnit unit) :将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间;
4、put(E e) :将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。
put方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果当前队列已满,将线程移入到notFull等待队列中
while (count == items.length)
notFull.await();
//满足插入数据的要求,直接进行入队操作
enqueue(e);
} finally {
lock.unlock();
}
}
当队列已满时(count == items.length)将线程移入到notFull等待队列中,如果当前满足插入数据的条件,就可以直接调用enqueue(e)插入数据元素。enqueue方法源码为:
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//插入数据
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
//通知消费者线程,当前队列中有数据可供消费
notEmpty.signal();
}
enqueue方法的逻辑同样也很简单,先完成插入数据,即往数组中添加数据(items[putIndex] = x),然后通知被阻塞的消费者线程,当前队列中有数据可供消费(notEmpty.signal())。
出队
ArrayBlockingQueue提供的出队方法如下:
1、poll() :获取并移除此队列的头,如果此队列为空,则返回 null
2、poll(long timeout, TimeUnit unit) :获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)
3、remove(Object o) :从此队列中移除指定元素的单个实例(如果存在)
4、take() :获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)
poll()
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
如果队列为空返回null,否则调用dequeue()获取列头元素:
private E dequeue() {
final Object[] items = this.items;
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
该方法主要是从列头(takeIndex 位置)取出元素,同时如果迭代器itrs不为null,则需要维护下该迭代器。最后调用notFull.signal()唤醒入列线程。