介绍
一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。
相比于其他阻塞队列,LinkedBlockingDeque多了addFirst、addLast、peekFirst、peekLast等方法,以first结尾的方法,表示插入、获取获移除双端队列的第一个元素。以last结尾的方法,表示插入、获取获移除双端队列的最后一个元素。
LinkedBlockingDeque是可选容量的,在初始化时可以设置容量防止其过度膨胀,如果不设置,默认容量大小为Integer.MAX_VALUE。
数据结构
链表节点
static final class Node<E> {
/**
* null,表示节点被移除.
*/
E item;
/**
* One of:
* - 真实后置节点
* - 本节点,表示前置节点是tail
* - null,没有前置节点
*/
Node<E> prev;
/**
* One of:
* - 真实后置节点。
* - 本节点,表示后置节点是head。
* - null, 没有后置
*/
Node<E> next;
Node(E x) {
item = x;
}
}
属性
/**
* Pointer to first node.
* 恒定式: (first == null && last == null) ||
* (first.prev == null && first.item != null)
*/
transient Node<E> first;
/**
* Pointer to last node.
* 恒等式: (first == null && last == null) ||
* (last.next == null && last.item != null)
*/
transient Node<E> last;
/** Number of items in the deque */
private transient int count;
/** Maximum number of items in the deque */
private final int capacity;
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
方法实现
offer,poll,peek
Queue实现
public boolean add(E e) {
addLast(e);
return true;
}
public E remove() {
return removeFirst();
}
public E element() {
return getFirst();
}
public boolean offer(E e) {
return offerLast(e);
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
return offerLast(e, timeout, unit);
}
public E poll() {
return pollFirst();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return pollFirst(timeout, unit);
}
public E peek() {
return peekFirst();
}
Queue接口的实现都是调用Deque接口的实现。
入队列默认在Last方向(即尾端)处理。出队列默认在First方向(即Head)处理。
Deque实现
public boolean offerFirst(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkFirst(node);
} finally {
lock.unlock();
}
}
public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkLast(node);
} finally {
lock.unlock();
}
}
public E pollFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkFirst();
} finally {
lock.unlock();
}
}
public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkLast();
} finally {
lock.unlock();
}
}
public boolean offerFirst(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (!linkFirst(node)) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();
}
}
public boolean offerLast(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (!linkLast(node)) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();
}
}
public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkFirst()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}
public E pollLast(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
while ( (x = unlinkLast()) == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}
public E peekFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (first == null) ? null : first.item;
} finally {
lock.unlock();
}
}
public E peekLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (last == null) ? null : last.item;
} finally {
lock.unlock();
}
}
put,take
Queue实现
public void put(E e) throws InterruptedException {
putLast(e);
}
public E take() throws InterruptedException {
return takeFirst();
}
Queue接口的实现都是调用Deque接口的实现。
入队列默认在Last方向(即尾端)处理。出队列默认在First方向(即Head)处理。
Deque实现
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
remove
public boolean removeFirstOccurrence(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (Node<E> p = first; p != null; p = p.next) {
if (o.equals(p.item)) {
unlink(p);
return true;
}
}
return false;
} finally {
lock.unlock();
}
}
public boolean removeLastOccurrence(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (Node<E> p = last; p != null; p = p.prev) {
if (o.equals(p.item)) {
unlink(p);
return true;
}
}
return false;
} finally {
lock.unlock();
}
}
pop,push
public void push(E e) {
addFirst(e);
}
/**
* @throws NoSuchElementException {@inheritDoc}
*/
public E pop() {
return removeFirst();
}
unlinkFirst,unlinkLast,linkFirst,linkLast
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
Node<E> f = first;
if (f == null)
return null;
Node<E> n = f.next;
E item = f.item;
f.item = null; //表示节点移除。
f.next = f; // help GC
first = n;
//没有节点
if (n == null)
last = null;
else
n.prev = null;
--count;
notFull.signal();
return item;
}
/**
* Removes and returns last element, or null if empty.
*/
private E unlinkLast() {
// assert lock.isHeldByCurrentThread();
Node<E> l = last;
if (l == null)
return null;
Node<E> p = l.prev;
E item = l.item;
l.item = null;
l.prev = l; // help GC
last = p;
if (p == null)
first = null;
else
p.next = null;
--count;
notFull.signal();
return item;
}
private boolean linkFirst(Node<E> node) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
//节点加入到head。
Node<E> f = first;
node.next = f;
first = node;
if (last == null)
last = node;
else
f.prev = node;
++count;
//
notEmpty.signal();
return true;
}
/**
* Links node as last element, or returns false if full.
*/
private boolean linkLast(Node<E> node) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
Node<E> l = last;
node.prev = l;
last = node;
if (first == null)
first = node;
else
l.next = node;
++count;
notEmpty.signal();
return true;
}
void unlink(Node<E> x) {
// assert lock.isHeldByCurrentThread();
Node<E> p = x.prev;
Node<E> n = x.next;
if (p == null) {
unlinkFirst();
} else if (n == null) {
unlinkLast();
} else {
p.next = n;
n.prev = p;
x.item = null;
// Don't mess with x's links. They may still be in use by
// an iterator.
--count;
notFull.signal();
}
}