安装RabbitMQ
在Docker中直接使用pull
命令拉取RabbitMQ镜像:
docker pull rabbitmq:3-management
然后运行镜像:
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq01 image_id
测试:在浏览器访问,初始化账号密码都是guest
SpringBoot中的RabbitMQ示例
使用IDEA的Spring Initializr创建项目,选择RabbitMQ模块,以及Web模块,方便测试
我们可以查看RabbitAutoConfiguration
的源码:
@Configuration
@ConditionalOnClass({RabbitTemplate.class, Channel.class})
@EnableConfigurationProperties({RabbitProperties.class}) // RabbitProperties配置类,有哪些可配置项就查看该类即可
@Import({RabbitAnnotationDrivenConfiguration.class})
public class RabbitAutoConfiguration {
public RabbitAutoConfiguration() {
}
@Bean // 加入RabbitTemplate组件
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
PropertyMapper map = PropertyMapper.get();
RabbitTemplate template = new RabbitTemplate(connectionFactory);
MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique();
if (messageConverter != null) {
template.setMessageConverter(messageConverter);
}
......
}
......
}
配置:
spring:
rabbitmq:
host: 192.168.3.18
username: guest
password: guest
port: 5672
其实这里除了host,其他都是自动配置类的默认配置值
测试:
消息的发布:
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootAmqpApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void pubish() {
// Message,需要自己构造,自定义消息头和消息体
// rabbitTemplate.send(exchange, routingKey, message);
// 使用MessageConverter将对象序列化,默认使用SimpleMessageConverter
// 我这里的设置使用direct类型的exchange(amq.direct)绑定一个queue(yujiago.news)
String exchange = "amq.direct";
String routingKey = "yujiago.news";
Map<String, Object> map = new HashMap<>();
map.put("msg", "这是一条消息!");
map.put("data", Arrays.asList("数据", 123, true));
rabbitTemplate.convertAndSend(exchange, routingKey, map);
}
}
查看:
我们可以配置自己的MessageConverter,查看MessageConverter的继承树:
编写配置类:
@Configuration
public class AMQPConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
结果:
消息的接收:
@Test
public void consume() {
String queueName = "yujiago.news";
// Message receive = rabbitTemplate.receive(queueName);
Object o = rabbitTemplate.receiveAndConvert(queueName);
System.out.println(o.getClass());
System.out.println(o);
}
结果:
消息的监听:
消息的监听需要使用两个注解:
扫描二维码关注公众号,回复:
2849647 查看本文章
@EnableRabbit
:开启Rabbit注解,标注在启动类上@RabbitListener
:监听消息队列,标注在接收方法上
@Service
public class UserService {
@RabbitListener(queues = "yujiago.message")
public void receiveMessage(Message message){
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}
@RabbitListener(queues = "yujiago.user")
public void receiveUser(User user){
System.out.println(user);
}
@RabbitListener(queues = "yujiago.news")
public void receiveMap(Map map){
System.out.println(map);
}
}
AmqpAdmin的使用
AmqpAdmin类:spring-amqp的core中,主要用作管理queue、exchange以及binding关系,查看类中声明:为AMQP指定一组基本的AMQP管理操作,其接口定义如下:
public interface AmqpAdmin {
void declareExchange(Exchange var1);
boolean deleteExchange(String var1);
Queue declareQueue();
String declareQueue(Queue var1);
boolean deleteQueue(String var1);
void deleteQueue(String var1, boolean var2, boolean var3);
void purgeQueue(String var1, boolean var2);
void declareBinding(Binding var1);
void removeBinding(Binding var1);
Properties getQueueProperties(String var1);
}