昨天晚上遗留的两个问题
1.两个消费者消费消息都到100了,但是下图中的日志未打印出来
这个问题看代码
public class ConsumerObjectOne implements Runnable { @Override public void run() { while (true) { if (PudConThread.arrayBlockingQueue.size() > 0) { if (PudConThread.hasConsumerTotal.get() >= PudConThread.total) { System.out.println("消费者1--消费已达上限停止消费"); return; } /** * 获取最新的一条消息消费 */ try { //这个地方是关键 MessageVO messageVO = PudConThread.arrayBlockingQueue.take(); System.out.println("消费者1消费消息" + messageVO.toString()); PudConThread.hasConsumerTotal.getAndAdd(1); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
ArrayBlockingQueue 里面的take(),当队列里面的长度为空时,会进入await 状态,所以两个消费线程在消费掉最后一条时,队列是空队列,take()阻塞不能进行下次循环,消费结束消息不能打印
将消费线程中的,消费消息代码和判断消费消息数量代码位置对调一下就可以了
public class ConsumerObjectOne implements Runnable { @Override public void run() { while (true) { if (PudConThread.arrayBlockingQueue.size() > 0) { /** * 获取最新的一条消息消费 */ try { MessageVO messageVO = PudConThread.arrayBlockingQueue.take(); System.out.println("消费者1消费消息" + messageVO.toString()); PudConThread.hasConsumerTotal.getAndAdd(1); } catch (InterruptedException e) { e.printStackTrace(); } if (PudConThread.hasConsumerTotal.get() >= PudConThread.total) { System.out.println("消费者1--消费已达上限停止消费"); return; } } } } }
在运行结果如下
不对调位置也可以如下改造,将take()换成poll()
public class ConsumerObjectOne implements Runnable { @Override public void run() { while (true) { if (PudConThread.hasConsumerTotal.get() >= PudConThread.total) { System.out.println("消费者1--消费已达上限停止消费"); return; } if (PudConThread.arrayBlockingQueue.size() > 0) { /** * 获取最新的一条消息消费 */ MessageVO messageVO = PudConThread.arrayBlockingQueue.poll(); if (messageVO != null) { System.out.println("消费者1消费消息" + messageVO.toString()); PudConThread.hasConsumerTotal.getAndAdd(1); } } } } }
第2个问题 序号我们用的是AtomicInteger 但是每次都会出现两个为0的序号
分析:每次出现两个为0的序号是,两个生产者在设置序号的时候用的是 AtomicInteger的 get() 方法这个只是返回当前最新值,所以两个生产者并发去get 获取到了初始值0
代码改造如下
public class ProductObjectOne implements Runnable { @Override public void run() { while (true) { if (PudConThread.hasProductTotal.get() >= PudConThread.total) { System.out.println("产品已达上限,停止生产"); return; } MessageVO messageVO = new MessageVO(PudConThread.hasProductTotal.getAndAdd(1), UUID.randomUUID().toString(), "ProductObjectOne---this is pubsub test"); try { PudConThread.arrayBlockingQueue.put(messageVO); } catch (InterruptedException e) { e.printStackTrace(); } } } }
运行结果如下,未出现重复的序号了
来源:宿迁网站优化