使用rabbitmq也有一段时间了,一直想找个时间总结一下使用方法。
由于本项目使用的是springboot,在此先做在springboot中使用的配置方法吧,其他的其实也差不多。
上干货
1、首先引入依赖包
<!--消息队列,rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
rabbit版本的配置在父工程里面了,也不是很重要,就补贴出来了。读者可以自己定义一个有效的版本
2、配置注册消息队列
/**
* 声明队列
*/
@Configuration
public class RabbitMqConfig {
// 声明队列,开发人员短信,邮件发送
@Bean
public Queue developsQueue() {
return new Queue(GlobalValueMq.PRISE_WECHAT_DEVELOPS_NOTIFY_EMAIL, true);
}
// 声明队列,管理员的短信发送
@Bean
public Queue managerQueue() {
return new Queue(GlobalValueMq.PRISE_WECHAT_MANAGER_NOTIFY_EMAIL, true);
}
}
这样配置后,我们启动项目,就能够向消息队列服务端注册指定的消息队列(rabbitMq的安装请参考另外一篇文章)。为保证消息队列的唯一性,我这里定义了消息队列的名称是常量,统一管理,例如GlobalValueMq.PRISE_WECHAT_MANAGER_NOTIFY_EMAIL
。常量类我就补贴出来了,读者可以自行定义一个。后面的 true
,查看源码可以发现,这是消息队列定义为持久。默认是false。
3、消息的生产者
在springboot中,我们集成了AmqpTemplate接口,通过它我们可以生产一个消息对象。
/**
* 邮件发送
*/
@Service
public class SmsServiceImpl implements SmsService {
@Autowired
private AmqpTemplate rabbitTemplate;
/**
* 邮件发送
*/
@Override
public PaixiResult sendCustom(MessageCenter messageCenter) {
this.rabbitTemplate.convertAndSend(GlobalValueMq.PRISE_WECHAT_MANAGER_NOTIFY_EMAIL, messageCenter);
return PaixiResult.ok(messageCenter.getPhone());
}
}
4、消息的消费者
在springboot中,我们可以通过@RabbitListener
注解监听指定的消息队列
通过@RabbitHandler
指定一个方法入口,从消息队列中取出一个消息,处理一个消息
/**
* @author zhaoxi
* @date 2018/05/26 14:35
* TODO:发送邮件
*/
@RabbitListener(queues = GlobalValueMq.PRISE_WECHAT_MANAGER_NOTIFY_EMAIL)
@Component
public class MqUserSms {
private static final Logger log = LoggerFactory.getLogger(MqUserSms.class);
@RabbitHandler
public void smsProcess(MessageCenter messageCenter) {
log.info("发送邮件 >>> " + messageCenter.getContent());
}
}
上面就是一个消息队列消费者的示例,消费的逻辑就是方法体。可以定义多个消费者,加快消费速度。
5、后话
依照上面的步骤,我们就可以简单的使用消息队列进行正常的服务一般的需求了。还有复杂一些的无非就是对绑定规则做一下调整。比如需要实现消息的订阅,需要使用交换机,把消息队列绑定到交换机;而消费者监听交换机,进行前缀或者后缀的匹配等。
还有,实现消息延迟发送,可以通过死信队列实现。大致是这样,定义一个队列,设置消息的存活时间;在定义一个替补队列(死信队列),一旦上面的队列消息失效了,这些失效的消息就会被丢到替补队列(死信队列)在中。那么,我们只需要配置生产者往正常队列中发消息任务,但是我们不给这个队列配置消费者,而是给那个替补队列(死信队列)配置消费者。最终所有的消息任务都会因为延迟,死亡,被丢到死信队列中,被消费。从而达到延时处理的效果。(废话多了点,哈哈哈)