BlockingQueue,什么鬼,刚开始接触到这个数据结构的时候,从字面意义上根本没看出这个的意思,怎么会有个block冠在前边,感觉是个不太好的词。但是又发现其在某些服务任务框架上经常使用。今天我们来探秘下这个数据结构的用法。
BlockingQueue源码分析
首先,打开JDK的源码,找到BlockingQueue这个数据结构。(android开发要安装的jdk 1.8,在其安装目录即有源码)。
package java.util.concurrent;
import java.util.Collection;
import java.util.Queue;
* @since 1.5
* @author Doug Lea
* @param <E> the type of elements held in this collection
*/
public interface BlockingQueue<E> extends Queue<E> {
首先,一个数据结构,包名是java.util.concurrent,很显然,这是java为并发操作设计的数据结构。并且还import了Collection和Queue。 所以其当然也有这两个数据结构的特性。而BlockingQueue是个interface接口类型,那这就不是一个真实的可以对象化的类。如同List 是ArrayList<>()的父类,我们初始化时经常用new ArrayList<>()创建对象并赋值给List<>类型的变量一样。他们都是泛型类,数据类型E可以在使用时具体制定。
接下来我们分析这个接口的主要方法。
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions, returning
* {@code true} upon success and throwing an
* {@code IllegalStateException} if no space is currently available.
* ……
* /
boolean add(E e);
从注释看出,add方法可以插入数据到队列中,如果队列满了,则抛出IllegalStateException异常。
之后的方法是offer(E e),其和add方法的不同之处在于,前者在队列满时是异常,后者是返回false,而add只是抛出异常。
而下一个方法,就可以看出BlockingQueue的精髓所在,即阻塞。
/**
* Inserts the specified element into this queue, waiting if necessary
* for space to become available.
*
* @param e the element to add
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
void put(E e) throws InterruptedException;
put的精髓就在于,当队列满的时候,会阻塞住,等有空间时插入。
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
E take() throws InterruptedException;
反之, 队列的取元素也有阻塞方法,如果队列中有元素,则取出处理,否则队列为空时则阻塞等待。
这里只看了上述方法的注释,其他的都是些辅助方法。
由于BlockingQueue只是个接口,只有定义的方法,但是没有实际的实现,即如何实现阻塞,是在实现类中实现的。
其实现类有ArrayBlockingQueue、BlockingDeque等,详细的如下图所示。
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
我们分别分析ArrayBlockingQueue和BlockingDeque的源码。
ArrayBlockingQueue源码分析
这里不会分析所有的方法,只捡重要的变量和方法进行分析。
首先是类的声明和变量。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
/**
* Shared state for currently active iterators, or null if there
* are known not to be any. Allows queue operations to update
* iterator state.
*/
transient Itrs itrs = null;
其中的items数组,即要添加的队列数据,有取数据的takeIndex和添加数据的putIndex。还有可重入锁lock。
其构造函数显示,队列的长度是外部传入的,即这个类的对象创建的时候,其大小就确定了。同时确定了lock为非公平锁。
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and default access policy.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
……
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
然后分析其不会阻塞的的offer方法。首先获取lock的引用,并上锁,之后对数据进行操作,如果能正常插入数据,则返回true,否则返回false,最后再finally中unlock。
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full. This method is generally preferable to method {@link #add},
* which can fail to insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
最主要我们要分析的方法是,这个BlockingQueue如何实现阻塞的。接下来查看put方法的实现。
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
这里的lockInterruptibly什么意思呢。我们在源码的注释中加入中文注释。
Acquires the lock unless the current thread is
* {@linkplain Thread#interrupt interrupted}.
* 如果当前线程被打断,则立即获取锁。
* <p>Acquires the lock if it is not held by another thread and returns
* immediately, setting the lock hold count to one.
*如果当前锁未被其他线程持有,则立即获取锁,并设置锁持有数为1
* <p>If the current thread already holds this lock then the hold count
* is incremented by one and the method returns immediately.
*如果当前线程已持有这个锁,则持有锁的数目加1,并立即返回。
* <p>If the lock is held by another thread then the
* current thread becomes disabled for thread scheduling
* purposes and lies dormant until one of two things happens:
*如果当前锁被另外线程持有,然后当前线程被线程调度为不可运行,则当前线程处于休眠状态,直到 1,锁被当前线程持有 2,其他线程打断了此线程。
* <ul>
*
* <li>The lock is acquired by the current thread; or
*
* <li>Some other thread {@linkplain Thread#interrupt interrupts} the
* current thread.
*
* </ul>
*
* <p>If the lock is acquired by the current thread then the lock hold
* count is set to one.
* 如果当前线程获取锁,则将锁持有数设为1
* …其他注释省略
这里我们了解了这个锁的特性,然后代码
while (count == items.length)
notFull.await();
表示如果队列满了,则等待。一个锁对象可以有一个或多个相关的条件对象,用newCondition方法获取一个条件对象。notFull就是这样一个对象,可以回头看ArrayBlockingQueue的构造方法中,notFull = lock.newCondition();
**如果这个条件变量notFull调用了await()方法,则当前线程阻塞,并且释放锁。**这样,另外的线程就可以拿到锁,并取走队列中的数据,并通知当前线程有了剩余空间,线程被唤醒并添加数据到队列。究竟是不是这样呢,接下来我们找一个取数据的方法 take进行分析。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
从上面的代码看出,首先我们不分析取数据时数据为空导致的阻塞,其和数据满时的锁操作时一致的,我们还是假设刚才数据已满,获取锁,并且取出一个数据,则调用dequeue方法。取出锁之后,**notFull.signal();这个方法的意思是随机解除等待集中某个线程的阻塞状态。**和signal对应的方法是signalAll,意思是解除等待线程的阻塞,即通知所有等待线程,并通过竞争获得锁。 和signal,signalAll对应的有synchronized的notify和notifyAll的区别。可以想见,LinkedBlockingQueue也是类似原理实现阻塞,只不过不是array数组,而是单链表,且由于其可同时头部取数据,尾部添加数据,所以其数据结构中有两把锁,和两个条件变量。还有个原子类型的AtomicInteger类型的count,保证长度数据的原子性。这里不再详细分析,有需要的可以自己查看源码学习。
系统中有很多使用阻塞队列的例子,如系统Tts播放音频时的播放队列。
class AudioPlaybackHandler {
private static final String TAG = "TTS.AudioPlaybackHandler";
private static final boolean DBG = false;
private final LinkedBlockingQueue<PlaybackQueueItem> mQueue =
new LinkedBlockingQueue<PlaybackQueueItem>();
经过上边的分析我们已经知道了BlockingQueue的实现阻塞的原理。
BlockingDeque的源码分析
deque,双端队列,和queue的区别是。。。。看源码吧
Deque的源码注释是:一个线性组合,支持在两头插入或者移除。**deque是double end queue的简称,原来如此,就是队列queue只能first in last out,即头部出,尾部进。 但是deque即可以头部插入和移除,尾部也可以插入或移除。
/
- A linear collection that supports element insertion and removal at
- both ends. The name deque is short for “double ended queue”
- and is usually pronounced “deck”. Most {@code Deque}
- implementations place no fixed limits on the number of elements
- they may contain, but this interface supports capacity-restricted
- deques as well as those with no fixed size limit.
由于其也是个接口,deque的接口方法有
void addFirst(E e);
void addLast(E e);
boolean offerFirst(E e);
boolean offerLast(E e);
E removeFirst();
E removeLast();
E pollFirst();
E pollLast();
。。。。。
相当于queue有的方法,他都有两份,即first的操作和last的操作。废话少说,开始BlockingDeque的源码分析,由于
public interface BlockingDeque extends BlockingQueue, Deque {
其继承了BlockingQueue和Deque,所以其有这两个数据结构的共同特性。
而再查找BlockingDeque的实现类时,我在SDK中只找到一个实现类,即
LinkedBlockingDeque。刚开始还想不分析LinkedBlockingQueue,这里就来了 LinkedBlockingDeque。
首先其内部泛型类Node,即节点的声明。其中包含一个数据项item和prev方法和next方法。其他为LinkedBlockingDeque的内部变量和锁的声明。
static final class Node<E> {
/**
* The item, or null if this node has been removed.
*/
E item;
/**
* One of:
* - the real predecessor Node
* - this Node, meaning the predecessor is tail
* - null, meaning there is no predecessor
*/
Node<E> prev;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head
* - null, meaning there is no successor
*/
Node<E> next;
Node(E x) {
item = x;
}
}
/**
* Pointer to first node.
* Invariant: (first == null && last == null) ||
* (first.prev == null && first.item != null)
*/
transient Node<E> first;
/**
* Pointer to last node.
* Invariant: (first == null && last == null) ||
* (last.next == null && last.item != null)
*/
transient Node<E> last;
/** Number of items in the deque */
private transient int count;
/** Maximum number of items in the deque */
private final int capacity;
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
既然是链表实现,那么其中方法也就是链表数据项的的添加和删除
/**
* Links node as first element, or returns false if full.
*/
private boolean linkFirst(Node<E> node) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
//下边三步即是把新添加的node添加到队头,并且将first赋值为对头的node
Node<E> f = first;
node.next = f;
first = node;
if (last == null)
last = node;
else
f.prev = node;
++count;
notEmpty.signal();
return true;
}
/**
* @throws NullPointerException {@inheritDoc}
* @throws InterruptedException {@inheritDoc}
*/
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
//添加数据到队头,并获取锁
lock.lock();
try {
//如果队列满了,则返回false,就会发生阻塞,并释放锁。
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
public E takeFirst() throws InterruptedException {
//从对头取走元素,并获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
//如果对头取出数据为空,则发生阻塞,否则,返回取出的元素
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
从上述源码可以看出,**如果由于队列满导致put数据阻塞,则会释放锁,然后等有takefirst或其他从对头取数据的方法调用后,会在unlinkFirst方法中调用notFull.signal(); 则通知上次putFirst的notFull.await()唤醒,在while方法中判断,插入数据到队头。**队尾操作也是相同的模式。
总结
使用阻塞队列,多线程操作共同的队列时不需要额外的同步。在经典的生产者-消费者模型中,队列会自动平衡负载,即任意一边(生产与消费)处理快了就会被阻塞掉,从而减少两边的处理速度差距,自动平衡负载这个特性就造成它能被用于多生产者队列,因为你生成多了(队列满了)你就要阻塞等着,直到消费者消费使队列不满你才可以继续生产。