BlockingQueue是一个阻塞队列,在高并发场景应用的非常广泛,最常见的情景应该就是在线程池中,其他的比如在ZooKeeper源码中,dubbo源码中也大量使用了阻塞队列,我们经常使用的也就是ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue,ConcurrentLinkedQueue这几种。
ArrayBlockingQueue
ArrayBlockingQueue是一个有界队列,它本质是一个由数组组成的FIFIO的阻塞队列。它的大小是固定不可变的。新添加的元素会放入队列的尾部,获取元素的时候是从队列的头部开始遍历。当队列满了的时候,继续向队列添加元素,队列会阻塞,直到队列中有元素被取出才能继续添加。当队列为空的时候,向队列获取元素的时候,队列阻塞直到队列中添加进来了元素为止。注意,不能往队列中添加null元素。ArrayBlockingQueue使用ReentrantLock作为锁,它可以指定是否使用公平锁。默认是为false,访问顺序不确定,如果设为true,就按照FIFO的顺序访问,但这样通常会降低吞吐量。
构造方法
/** 存放元素的object数组 */ final Object[] items; /** take, poll, peek or remove的下一个索引 */ int takeIndex; /** put, offer, or add的下一个索引 */ int putIndex; /** 队列中元素的数量 */ int count; /** 可重入锁 */ final ReentrantLock lock; /** 队列不为空的condition队列 */ private final Condition notEmpty; /** 队列不满的condition队列 */ private final Condition notFull; // 默认的构造方法,设为队列的初始容量,this调用下面的构造方法 public ArrayBlockingQueue(int capacity) { this(capacity, false); } // fair为true or false,代表是否开始公平锁 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) // 队列容量必须 >0,否则直接抛出异常 throw new IllegalArgumentException(); this.items = new Object[capacity]; // 构造一个初始容量大小的数组 lock = new ReentrantLock(fair); // 创建ReentrantLock,并且指定是否开启公平锁 notEmpty = lock.newCondition(); // 创建两个condition队列 notFull = lock.newCondition(); } // 传入一个集合队列 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // 加锁 try { int i = 0; try { for (E e : c) { // 遍历集合,往数组中添加元素 checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { // 数组越界,直接抛异常 throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; // 集合为空,直接把putIndex设为0,否则,直接把putIndex设为count } finally { lock.unlock(); // 一定要释放锁 } }
入队
1.1 add
// 添加一个元素,其实调用了offer方法进行判断,如果队列满了直接抛出异常,添加成功直接返回true public boolean add(E e) { return super.add(e); } public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
1.2 offer
public boolean offer(E e) { checkNotNull(e); // 元素不能为null final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) // 如果队列满了,直接返回false,表示添加失败 return false; else { // 队列未满,直接把元素插入到队列,并且直接返回true enqueue(e); return true; } } finally { lock.unlock(); } }
1.3 put
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 线程被中断后直接抛出异常,响应锁中断的方法,所以put必须要try...catch try { while (count == items.length) // 队列满了的时候,线程要释放锁,一直等待唤醒信号 notFull.await(); enqueue(e); // 等待唤醒信号竞争到锁之后,说明队列已经不是已满的状态,再进行入队 } finally { lock.unlock(); } }
1.4 offer
/** * * 在指定时间内入队 */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); // 将时间转化为纳秒 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 指定时间的入队使用了响应锁中断 try { while (count == items.length) { // 队列满了,如果nanos <= 0,已超时,直接返回false if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); // 进行等待,在此过程中线程可能被唤醒 } enqueue(e); // 线程获取唤醒信号后竞争到锁,直接返回true return true; } finally { lock.unlock(); } }
上面的方法都一致的调用了enqueue方法,我们来看下入队的具体操作。
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; // 将元素放入putIndexu索引处 if (++putIndex == items.length) // 如果putIndex索引+1 == 队列的容量,直接把putIndex索引设为0 putIndex = 0; count++; // 元素个数自增 notEmpty.signal(); // 唤醒notFull队列上等待的线程 }
出队
2.1 poll
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 如果队列为空直接返回null,否则调用dequeue方法 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
2.2 take
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // take方法使用的是响应锁中断,线程被中断直接抛异常 try { while (count == 0) // 当队列为空时线程释放锁,一致等待唤醒信号 notEmpty.await(); return dequeue(); // 出队操作 } finally { lock.unlock(); } }
2.3 poll
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 指定时间的出队使用了响应锁中断 try { while (count == 0) { // 当队列为空时,如果nanos <=0,已超时或者传入的参数<0,直接返回null if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); // 在nanos时间内等待唤醒信号 } return dequeue(); // 队列不为空或者在nanos时间内队列添加进元素线程通过唤醒信号竞争到锁直接出队操作 } finally { lock.unlock(); } }
看一下dequeue的操作
private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; // takeIndex初始化为0,所以第一次获取元素的时候,总是返回数组的第一个元素,正好是FIFO items[takeIndex] = null; // 把takeIndex位置上的元素置为空,便于GC if (++takeIndex == items.length) // 如果takeIndex+1 == 队列容量,说明队列满了,直接把takeIndex置为0 takeIndex = 0; count--; // 队列元素个数自减 if (itrs != null) // jdk 1.8 新增加的特性 迭代器 itrs.elementDequeued(); notFull.signal(); // 能够取出元素,说明队列不为空,唤醒notEmpty队列上等待的线程 return x; // 直接返回takeIndex索引上的元素 }
2.4 peek
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { // 队列为空直接返回null,否则调用itemAt方法返回当前队列takeIndex上的元素,即当前队列最前面的元素 return (count == 0) ? null : itemAt(takeIndex); } finally { lock.unlock(); } } @SuppressWarnings("unchecked") static <E> E cast(Object item) { return (E) item; } /** * 返回takeIndex上的元素 */ final E itemAt(int i) { return this.<E>cast(items[i]); }
删除
/** * 在队列中删除一个给定的元素 * 如果要删除的元素为null,直接返回false * 如果count <= 0,直接返回false * 当count>0,从takeIndex开始调用equal方法比较遍历,当元素相同的时候,调用removeat方法删除元素,返回true * ++i == 队列容量,直接把i置为0,如果i=putIndex,说明在队列中未查找到要删除的元素,直接返回false */ public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { if (o.equals(items[i])) { removeAt(i); return true; } if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } }
总结
我们最常用的也就是它的入队和出队方法。
入队:add 队列未满添加元素返回true,队列满了直接抛出异常
put 队列满了一直阻塞直到队列有空间才进行添加元素
offer 队列未满直接添加元素返回true,队列满了直接返回false
出队:poll 队列为空直接返回null,队列不为空直接返回队首元素
take 队列为空一直等待直到队列中有元素,直接返回队首元素
ArrayBlockingQueue使用ReentrantLock来保证并发时的线程安全,它是基于数组和两个condition队列来实现的。总体来说,它的实现思想还是比较简单的,应该是BlockingQueue中最好理解的一个类。