ArrayBlockingQueue源码解析
一、概述
ArrayBlockingQueue 是一个用数组实现的有界阻塞队列。
ArrayBlockingQueue的数据结构:
上图涉及:ReentrientLock
、Condition
、BlockingQueue
的相关知识。
本文涉及的知识点:
- 使用数组实现的List:ArrayList 源码分析
- 队列的相关知识:Queue 综述
- 阻塞队列的概念:并发容器(一) — 综述
- 阻塞功能的实现涉及Condition接口:Condition接口
- 并发安全涉及可重入锁ReentrientLock:ReentrantLock源码解析
LinkedBlockingQueue 队列
LinkedBlockingQueue
是一个用链表实现的有界阻塞队列。关于链表的实现方法可以自己分析一下。
关于链表的相关操作,可以参考链表实现的List:LinkedList 源码分析
二、源码解析
普通队列包含入队和出队两种操作,阻塞队列在普通队列原有的基础上实现了阻塞功能。
阻塞队列 (并发容器(一) — 综述 ) 提供了4套API 来操作队列,本文我们只分析其中的两套操作,阻塞的入队/出队,阻塞且有超时响应的入队/出队。
下面将从以下几个方面进行分析:
- 构造函数:
ArrayBlockingQueue()
- 队列实际入队、出队的操作:
enqueue(x)
、dequeue()
- 队列具有阻塞功能的入队、出队操作:
put(e)
、take()
- 队列具有超时且非阻塞的入队、出队操作:
offer(e, timeout, unit)
、poll(timeout, unit)
1. 构造函数
// ArrayBlockingQueue.class
final ReentrantLock lock; //可重入锁,在添加、移除元素时加锁保证安全性
private final Condition notEmpty; //锁的非空条件
private final Condition notFull; //锁的满条件
final Object[] items; //实际添加元素的容器
int takeIndex; //获取元素时的位置(队列头)
int putIndex; //添加元素时的位置(队列尾)
int count; //队列中元素的个数
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
2. 实际入队、出队的操作
实际入队、出队的操作:enqueue(e)
、dequeue()
// ArrayBlockingQueue.class
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) { //入队操作
final Object[] items = this.items; //获取存储元素的数组
items[putIndex] = x; //将添加的元素加入队列
//这里判断putIndex的下一个位置是否跟数组长度相同,若相同,则说明putIndex已经是数组的最后一个位置。
if (++putIndex == items.length)
// 这里将下一个添加位置设置成数组第一个位置。
putIndex = 0; //由这两步操作可知,ArrayBlockingQueue是一个循环队列。
count++; //将队列中添加的元素个数增加1
notEmpty.signal(); //这里很重要,唤醒获取数据时被阻塞的线程继续获取数据。
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //取出takeIndex位置的元素(takeIndex代表队列头部元素的位置)
items[takeIndex] = null; //将数组中takeIndex位置数据置空
if (++takeIndex == items.length) //这里跟入队的逻辑是一样的,请参考入队操作。
takeIndex = 0;
count--; //出队后,队列元素个数减1
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); //这里很重要,唤醒添加数据时被阻塞的线程继续添加数据。
return x;
}
小结:
ArrayBlockingQueue
是一个循环队列。notFull.signal()
唤醒添加数据的线程。notEmpty.signal()
唤醒获取数据的线程。
3. 有阻塞功能的入队、出队操作
有阻塞功能的入队、出队操作:put(e)
、take()
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //加锁(可被中断的)
try {
while (count == items.length) //当队列满了的时候
/*
* 使添加数据的线程进入notFull的等待队列。
* 注:notFull是一个Condition对象,Condition内维护了一个等待队列。
*/
notFull.await();
enqueue(e); //执行到这里,说明队列不满,执行元素入队操作。
} finally {
lock.unlock(); //释放锁
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //加锁(可被中断的)
try {
while (count == 0) //当队列为空时,无法再提供数据
/*
* 使获取数据的线程进入notEmpty的等待队列。
* 注:notEmpty是一个Condition对象,Condition内维护了一个等待队列。
*/
notEmpty.await();
return dequeue(); //执行到这里,说明当前队列不为空,执行元素出队操作。
} finally {
lock.unlock(); //释放锁
}
}
小结:
- 使用锁的过程要遵循
before...after
模式。 - Condition 对象内部维护了一个等待队列。
- 获取的锁是会响应中断操作的。
4. 阻塞且有超时功能的入队、出队操作
有超时且非阻塞的入队、出队操作:offer(e, timeout, unit)
、poll(timeout, unit)
/**
* Inserts the specified element at the tail of this queue, waiting
* up to the specified wait time for space to become available if
* the queue is full.
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout); //将timeout转换成纳秒
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //获取锁
try {
while (count == items.length) { //判断队列是否满了
if (nanos <= 0) //时间<=0表示已经超时了
return false;
//每次调用都会返回一个时间,这个时间<=0表示等待的实际已经到了。
nanos = notFull.awaitNanos(nanos);
}
enqueue(e); //执行到这里,表示没有超时,将数据加入队列
return true;
} finally {
lock.unlock(); //释放锁
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout); //将timeout转换成纳秒
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //获取锁
try {
while (count == 0) { //判断队列是否空了
if (nanos <= 0)
return null;
//每次调用都会返回一个时间,这个时间<=0表示等待的实际已经到了。
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue(); //执行到这里,表示没有超时,获取队列头数据
} finally {
lock.unlock(); //释放锁
}
}
三、小结
ArrayBlockingQueue
是一个循环队列。- 只有
put(e)
、take()
才能实现阻塞功能,其他方法都不具备阻塞功能。 notFull.signal()
唤醒的是添加数据的线程。notEmpty.signal()
唤醒的是获取数据的线程。- 使用锁的过程要遵循
before...after
模式。 Condition
对象内部维护了一个等待队列,可以实现阻塞功能。