一 、简介
1.继承AbstractQueue父类,实现了BlockingQueue,SerialIzable接口
2.基于数组实现的线程安全的阻塞队列。
3.ArrayBlockingQueue通过Object[]数组来保存数据。初始容量的大小就是创建数组的大小
4.ReentranLock锁,ArrayBlockingQueue包含一个ReentranLock锁,为了保持在多线程中实现多线程对数组数据的互斥访问。
5.Condition:ArrayBlockingQueue包含两个Condition对象(notEmpty和notFull)。当线程取数据时,数组刚好为空,线程会执行notEmpty.await()进行等待,当插入数据时(offer,put)会执行notEmpty.singal唤醒notEmpty等待的线程。当插入数据时,数组刚好满了,线程会执行notFull.await()进行等待。当取出数据时(take,poll)会执行notFull.singal唤醒notFull等待的线程。
6.ArrayBlockingQueue数据结构:
二、ArrayBlockingQueue属性:
//存储队列的数组
final Object[] items;
//下个被取出元素的索引
int takeIndex;
//下个被添加元素的索引
int putIndex;
int count; //队列元素的个数
//可重入的独占锁
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
//迭代器
transient Itrs itrs = null;
三、方法
1.offer(E e)添加数据:数组满了之后添加会返回false
扫描二维码关注公众号,回复:
2612128 查看本文章
/**
* 1.offer在队列元素满了之后会return false
* 2.在队列里面添加数据,putindex+1,count+1,当队列满了之后putIndex置为0
* 3.并唤醒notEmpty条件队列的线程
*/
public boolean offer(E e) {
checkNotNull(e); //判断不为空,否则抛出异常
final ReentrantLock lock = this.lock; lock.lock(); //获取锁
try {
if (count == items.length) //判断队列中的元素是否满了
return false;
else {
enqueue(e); //在队列中添加元素
return true;
}
}
finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items; items[putIndex] = x; //在队列中添加数据
if (++putIndex == items.length)//putIndex加一,如果等于item.length,那么队列已满
putIndex = 0;
count++; //队列元素个数加一 notEmpty.signal(); //激活放入notEmpty条件队列中的线程 }
2.put(E e):与offer区别的是队列如果满了线程会执行notFull.await()等待。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //获取可被中断锁
try {
while (count == items.length) //如果队列已满,把当前线程放入notFull管理的条件队列中
notFull.await();
enqueue(e); //插入元素
} finally {
lock.unlock(); //释放锁
}
}
3.poll()
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue(); //队列数据为0,返回null
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //取出要取出的元素,并设置为null,帮助gc回收
items[takeIndex] = null;
if (++takeIndex == items.length) //下个要取出元素的标记
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); //唤醒对应的线程
return x;
}
4.remove(Object o)
public boolean remove(Object o) {
if (o == null) return false; //null值得判断
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) { //判断要移除的值
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
5. take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //当前锁若为终端状态则,抛出异常InterruptedException异常
try {
while (count == 0)
notEmpty.await(); //唤醒notEmpty等待线程
return dequeue(); //见上
} finally {
lock.unlock();
}
}