前言
至于rabbitMQ的安装我就不写了,近期由于项目需求,使用RabbitMQ异步处理数据,为了数据安全,加入死信队列概念保证数据的简单安全性。这里的安全主要是,超时,处理异常这些内容,一旦程序拿到队列中的数据,处理过程中出现了异常,非常有可能导致数据的丢失。
配置
队列和交换机之间的关系图简单如下:
1.建立交换机
- 建立生产者交换机
- 建立死信交换机
2.建立队列
- 生产者队列
- 建立死信队列
3.交换机与队列之间绑定
- 在生产者队列中绑定生产者交换机
- 在死信队列中绑定死信交换机
代码实例测试
添加rabbitmq的配置文件
spring配置文件加载rabbitmq的配置文件
<import resource="classpath:spring-rabbitmq.xml"/>测试rabbitmq的生产者代码
@Autowired private RabbitTemplate template;/** * 测试rabbitmq * @throws JsonProcessingException 异常信息 */ @Override public void test() throws JsonProcessingException { ChooseCourseMQModel chooseCourseMQModel = new ChooseCourseMQModel("13060141059", "123", CourseTypeEnum.ADD_CHOOSE_COURSE); Message msg = MessageBuilder .withBody(SerializationUtils.serialize(chooseCourseMQModel)) .build(); template.convertAndSend(msg); }消费者:
public class MqConsumer implements ChannelAwareMessageListener { @Override @Transactional(rollbackFor = Exception.class) public void onMessage(Message message, Channel channel) throws Exception { System.out.println("message:" + message); long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.println("---msg recv:" + message.toString()); try { ChooseCourseMQModel chooseCourseMQModel = (ChooseCourseMQModel) SerializationUtils.deserialize(message.getBody()); //业务代码,处理正确,发送确认信息,basicAck if{ channel.basicAck(deliveryTag, false); } else { //处理失败则发送拒绝信息,放入死信队列 channel.basicReject(deliveryTag, false); return; } } catch (Exception e) { //处理发生异常则放入死信队列,发送basicReject System.out.println("error:" + e); channel.basicReject(deliveryTag, false); } } }简单的应用到此已经结束了,建立之后可以在消费者中的try语句块中写一个异常,然后发现生产者队列中的数据通过死信交换机到达死信队列,然后根据业务场景,可以重试或者写一个定时任务重新处理,保证异步处理数据的正确性和稳定性!