RabbitMQ学习列表:
RabbitMQ实战(一)-消息通信基本概念
RabbitMQ实战(二)-消息持久化策略、事务以及Confirm消息确认方式
·前面二个章节主要讲了一下RabbitMQ的基本概念和消息发送的确认模式,正巧现在项目中有个需求(利用RabbitMQ消峰),其他客户端系统会向我们后台系统推送大量的支付财务消息,我们需要做的就是将这些消息入库,以前采取的方式即为按条进行insert,但是现在我们后台系统处理功能越来越多,也越来越复杂,数据库操作变得日益频繁,同时数据库链接资源也变得十分珍贵了,所以我们决定进行批量插入!
·RabbitMQ中消费消息有两种模式,推模式(Push)和拉模式(Pull),在我们平时的开发过程中采取的模式多为推模式(channel.basicConsume(QUEUE_NAME, false, consumer),因为这种推模式为RabbitMQ主动推给消费者进行消费,从而很好的避免了消息的积压,同时也可以减少消息瞬间增多多内存的消耗。但是当需要对消息进行批量处理时,则拉模式更为方便,(channel.basicGet(QUEUE_NAME, false);)同时,当由于某些限制,消费者在某个条件成立时才能消费消息时也采取这种拉的模式。下面用一个测试代码实现RabbitMQ拉取消息消费。
定义一个消息实体
class MessageEntity{
private String message;
private long tag;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public long getTag() {
return tag;
}
public void setTag(long tag) {
this.tag = tag;
}
}
定义消费者
/**
* 消费者
* @throws Exception
*/
@Test
public void consumer1() throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建消息信道
Channel channel = connection.createChannel();
//存放消息实体List
List<MessageEntity> list=new ArrayList<>();
long start=System.currentTimeMillis();
MessageEntity entity =new MessageEntity();
while(true){
//拉取消息
GetResponse response = channel.basicGet(QUEUE_NAME, false);
if(response == null){
//间隔时间,如果超过10s还没有消费到新到消息,则将消息入库,保证实效性
long interval=System.currentTimeMillis()-start;
if(CollectionUtils.isNotEmpty(list) && interval>10000){
//批量确认消息
channel.basicAck(entity.getTag(),true);
list.clear();
//模仿业务处理
Thread.sleep(1000);
start=System.currentTimeMillis();
}
continue;
}
String str=new String(response.getBody());
entity.setMessage(str);
entity.setTag(response.getEnvelope().getDeliveryTag());
list.add(entity);
//100条消息批量入库一次
if(list.size()%100==0){
//批量确认消息
channel.basicAck(entity.getTag(),true);
list.clear();
//模仿业务处理
Thread.sleep(1000);
start=System.currentTimeMillis();
}
}
}
通过拉模式即可实现消息的批量处理,同时,为了保证消息处理的实效行,在接收到的消息集合不为空的情况下,10s还没有接收到消息情况下,也自动将消息集合中的消息批量入库。