BlockingQueue适合作为数据共享的通道,可以让服务线程在队列为空时进行等待,当有新的消息进入队列后,自动将线程唤醒;
BlockingQueue是一个接口,其实现如下图:
ArrayBlockingQueue基于数组实现,适合做有界队列;LinkedBlockingQueu基于链表实现,适合做无界队列;
以ArrayBlockingQueue为例,看下源码是如何实现的;
内部元素保存在一个对象数组中
/** The queued items */
final Object[] items;
向队列中压入元素可以使用offer()和put()方法;offer()方法在队列满了时会立即返回false,队列没满时正常入队;put()方法在队列满时会一直等待,直到队列中有空闲的位置;
从队列中弹出元素可以使用poll()和take()方法;opll()方法在队列为空时直接返回null,但take()方法会等待,直到队列内有可用元素;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
当执行take()方法时,如果队列为空,当前线程在notEmpty上等待(notEmpty.await();),新元素入队即执行enqueue()方法时,则该线程会得到一个通知(notEmpty.signal(););
同理,当执行put()方法时,如果队列已满,压入线程在notFull上等待(notFull.await(););当有元素从队列中被拿走即执行dequeue()方法时,则压入线程得到一个通知(notFull.signal(););
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}