1.ArrayBlockingQueue底层实现是通过数组来实现的,按照先进先出的原则,通过ReentrantLock lock 重入锁来锁住当前竞争资源,使用Condition notEmpty,Condition notFull来实现生产者-消费者模式(通知模式)。下面ArrayBlockingQueue添加和取出元素的源码可以看做生产者-消费者模式。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; /**存放队列添加元素进来的容器,数组大小为创建ArrayBlockingQueue指定的容量*/ final Object[] items; /**记录移除元素的下标**/ int takeIndex; /**记录添加元素的下标**/ int putIndex; /**记录当前元素的个数*/ int count; /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; /** * 创建ArrayBlockingQueue实例对象 * @param capacity 指定阻塞队列容量大小,指订后,不可改变 * @param fair 是否是公平锁,默认非公平锁 */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /** * 添加一个元素(生产) * @param e * @throws InterruptedException */ public void put(E e) throws InterruptedException { checkNotNull(e);// 检查元素是否为null final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 锁住当前代码块 try { while (count == items.length) // 如果记录数等于队列容量的长度 当前线程阻塞 notFull.await(); insert(e); // 插入当前元素 } finally { lock.unlock(); // 释放锁 } } /** * 将当前元素插入到数组 * @param x */ private void insert(E x) { items[putIndex] = x; // 将元素添加到数组里 putIndex = inc(putIndex); // 将记录元素的下标+1 ++count; // 将记录当前元素个数+1 notEmpty.signal(); // 通知notEmpty等待的线程(通知消费者) } /** * 如果当前记录数等于容器的容量,则将记录数置为0,否则将当前元素加1 * @param i * @return */ final int inc(int i) { return (++i == items.length) ? 0 : i; } /** * 取走takeIndex下标的元素(消费者) 取走原则按照先进先出原则 * @return * @throws InterruptedException */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 尝试获得锁,如果得到该锁,锁住当前代码块 try { while (count == 0) notEmpty.await(); // 如果当前记录锁为0,该队列容量为0时,该线程阻塞,等待焕醒 return extract(); // 取走takeIndex下标的元素 } finally { lock.unlock(); // 释放锁 } } /** * 取走takeIndex下标的元素 * @return */ private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); // 获取当前下标为takeIndex的元素 items[takeIndex] = null; // 将原数组下标为takeIndex的元素置为null takeIndex = inc(takeIndex);// 如果当前takeIndex下标等于容器的容量,则将记录数置为0,否则将当前元素加1 --count; // 当前容器记记录数-1 notFull.signal(); //唤醒notFull等待的线程(生产则),也就是该put(E e)方法所等待的线程 return x; } /** * 将Object对象转为E类型 * @param item * @return */ @SuppressWarnings("unchecked") static <E> E cast(Object item) { return (E) item; }详细说明可以参考该博客: 点击打开链接
java并发-ReentrantLock的lock和lockInterruptibly的区别 点击打开链接