延时队列:生产者刚发布一条信息。消费者想过一段时间进行接收处理操作。我这边用的是插件
1、首先安装插件(rabbitmq_delayed_message_exchange-3.8.0.ez)
下载这个插件后放到rabbitmq安装目录plugins下。然后进入到sbin下执行cmd命令
插件下载地址:http://www.rabbitmq.com/community-plugins.html
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
然后把rabbitmq服务重新启动 。就会看到多启动了一个插件
2、接下来就是在代码里进行配置操作
创建路由:x-delayed-type要加上
@Bean public CustomExchange createExchange(){ Map<String,Object> map = new HashMap<String,Object>(); map.put("x-delayed-type", "direct"); return new CustomExchange( QueueEnum.SIGNLE.getExchangeName(), "x-delayed-message", true, false, map); }
这样还没完。还需要再发送方加上延时时间。下图红色圈中的
public void sendMessage(Long orderId,final long delayTimes){ //给延迟队列发送消息 rabbitTemplate.convertAndSend(RabbitMqConfig.QueueEnum.SIGNLE.getExchangeName(),RabbitMqConfig.QueueEnum.SIGNLE.getBindKeyName(), orderId, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties messageProperties = message.getMessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); //给消息设置延迟毫秒值 messageProperties.setExpiration(String.valueOf(delayTimes)); messageProperties.setDelay(9000); return message; } }); LOGGER.info("send orderId:{}",orderId); }
这样就万事大吉了。