队列
队列是一种先入先出的数据结构,新加入的元素都是加入到队列的后面
阻塞队列
java.util.concurrent.BlockingQueue<E>,是一种支持阻塞的插入元素,阻塞的移除元素操作的队列。
阻塞的插入:当队列空间满了,阻塞线程继续向队列中添加元素
阻塞的移除:当队列空间空了,阻塞线程从队列中取出元素
从这里看出,阻塞队列可以用在生产者/消费者模型中,生产者是向队列中添加元素的线程,消费者是从队列中取出元素的线程
*队列的操作方法
方法 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll | take | poll(time,unit) |
检查方法 | element() | peek() | / | / |
抛出异常:指的是队列满了或者队列为空,就抛出异常
返回特殊值:指的是插入方法返回的是true/false,移除方法返回的是元素
一直阻塞:指的是如果队列满了,再调用方法向队列中添加,生产者线程会一直阻塞,队列空了,再从队列中取元素,消费者线程会一直阻塞
超时退出:同样是对于上面的情况,来说,不过是超出了时间,线程就会退出
* 如果是无界队列,插入的put,offer一定不会阻塞,而且offer返回true
阻塞队列BlockQueue的实现类
1 . ArrayBlockingQueue
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
* 创建时候,必须指定容量大小/可以指定公平性,内部以数组进行存放
* 只有一个重入锁,意味着,生产者消费者调用时候,一个阻塞,一个执行,无法并行执行
2 . LinkedBlockingDeque
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * Linked list node class */ static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } /** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(0); /** * Head of linked list. * Invariant: head.item == null */ private transient Node<E> head; /** * Tail of linked list. * Invariant: last.next == null */ private transient Node<E> last; /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); /** * Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } /** * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
* LinkedBlockingQueue的创建可以指定容量大小,默认是Integer.MAX_VALUE,可以认为是无界的
* 存放元素的结构,是以链表存放
* 生产者和消费者线程,持有的ReentrantLock是不同的,意味着生产者线程和消费者线程可以并行执行
,通过检查count,完成阻塞消费者或者生产者的动作
3.SynchronousQueue
和前面两种队列实现不同的,这个队列本身不会去存放元素,每一个put操作,必须等待另一个take操作,否则就不能继续添加元素
/** * Creates a <tt>SynchronousQueue</tt> with nonfair access policy. */ public SynchronousQueue() { this(false); } /** * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy. * * @param fair if true, waiting threads contend in FIFO order for * access; otherwise the order is unspecified. */ public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack(); }
* 可以指定线程访问的策略,如果是true,按照先进先出来访问队列,否则就是非公平的,默认是非公平的
等待通知机制,自定义一个阻塞队列
使用数组实现一个阻塞队列 https://blog.csdn.net/ditto_zhou/article/details/77330733
使用阻塞队列来实现生产者消费模型
package com.ftf.thread.lock; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; public class ProductCustomerDemo { private static AtomicInteger ai = new AtomicInteger(); public static void main(String[] args) { final ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<Integer>(10); Thread product = new Thread(new Runnable() { @Override public void run() { while(true){ try { int a = ai.incrementAndGet(); System.out.println("生产者生产数据:"+a); abq.put(a); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }); Thread customer = new Thread(new Runnable() { @Override public void run() { while(true){ try { int a = abq.take(); System.out.println("消费者取数据"+a); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } } }); product.start(); customer.start(); } }
当达到容量上限制后,生产者停止生产数据,直到消费者消费,容量减少,继续生产,在非单个生产者-消费者模型中,不用担心出现线程假死的现象。