上篇已经介绍过了使用
(1)wait() / notify()方法
(2)await() / signal()方法
这两种方法实现生产者消费者模式,并给出了通用的生产者消费者代码模板:
* 1.锁{
* 2. while 不满足条件
* 释放锁等待 条件改变
* end while
* 3. 生产或者消费
* 4. 唤醒其他生产者或消费者线程
* 5. 释放锁
* }
本篇我们将使用阻塞队列实现生产者消费者模式。那么为什么阻塞队列能够实现生产者消费者模式?
其实阻塞队列的思想就是生产者消费者模式,并实现了上面给出的生产者消费者代码模板,接下来会通过分析阻塞队列LinkedBlockingQueue的源码来验证。在文章的最后会给出使用LinkedBlockingQueue实现生产者消费者模式的代码。
那么如果让我们实现阻塞队列,那么我们会怎么实现呢?
大体思路是:通过上篇的学习我们已经熟练掌握生产者消费者模式的实现模板,那么我们就可以使用通用模板来实现阻塞队列。
(1)使用同一个仓库,仓库中使用的数据结构是链表。
(2)使用同一把锁,同步生产者和消费者线程。
(3)生产者只在仓库未满时进行生产,仓库满时生产者进程被阻塞。
(4)消费者只在仓库非空时进行消费,仓库为空时消费者进程被阻塞。
那么LinkedBlockingQueue是不是这样实现的呢?下面就对其进行详细的分析!
LinkedBlockingQueue思想
数据结构:
LinkedBlockingQueue顾名思义,可以推断出该类实现了队列,并使用链表作为数据结构。事实上也确实如此。下面是LinkedBlockingQueue继承关系图。
锁:
不同于我们上面的猜想,LinkedBlockingQueue使用了两把锁,分别为生产者锁和消费者锁,即所有的生产者线程使用同一把锁,消费者线程使用另外一把锁。示意图如下:
下面考虑几种情况:
(1)当仓库不为空且队列未满时候
由于同时只有一个线程(通过独占锁putLock实现)入队元素并且是操作last节点(,而同时只有一个出队线程(通过独占锁takeLock实现)操作head节点,所以不存在并发安全问题。
(2)仓库为空
当仓库空了,消费者消费不了了,此时就将消费者线程加入到消费者等待队列中去,直到仓库不为空,才唤醒消费者线程。
(3)仓库满
当仓库满了,生产者就不能生产了,此时就将生产者线程加入到生产者等待队列中去,直到仓库不满,才唤醒生产者线程。
LinkedBlockingQueue源码分析
LinkedBlockingQueue的成员变量:
/** 消费者锁*/
private final ReentrantLock takeLock = new ReentrantLock();
/** 消费者等待队列 */
private final Condition notEmpty = takeLock.newCondition();
/** 生产者锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 生产者等待队列 */
private final Condition notFull = putLock.newCondition();
/** 仓库中资源的数量 */
private final AtomicInteger count = new AtomicInteger(0);
原子变量count用来记录仓库中元素个数,另外里面有两个ReentrantLock的独占锁,其中takeLock用来控制同时只有一个消费者线程可以从仓库获取元素,其他消费者线程必须等待,putLock控制同时只能有一个生产者线程可以获取锁去添加元素,其他生产者线程必须等待。另外notEmpty和notFull用来将生产者线程和消费者线程加入到等待队列。这个就是生产者-消费者模型。
主要方法:
//移除并返回队列头部的元素 如果队列为空,则阻塞
public E take() throws InterruptedException {}
//添加一个元素 如果队列满,则阻塞
public void put(E e) throws InterruptedException{}
//添加一个元素并返回true 如果队列已满,则返回false
public boolean offer(E e){}
//添加一个元素并返回true 如果队列已满,就等待timeout时间,如果这个时间段队列资源数还是满的,就返回false
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {}
//移除并返问队列头部的元素 如果队列为空,则返回null
public E poll() {}
//移除并返问队列头部的元素 如果队列为空,就等待timeout时间,如果这个时间段队列资源数还是空的,就返回false
public E poll(long timeout, TimeUnit unit) throws InterruptedException {}
take方法:
public E take() throws InterruptedException {
E x;
int c = -1;//记录消费前的资源数量
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//1.给消费者线程加锁
takeLock.lockInterruptibly();
try {
//2.判断条件,如果仓库资源数量为0,就释放消费者锁,并将该消费者线程加入消费者等待队列中去。
while (count.get() == 0) {
notEmpty.await();
}
//3.消费,并将仓库资源数量减1
x = dequeue();
c = count.getAndDecrement();
//4.如果消费前的资源数大于一,说明此消费者线程消费1个后还有剩余,就喊另外一个消费者线程来消费。
if (c > 1)
notEmpty.signal();
} finally {
//5.释放消费者锁
takeLock.unlock();
}
//6.如果消费前的资源数等于仓库最大容量,那么此时有的生产者线程已经在生产者等待队列了,此时消费了
//一个,就需要唤醒一个生产者线程继续生产。
if (c == capacity)
signalNotFull();
return x;
}
从上面的代码可以看出,基本和我们提出的生产者消费者模板一样。这里重点说一下第四点和第六点。
第4点:4.如果消费前的资源数大于一,说明此消费者线程消费1个后还有剩余,就喊其他消费者线程来消费。假设现在仓库资源数量为0,这时候有两个消费者线程来消费,消费者1和消费者2,发现数量为0,于是这两个消费者线程逐一增加到消费者等待队列中去,然后两个生产者线程生产了2个资源,生产者1和生产者2,当生产者1生产一个资源后,发现原仓库资源数量为0,于是就唤醒消费者1,而此时生产者2又生产了一个资源,此时仓库资源变为2,消费者1开始消费,发现仓库资源数量为2,就唤醒此时还在等待的消费者2。
第6点:如果消费前的资源数等于仓库最大容量,那么此时有的生产者线程已经在生产者等待队列了,此时消费了一个,就需要唤醒一个生产者线程继续生产。理解了第四点,那么第六点就很好理解了。
put方法:
public E take() throws InterruptedException {
E x;
int c = -1;//记录消费前的资源数量
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//1.给消费者线程加锁
takeLock.lockInterruptibly();
try {
//2.判断条件,如果仓库资源数量为0,就释放消费者锁,并将该消费者线程加入消费者等待队列中去。
while (count.get() == 0) {
notEmpty.await();
}
//3.消费,并将仓库资源数量减1
x = dequeue();
c = count.getAndDecrement();
//4.如果消费前的资源数大于一,说明此消费者线程消费1个后还有剩余,就喊另外一个消费者线程来消费。
if (c > 1)
notEmpty.signal();
} finally {
//5.释放消费者锁
takeLock.unlock();
}
//6.如果消费前的资源数等于仓库最大容量,那么此时有的生产者线程已经在生产者等待队列了,此时消费了
//一个,就需要唤醒一个生产者线程继续生产。
if (c == capacity)
signalNotFull();
return x;
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;//记录生产前的资源数量
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//1.给生产者线程加锁
putLock.lockInterruptibly();
try {
//2.判断条件,如果仓库资源数量满了,就释放生产者锁,并将该生产者线程加入生产者等待队列中去。
while (count.get() == capacity) {
notFull.await();
}
//3.生产,并将仓库资源数量加1
enqueue(node);
c = count.getAndIncrement();
//4.如果生产前的资源数加一小于最大容量,说明此生产者线程生产1个后仓库还没满,就喊另外一个生产线程来生产。
if (c + 1 < capacity)
notFull.signal();
} finally {
//5.释放生产者锁
putLock.unlock();
}
//6.如果生产前的资源数等于0,那么此时有的消费者线程已经在消费者等待队列了,此时生产了
//一个,就需要唤醒一个消费者线程继续消费。
if (c == 0)
signalNotEmpty();
}
put方法和get方法是对应的。
到此我们就可以使用get和put方法实现消费者生产者模式了。我们将在本篇的最后给出消费者生产者模式代码,下面继续分析LinkedBlockQueue的其他方法。
poll方法:
public E poll() {
final AtomicInteger count = this.count;
//1.如果仓库中资源数量为0,那么就直接返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
从上面可以看出poll方法和take方法最大的区别就是,如果仓库资源数量为0,那么poll方法就直接返回null,而take方法是进入等待队列直到资源数大于0。poll方法的脾气还是挺大的,当然还有一个但超时限制的poll方法,该方法的主要思想就是,如果仓库资源数量为0,那么我就等你固定的时间,如果仓库资源数还为0,对不起,走人了。源码如下:
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
//关键点:如果仓库资源为0,我就等你nanos时间,如果这个时间内仓库资源还为0,对不起,走人!
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
同样道理,offer方法和put是对应的。
offer方法:
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//关键点:如果仓库资源数量为最大容量,我就直接返回false,不等了。
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
当然也有一个带超时的offer方法。该方法的主要思想就是,如果仓库资源数量达到最大容量,那么我就等你固定的时间,如果仓库资源数还为最大容量,对不起,走人了。
到此LinkedBlockingQueue的主要方法都已经介绍完了,下面我们就给出我们这篇文章主要的任务。
使用LinkedBlockingQueue实现生产者消费者模式,源码如下:
public class Test {
public static void main(String[] args) {
int count=0;//仓库中的资源数量
int maxCount=8;//仓库能存储的最大资源数量
BlockingQueue<Person> blockingQueue=new LinkedBlockingQueue<Person>(maxCount);
Thread producer1=new Thread(new Producer(blockingQueue,"生产者1号"));
Thread producer2=new Thread(new Producer(blockingQueue,"生产者2号"));
Thread consumer1=new Thread(new Consumer(blockingQueue,"消费者1号"));
Thread consumer2=new Thread(new Consumer(blockingQueue,"消费者2号"));
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
}
}
class Producer implements Runnable{
BlockingQueue<Person> blockingQueue;
String name="";
public Producer(BlockingQueue<Person> blockingQueue,String name){
this.blockingQueue=blockingQueue;
this.name=name;
}
@Override
public void run() {
while(true){
try {
blockingQueue.put(new Person());
System.out.println(name+"生产了1个资源");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
BlockingQueue<Person> blockingQueue;
String name="";
public Consumer(BlockingQueue<Person> blockingQueue,String name){
this.blockingQueue=blockingQueue;
this.name=name;
}
@Override
public void run() {
while(true){
try {
blockingQueue.take();
System.out.println(name+"消费了1个资源");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
class Person{}
好了,到此生产者消费者模式就介绍完毕了!