生产者消费者问题是多线程的一个经典问题,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。
解决生产者/消费者问题的方法可分为两类:
- 采用某种机制保护生产者和消费者之间的同步;
- 在生产者和消费者之间建立一个管道。
第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。
在Java中有四种方法支持同步,其中前三个是同步方法,一个是管道方法。
- wait() / notify()方法
- await() / signal()方法
- BlockingQueue阻塞队列方法
- PipedInputStream / PipedOutputStream
本文只介绍由LinkedBlockingQueue实现的生产者和消费者模型。
先了解一下阻塞队列
阻塞队列概要
阻塞队列与我们平常接触的普通队列(LinkedList或ArrayList等)的最大不同点,在于阻塞队列支出阻塞添加和阻塞删除方法。
-
阻塞添加
所谓的阻塞添加是指当阻塞队列元素已满时,队列会阻塞加入元素的线程,直队列元素不满时才重新唤醒线程执行元素加入操作。 -
阻塞删除
阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作(一般都会返回被删除的元素)
Java中的阻塞队列接口BlockingQueue继承自Queue接口,下面我们将分析阻塞队列中的两个实现类ArrayBlockingQueue和LinkedBlockingQueue的简单使用和实现原理
ArrayBlockingQueue
ArrayBlockingQueue 是一个用数组实现的有界阻塞队列,其内部按先进先出的原则对元素进行排序,其中put方法和take方法为添加和删除的阻塞方法。
ArrayBlockingQueue原理概要
ArrayBlockingQueue的内部是通过一个可重入锁ReentrantLock和两个Condition条件对象来实现阻塞
LinkedBlockingQueue的基本概要
LinkedBlockingQueue是一个由链表实现的有界队列阻塞队列,但大小默认值为Integer.MAX_VALUE,所以我们在使用LinkedBlockingQueue时建议手动传值,为其提供我们所需的大小,避免队列过大造成机器负载或者内存爆满等情况。
LinkedBlockingQueue的实现原理剖析原理概论
LinkedBlockingQueue是一个基于链表的阻塞队列,其内部维持一个基于链表的数据队列,实际上我们对LinkedBlockingQueue的API操作都是间接操作该数据队列。
LinkedBlockingQueue代码讲解如下:
它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法:
put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
(1)定义一个Apple类
package com.ProducerConsumer;
public class Apple {
int id;
public Apple(int id){
this.id = id;
}
@Override
public String toString(){
return "Apple: "+id;
}
}
(2)定义一个生产者Producer
思路就是传入一个LinkedBlockingQueue对象,然后使用put方法,一直添加Apple对象
/*
* producer主要是往阻塞队列里面添加苹果
* */
package com.ProducerConsumer;
import java.util.concurrent.LinkedBlockingQueue;
public class Producer extends Thread{
//1、通过构造函数传入阻塞队列
public static LinkedBlockingQueue<Apple> queue;
private static int MAX_NUM=0;
public Producer(LinkedBlockingQueue<Apple> queue,int MAX_NUM){
this.queue = queue;
this.MAX_NUM = MAX_NUM;
}
public void run(){
//只要阻塞队列没有满,就可以一直往里面放苹果
while(true){
try {
queue.put(new Apple(queue.size()));
System.out.println("Producer queue size "+queue.size());
Thread.sleep(1500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
System.out.println("Apple is enough !! "+queue.size());
e.printStackTrace();
}
}
}
}
(3)定义一个消费者Consumer
思路:传入一个LinkedBlockingQueue对象,然后使用take方法,一直消费Apple对象(就是从队列中取出该对象)
package com.ProducerConsumer;
import java.util.concurrent.LinkedBlockingQueue;
public class Consumer extends Thread{
public static LinkedBlockingQueue<Apple> queue;
private static int MAX_NUM=0;
public Consumer(LinkedBlockingQueue<Apple> queue,int MAX_NUM){
this.queue = queue;
this.MAX_NUM = MAX_NUM;
}
public void run(){
//只要阻塞队列不为空,就一直从里面取苹果
while(true){
try {
queue.take();
System.out.println("Consumer Apple is left : "+queue.size());
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
System.out.println("No Apple "+queue.size());
e.printStackTrace();
}
}
}
}
(4)主类,创建一个BlockingQueue,然后创建一个生产者和一个消费者
package com.ProducerConsumer;
import java.util.concurrent.LinkedBlockingQueue;
public class TestProducerConsumer {
public static void main(String[] args) {
// TODO Auto-generated method stub
//1、创建一个BlockingQueue
int MAX_NUM = 10;
LinkedBlockingQueue<Apple> queue = new LinkedBlockingQueue<>(MAX_NUM);
//2、创建一个生产者,一个消费者
Producer producer = new Producer(queue,MAX_NUM);
Consumer consumer = new Consumer(queue,MAX_NUM);
//3、开启两个线程
producer.start();
consumer.start();
}
}