什么是生成消费者模式
生成者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生成者生产数据之后不用等待消费者来处理,直接添加到阻塞队列中,消费者也不找生成者要数据,而是直接从阻塞队列里取出,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
使用生产者和消费者模式的优势
在线程世界里,生产者就是生成数据的线程,消费者就是消费数据的线程。在多线程中,如果生产者生成数据很快,而消费者消费数据过慢,那么生产者就必须等待消费者处理完,才能继续生成数据。同样的,如果消费者的处理能力大于消费者,那么消费者就必须等待生成者。而生产者和消费者模式就是解决生产消费不均衡的问题。这就类似商品经济中供与求的关系,两者之间如果正常运行,必须保持两者间的平衡点,否则会带来不可预测的风险。
生产者消费者的实现
一、wait()和notify()和非阻塞队列
public class MyTest { private LinkedList<Double> mQueue = new LinkedList(); private int maxSize = 5;// 允许容器最大容量 private int count = 10;// 模拟生产和消费的次数 public static void main(String[] args) { MyTest test = new MyTest(); Producer producer = test.new Producer(); producer.start(); Consumer consumer = test.new Consumer(); consumer.start(); } /** * 生产者线程 */ class Producer extends Thread { @Override public void run() { super.run(); for (int i = 0; i < count; i++) { // 同步代码块 锁对象是mLinkedList synchronized (mQueue) { // 当条件是当容器中的长度等于maxSize时,线程进入阻塞状态 if (mQueue.size() == maxSize) { // 线程进入阻塞状态 try { System.out.println("容器已满,等待消费者消费数据"); mQueue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 向集合中添加数据,即生产数据 double random = Math.random(); mQueue.offer(random); System.out.println("生产者:" + random); mQueue.notify(); } } } } /** * 消费者线程 */ class Consumer extends Thread { @Override public void run() { super.run(); for (int i = 0; i < count; i++) { synchronized (mQueue) { // 当容器中没有元素时,线程进入阻塞状态 if (mQueue.size() == 0) { try { System.out.println("容器为空,等待生产者生产数据"); mQueue.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } // 从容器中取出第一个元素 double first = mQueue.poll(); System.out.println("消费者:" + first); mQueue.notify(); } } } } }
部分输出结果:
容器为空,等待生产者生产数据 生产者:0.7484302872683565 生产者:0.8967336201169599 生产者:0.2390860498316557 生产者:0.7094451240044176 生产者:0.22839940734934594 容器已满,等待消费者消费数据 消费者:0.7484302872683565 消费者:0.8967336201169599 生产者:0.0724122734493754 生产者:0.6127125155009396
二、可重入锁(ReentrantLock)和条件对象(Condition)和非阻塞队列
public class MyTest2 { private LinkedList<Double> mQueue = new LinkedList(); private int maxSize = 5;// 允许容器最大容量 private int count = 10;// 模拟生产和消费的次数 private ReentrantLock mLock = new ReentrantLock(); private Condition condition = mLock.newCondition(); public static void main(String[] args) { MyTest2 test = new MyTest2(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } /** * 生产者线程 */ class Producer extends Thread { @Override public void run() { super.run(); for (int i = 0; i < count; i++) { //获取锁 mLock.lock(); try { if (mQueue.size() == maxSize) { System.out.println("容器已满,等待消费者消费数据"); condition.await(); } double random = Math.random(); mQueue.offer(random); System.out.println("生产者:" + random); condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 mLock.unlock(); } } } } /** * 消费者线程 */ class Consumer extends Thread { @Override public void run() { super.run(); for (int i = 0; i < count; i++) { //获取锁 mLock.lock(); try { if (mQueue.size() == 0) { System.out.println("容器为空,等待生产者生产数据"); condition.await(); } double first = mQueue.poll(); System.out.println("消费者:" + first); condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 mLock.unlock(); } } } } }
部分输出结果:
容器为空,等待生产者生产数据 生产者:0.4867863135542152 生产者:0.0797739480646551 生产者:0.36641998393453745 生产者:0.6879099341543086 生产者:0.03090795334598362 容器已满,等待消费者消费数据 消费者:0.4867863135542152 消费者:0.0797739480646551 消费者:0.36641998393453745 消费者:0.6879099341543086 消费者:0.03090795334598362
三、BlockingQueue阻塞队列
它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法:
put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
public class MyTest3 { private ArrayBlockingQueue<Double> mQueue = new ArrayBlockingQueue<Double>(10); public static void main(String[] args) { MyTest3 test2 = new MyTest3(); Producer producer = test2.new Producer(); Consumer consumer = test2.new Consumer(); producer.start(); consumer.start(); } class Producer extends Thread { @Override public void run() { super.run(); for (int i = 0; i < 10; i++) { double random = Math.random(); System.out.println("生产者:" + random); try { mQueue.put(random); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer extends Thread { @Override public void run() { super.run(); for (int i = 0; i < 10; i++) { try { double take = mQueue.take(); System.out.println("消费者:" + take); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
部分输出结果:
生产者:0.37120549034520134 生产者:0.42260856816245274 消费者:0.37120549034520134 生产者:0.04279189186714871 消费者:0.42260856816245274 生产者:0.8863755762544362 消费者:0.04279189186714871 生产者:0.2615784453671385 生产者:0.9533695392214557 消费者:0.8863755762544362
使用阻塞队列实现生产消费模式无须单独考虑同步和线程间通信的问题,实现起来相对前两种更简单,性能也更好。
一般常用的用于实现生产消费模式的阻塞队列有两种分别是:ArrayBlockingQueue、LinkedBlockingQueue
ArrayBlockingQueue:是用数组实现的有界阻塞队列
按照先进先出(FIFO)的原则对元素进行排序的。默认情况下是非公平线程队列。公平访问队列就是按照阻塞的先后顺序访问队列。即先阻塞的生产者线程,可以先向队列里插入元素;先阻塞的消费者线程,可以先从队列里获取元素。使用公平线程队列会降低性能,需要维护一个有序队列。创建一个公平性的阻塞队列,如下:
ArrayBlockingQueue<Double> mQueue = new ArrayBlockingQueue<Double>(5,true);
LinkedBlockingQueue :是链表结构实现的有界阻塞队列
也是按照先进先出(FIFO)的原则对元素进行排序的,其内部也维持着一个数据缓存队列(该队列是一个链表结构)。当生产者向队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓存区达到缓存容量的最大值时(可以通过构造函数设置最大值),才会阻塞生产队列,直到消费者从队列中消费掉一份数据,生产者线程才会带唤醒。反之,对于消费者线程处理也是一样的道理。