版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
1:引入jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.4.RELEASE</version>
</dependency>
2:在application.yml配置文件中配置rabbitmq
spring:
rabbitmq:
port: 5672
username: admin
password: admin
host: 127.0.0.1
3:在一个文件中配置队列
package com.cqemme.messageservice.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfiguration {
//本配置文件中配置了两个队列
@Bean
public Queue queue() {
return new Queue("school-system-email");
}
@Bean
public Queue noteQueue() {
return new Queue("school-system-note");
}
}
4:发送者
package com.cqemme.messageservice.feign;
import com.cqemme.comm.utils.JackSonUtils;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.RequestMapping;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Controller
@RequestMapping("/test")
@CrossOrigin
public class TestController {
@Autowired
private AmqpTemplate amqpTemplate;
@RequestMapping("/test")
public Object test() {
Map map=new HashMap();
map.put("a","ttttttttttt");
// "school-system-note" 是路由key,用来确定发到到"school-system-note"队列
amqpTemplate.convertAndSend("school-system-note",JackSonUtils.beanToString(map));
return "kkkkk";
}
}
5:消费者
package com.cqemme.messageservice.receive;
import com.cqemme.comm.po.EmailModelException;
import com.cqemme.comm.utils.JackSonUtils;
import com.cqemme.messageservice.service.NoteService;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONObject;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@Slf4j
public class NoteReceiver {
@Autowired
private NoteService noteService;
//声明专门消费 "school-system-note" 队列的消息
@RabbitListener(queues = "school-system-note")
public void Receiver(String content){
log.info("NoteRecieve:{}", content);
Map noteMap=JackSonUtils.strToBean(content,Map.class);
System.out.println(noteMap);
}
}
---------------至此整个流程就打通----------------
额外补充交换器的使用:
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以就简单谈谈 direct、fanout、topic 三种交换器
direct: 是通过消息中的路由键(routing key) 来绑定到队列上,它是完全匹配、单播的模式。写代码流程是:将一个队列绑定到一个direct交换器上,同时指定routing key就行了,前面的就是默认用的这个模式
Fanout: 每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
这里的三个队列都绑定到了Fanout类型的交换器上
发送者:(只管指定交换器,不指定路由key,那么所有的队列都会收到消息,然后监听的消费者会进行消费)
topic:是规则匹配,这里我就不说了,自己想玩这个的又去百度把,嘿嘿