1 什么是生产者消费者模式
想一个现实生活中的例子,啤酒商---超市---消费者也就是我们,啤酒商生产了啤酒,然后将啤酒销售给了超市,我们消费之又会到超市将啤酒买回来自己喝,那么啤酒商和消费者之间是什么关系呢?啤酒商不会管消费者消费了多少啤酒,只要是超市中没有了啤酒或者啤酒不足,卖给超市,如果超市啤酒满货,那么就不在生产啤酒,等待超市进货,我们消费者也不必关心啤酒商生产了多少啤酒,只要超市里面有,我就去买,但是超市中没有货了,我就不能买了,呵呵,这个例子不那么恰当,但是说明生产者消费者模式,是足够了。
那么上面的例子中,啤酒商就是生产者、超市就是仓库或者一个缓存区、消费者就是消费者。
这种模式,使生产者和消费者之间的联系降低了,就是解耦合,通过一个大容器(超市),消费者就不必过于着急等待生产者生产的“啤酒”,生产者(啤酒商)也不必被“催的心烦”。最大的好处就是:增加了并发量,因为容器大了,哈哈,而且实现了解耦合。
2 两种常见的生产者消费者模型
图 2‑1
图 2‑2
由于其他模型不常见,所以在这里就不在介绍,生产者自然可以是一个或者是多个了,嘿嘿
3 生产者消费者模式主要用到的方法
分别为:线程的相关知识,如果不了解请参见我的其他文章
wait()/nodify()、notifyAll()
synchronized,至于这个关键字的具体用处请参见其他文章
关键点:内存缓冲区为空的时候消费者必须等待,而内存缓冲区满的时候,生产者必须等待。
4具体的Java实现
生产者是一堆线程,消费者是另一堆线程,内存缓冲区写一个仓库类来实现,传递的数据也就是“啤酒”,写一个实体类来实现,还有一个异常类用来做判断
4.1 数据模型--》“啤酒”
public class Beer { private String name;//名字 private double capabality;//容量 private String shap;//形状 public String getName() { return name; } public void setName(String name) { this.name = name; } public double getCapabality() { return capabality; } public void setCapabality(double capabality) { this.capabality = capabality; } public String getShap() { return shap; } public void setShap(String shap) { this.shap = shap; } @Override public String toString() { return "Beer [name=" + name + ", capabality=" + capabality + ", shap=" + shap + "]"; } } |
4.2 仓库--》“超市”
public class Supermarket { private List<Beer> beers = new ArrayList<Beer>();//存储啤酒 private int length;//存储仓库的大小 public Supermarket(int length) { this.length = length; } //进啤酒 public void add(Beer beer) throws CapacityException{ if(size()>=length){ throw new CapacityException("生产的数量过多,请按照规定生产"); } beers.add(beer); } //售卖啤酒 public Beer rmBeer() throws CapacityException{ if(size()<=0){ throw new CapacityException("存货不足,请稍后再来"); } return beers.remove(0); } //超市当前库存 public int size(){ return beers.size(); } } |
4.3 一个异常类,用来判断不符合的条件
/* * 为仓库准备的异常类,容量大小出现错误会抛出异常 */ public class CapacityException extends Exception {
public CapacityException() { super(); } public CapacityException(String message) { super(message); } } |
4.4 生产者—》“啤酒商”
public class BeerProducter implements Runnable{ //超市 private Supermarket supermarket;
public BeerProducter(Supermarket supermarket) { this.supermarket = supermarket; }
@Override public void run() { while (true) { Random r = new Random();//产生随机数为啤酒标号 long num = r.nextInt(100); Beer beer = new Beer(); beer.setCapabality(100); beer.setShap("椭圆"); beer.setName("青岛啤酒"+num); synchronized (supermarket) { try { supermarket.add(beer);//这里如果报异常,证明超过超市的库存,这个时候执行catch语句中的内容 System.out.println("生产者生产了"+beer.getName()); } catch (CapacityException e) { try { supermarket.notifyAll();//唤醒等待的线程 supermarket.wait();//持有该锁的线程也就是不符合条件的线程进入等待 } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println(e.getMessage()); } } } } } |
4.5 消费者—》“我们去买啤酒啦”
public class Consumer implements Runnable { //超市 private Supermarket supermarket;
public Consumer(Supermarket supermarket) { super(); this.supermarket = supermarket; } @Override public void run() { while(true){ Beer beer = null; synchronized (supermarket) { try { beer = supermarket.rmBeer(); System.out.println("消费者消费了"+beer.getName()); } catch (CapacityException e) { try { supermarket.notifyAll(); supermarket.wait(); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println(e.getMessage()); } } } } } |
4.6 测试类
public class TestPC { public static void main(String[] args) { Supermarket supermarket = new Supermarket(20); BeerProducter p1 = new BeerProducter(supermarket); BeerProducter p2 = new BeerProducter(supermarket); BeerProducter p3 = new BeerProducter(supermarket); Consumer c1 = new Consumer(supermarket); Consumer c2 = new Consumer(supermarket); Consumer c3 = new Consumer(supermarket); ExecutorService service = Executors.newCachedThreadPool(); service.execute(p1); service.execute(p2); service.execute(p3); service.execute(c1); service.execute(c2); service.execute(c3); } } |
4.7 测试结果,这里仅取其中部分
生产者生产了青岛啤酒8 生产者生产了青岛啤酒5 生产者生产了青岛啤酒68 生产者生产了青岛啤酒63 生产的数量过多,请按照规定生产 生产的数量过多,请按照规定生产 存货不足,请稍后再来 消费者消费了青岛啤酒31 消费者消费了青岛啤酒5 消费者消费了青岛啤酒6 |
5 生产者和消费者的用途
² 解耦
因为生产者和消费者都不需要关心对方的具体实现逻辑,只需要遵从双方都需要的“超市”大仓库,就可以了,所以可以很好的进行解耦合。
² 并发
因为有“超市”这个大容器,或者成为缓存区的存在,可以使生产者和消费者的数量增加,而不需要消费者紧逼着生产者,两者数量可多可少,所以并发性较好
² 异步
生产者和消费者之间不必同步,因为生产者只需要将“啤酒”放入“超市”即可,比如将一个耗费时间的流程分成两部分,将耗时的给消费者,这样生产者因为处理时间短,而支持高并发,比如:抢票软件和双十一的购物,首先在将用户的请求交给生产者,这样短时间就可以处理很多用户请求,然后放入“大容器”,等待消费者处理。
² 分布式
生产者和消费者之间是通过一个队列来实现的,那么就可以将List或者其他存储对象,序列化,或者存储到数据库上面,而消费者就可以运用这些进行通信,可以进行集群之间的通信。