前言
Springboot对rabbitmq的支持度很高,所以Springboot继承rabbitmq十分简便。
首先maven引入amqp
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在yml文件中添加配置
spring:
rabbitmq:
publisher-confirms: true
username: hpsyche
password: a7789858
virtual-host: /vhost_test
port: 5672
工作队列
生产者
package com.hpsyche.rabbitmq_spring.pro;
import com.hpsyche.rabbitmq_spring.constant.RabbitMqConstant;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author fuzihao
* @date 2019/8/27 16:26
*/
@Component
public class RabbitProducer {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendSimpleOrder(String msg){
//第一个参数:工作队列名称
//第二个参数:发送的消息
rabbitTemplate.convertAndSend("Spring_Queue",msg);
}
}
消费者
package com.hpsyche.rabbitmq_spring.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author fuzihao
* @date 2019/8/29 14:45
*/
@Component
public class RabbitConsumer {
@RabbitListener(queues = "Spring_Queue")
public void getMsg(String msg){
System.out.println(msg);
}
}
再写一个controller,调用生产者
package com.hpsyche.rabbitmq_spring.controller;
import com.hpsyche.rabbitmq_spring.pro.RabbitProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author fuzihao
* @date 2019/8/29 14:49
*/
@RestController
@RequestMapping("/msg")
public class MsgController {
@Autowired
private RabbitProducer rabbitProducer;
@GetMapping("/{msg}")
public String getMsg(@PathVariable String msg){
rabbitProducer.sendSimpleOrder(msg);
return "success";
}
}
此时启动发现报错,原因为工作队列不存在,手动到官方创建名为Spirng_Queue的队列,再次启动,启动成功!
访问http://localhost:8080/msg/hello,控制台输出hello,说明配置成功!
Topic模式
首先需要设置文件,如下
package com.hpsyche.rabbitmq_spring.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicRabbitConfig {
//在消费者中来识别队列
final static String message = "goods.addGoods";
final static String messages = "goods.messages";
@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}
@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("Spring_Exchange");
}
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("goods.addGoods");
}
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("goods.#");
}
}
有生产者1
package com.hpsyche.rabbitmq_spring.pro;
import com.hpsyche.rabbitmq_spring.constant.RabbitMqConstant;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author fuzihao
* @date 2019/8/27 16:26
*/
@Component
public class RabbitProducer {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendOrder(String msg){
rabbitTemplate.convertAndSend("Spring_Exchange","goods.addGoods","add:"+msg);
}
}
有生产者2
package com.hpsyche.rabbitmq_spring.pro;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author fuzihao
* @date 2019/8/27 16:26
*/
@Component
public class RabbitProducerTwo {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendOrder(String msg){
rabbitTemplate.convertAndSend("Spring_Exchange","goods.deleteGoods","delete:"+msg);
}
}
有消费者1
package com.hpsyche.rabbitmq_spring.consumer;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* @author fuzihao
* @date 2019/8/29 14:45
*/
@Component
@RabbitListener(queues="goods.messages")
public class RabbitConsumer {
@RabbitHandler
public void exchangeGetMsg(@Payload String msg, @Headers Map<String,Object> map) throws IOException {
System.out.println(msg);
}
}
消费者2
package com.hpsyche.rabbitmq_spring.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author fuzihao
* @date 2019/8/29 14:45
*/
@Component
@RabbitListener(queues="goods.addGoods")
public class RabbitConsumerTwo {
@RabbitHandler
public void exchangeGetMsg(@Payload String msg, @Headers Map<String,Object> map){
System.out.println(msg);
}
}
同时有如下controller
package com.hpsyche.rabbitmq_spring.controller;
import com.hpsyche.rabbitmq_spring.pro.RabbitProducer;
import com.hpsyche.rabbitmq_spring.pro.RabbitProducerTwo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author fuzihao
* @date 2019/8/29 14:49
*/
@RestController
@RequestMapping("/msg")
public class MsgController {
@Autowired
private RabbitProducer rabbitProducer;
@Autowired
private RabbitProducerTwo rabbitProducerTwo;
@GetMapping("/exchangeAdd/{msg}")
public String getExchangeAddMsg(@PathVariable String msg) throws InterruptedException {
rabbitProducer.sendOrder(msg);
return "success";
}
@GetMapping("/exchangeDelete/{msg}")
public String getExchangeDeleteMsg(@PathVariable String msg) throws InterruptedException {
rabbitProducerTwo.sendOrder(msg);
return "success";
}
}
当浏览器访问http://localhost:8080/msg/exchangeAdd/hello时,控制台输出add:hello两次,说明两个队列good.#和good.addGoods都实现了消费;而当浏览器访问http://localhost:8080/msg/exchangeDelete/hello时,控制台输出delete:hello一次,说明只有good.#消费者消费了此信息,结果是准确的。