RabbitMq消费信息,对数据处理和编码设置
将消息放到MQ的队列,必然要对消息进行处理,不然使用MQ就显得没有什么意义了。
普通消费
单纯的消费消息只需写一个实现类实现MessageListener的onMessage方法即可,如
public class RabbitmqService implements MessageListener {
private Logger logger = LoggerFactory.getLogger(RabbitmqService.class);
@Autowired
private UserService userService;
@Autowired
private MsgInfoService msgInfoService;
public void onMessage(Message message) {
try {
//将放入队列的消息已UTF-8的格式转为JSON字符串(前提是放入队列是也是JSON字符串,中文存在乱码)
String msg = new String(message.getBody(),"utf-8");
logger.info("消息:{}",JSON.toJSONString(msg));
//将JSON字符串转换为自己想要处理的对象
User user= JSON.parseObject(msg, User .class);
//处理逻辑
User userDto = userService.save(user);
logger.info("调用结果返回:{}",JSON.toJSONString(userDto ));
} catch (Exception e) {
e.printStackTrace();
}
}
}
手动确认消费
手动确认即在消息消费出现异常是可选择自己的处理方式,入消息重入队列,消息的持久化等。手动确认 消息只需实现ChannelAwareMessageListener的onMessage方法
public class AckConsumer implements ChannelAwareMessageListener {
private Logger logger = LoggerFactory.getLogger(AckConsumer.class);
@Autowired
private UserService userService;
@Autowired
private MsgInfoService msgInfoService;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
logger.info("接受到消息:{}",message);
channel.basicQos(1);//每次处理一条消息
String state = null;
try{
String msg = new String(message.getBody(),"utf-8");
logger.info("处理的数据:{}",JSON.toJSONString(msg));
User user= JSON.parseObject(msg, User.class);
User userDto = userService.save(user);
String returnCode = userDto .getReturnCode();
logger.info("调用结果返回:{}",JSON.toJSONString(userDto ));
if(returnCode.equals("1000")){
state = "10A";
//第一个参数Delivery Tag 用来标识信道中投递的消息,RabbitMQ 推送消息给 Consumer 时,
// 会附带一个 Delivery Tag,以便 Consumer可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了.
//第二个参数 multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认;
// 如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认。
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认消息已消费
}else{
state = "10I";
//第一个参数,消息的标识
//第二个参数是否批量处理消息,true:将一次性拒绝所有小于deliveryTag的消息。
//第三个参数,消息是否重入队列,false将消息存队列删除。true:会重复消费该消息直到消息被确认消费
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);//拒绝消费消息
}
}catch (Exception e){
state = "10I";
e.printStackTrace();
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}finally {
//消息持久化
msgInfoService.saveMsg(message, state);
}
}
}