一、win7安装 RabbitMQ环境
1、下载安装 erlang
2、下载安装 rabbitMQ
详细教程:https://blog.csdn.net/hzw19920329/article/details/53156015
3、启动rabbitMQ
方式一:
在cmd中定位到rabbitMQ的安装目录上的RabbitMQ Server\rabbitmq_server-3.7.7\sbin中,输入rabbitmq-service start启动
方式二:
在服务上,右键启动
4、访问http://localhost:15672/#/
注意:可以在这个平台上,创建用户,队列,交换器,查看消息队列产生的过程等
扫描二维码关注公众号,回复:
3238150 查看本文章
二、springboot rabbitMQ demo
1、创建springboot项目,注意springboot版本最好在1.5以上
2、添加rabbitMQ依赖
<!-- 高级消息队列amqp,实现方式rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3、rabbitMQ服务信息配置,属性文件中
######rabbitMQ
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest1
spring.rabbitmq.password=guest1
spring.rabbitmq.virtual-host=/
# 是否发送方确认
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
4、rabbitMQ的2中方式
(1)广播订阅模式
创建RabbitConfig
@Configuration
public class RabbitConfig {
@Bean
public Queue AMessage() throws IOException, TimeoutException {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() throws IOException, TimeoutException {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() throws IOException, TimeoutException {
return new Queue("fanout.C");
}
//fanout:广播模式或订阅交换器
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) throws IOException, TimeoutException {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) throws IOException, TimeoutException {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) throws IOException, TimeoutException {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}
创建发送方
package com.example.demo.common.sender;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange", "", context);
}
}
创建接收方:
@Component
public class Receiver {
@RabbitListener(queues = "fanout.A")
public void processA(String message) {
System.out.println("fanout Receiver A : " + message);
}
@RabbitListener(queues = "fanout.B")
public void processB(String message) {
System.out.println("fanout Receiver B : " + message);
}
@RabbitListener(queues = "fanout.C")
public void processC(String message) {
System.out.println("fanout Receiver C : " + message);
}
}
效果:
注意:
@RabbitListener(queues = "fanout.B")要写在方法上
(2)、路由模式
创建RabbitConfig
@Configuration
public class RabbitConfig {
@Bean
public Queue queue1() {
return new Queue("hello.queue1", true); // true表示持久化该队列
}
@Bean
public Queue queue2() {
return new Queue("hello.queue2", true);
}
//topic:路由交换器
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
//绑定
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
}
@Bean
public Binding binding2() {
return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
}
}
发送方:
@Component
public class Sender implements ConfirmCallback, ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送成功:" + correlationData);
} else {
System.out.println("消息发送失败:" + cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(message.getMessageProperties().getCorrelationIdString() + " 发送失败");
}
//发送消息,不需要实现任何接口,供外部调用。
public void send(String msg){
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
System.out.println("开始发送消息 : " + msg.toLowerCase());
String response = rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId).toString();
System.out.println("结束发送消息 : " + msg.toLowerCase());
System.out.println("消费者响应 : " + response + " 消息处理完成");
}
}
接收方:
@Component
public class Receiver {
@RabbitListener(queues = "hello.queue1")
public String processMessage1(String msg) {
System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue1队列的消息:" + msg);
return msg.toUpperCase();
}
@RabbitListener(queues = "hello.queue2")
public void processMessage2(String msg) {
System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue2队列的消息:" + msg);
}
}
效果:
5、java基础方式创建发送方,接收方
public void sender() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest1");
factory.setPassword("guest1");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
// @Test
public void receiver() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest1");
factory.setPassword("guest1");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}