初步了解了RabbitMQ的流程、工作模式后,进行RabbitMQ与SpringBoot整合
一、相关依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、配置文件设置RabbitMQ的连接
server:
port: 44000
spring:
application:
name: test‐rabbitmq‐producer
rabbitmq:
host: 虚拟机IP
port: 5672 #RabbitMQ服务端端口
username: guest
password: guest
virtualHost: /
三、交换机、队列的声明及绑定
package com.xuecheng.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 配置
* @author Huang
* @version 1.0
* @date 2020/4/3 13:55
*/
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
public static final String ROUTINGKEY_SMS="inform.#.sms.#";
//声明交换机
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM(){
System.out.println("成功声明交换机!");
return ExchangeBuilder
.topicExchange(EXCHANGE_TOPICS_INFORM) //交换机类型(通配符),名称
.durable(true) //是否持久化
.build();
}
//声明队列
@Bean(QUEUE_INFORM_EMAIL)
public Queue QUEUE_INFORM_EMAIL(){
return new Queue(QUEUE_INFORM_EMAIL);
}
//声明队列
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS(){
return new Queue(QUEUE_INFORM_SMS);
}
//绑定队列和交换机
@Bean
public Binding Binding_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, //@Qualifier获取Spring容器中对应名称的组件
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
return BindingBuilder
.bind(queue) //绑定队列
.to(exchange) //绑定交换机
.with(ROUTINGKEY_EMAIL) //设置带有通配符的路由key
.noargs(); //没有属性参数
}
//绑定队列和交换机
@Bean
public Binding Binding_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
return BindingBuilder
.bind(queue) //绑定队列
.to(exchange) //绑定交换机
.with(ROUTINGKEY_SMS) //设置带有通配符的路由key
.noargs(); //没有属性参数
}
}
四、发送消息
package com.xuecheng.test.rabbitmq;
import com.xuecheng.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author Huang
* @version 1.0
* @date 2020/4/3 14:39
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class Producer06_springboot {
@Autowired
RabbitTemplate rabbitTemplate;
//使用rabbitTemplate发送消息
@Test
public void test01(){
String content = "send to huang";
/**
* 参数明细:
* 1、交换机名称
* 2、路由key
* 3、消息内容
*/
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPICS_INFORM, "inform.email", content);
}
}
五、接收消息
package com.xuecheng.config;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author Huang
* @version 1.0
* @date 2020/4/3 14:52
*/
@Component //将此类扫描Spring容器中
public class RabbitMQReceive {
/**
* 监听指定队列的消息并接收
* @param msg 消息字符串
* @param message 消息体对象,里面包括消息字符串等信息
* @param channel 连接通道(一个Connection有多个连接通道channel)
*/
@RabbitListener(queues = {RabbitMQConfig.QUEUE_INFORM_EMAIL})
public void receive_Email(String msg, Message message, Channel channel){
System.out.println("接收到的 msg:" + msg);
System.out.println("接收到的 Message:" + message);
System.out.println("接收到的 Channel:" + channel);
}
}
关于注解
1、@Qualifier:获取Spring容器中对应名称的组件
2、@Component:将此类扫描Spring容器中
3、@RabbitListener:监听队列并接收消息
总结
使用RabbitMQ的基本流程: