RabbitMQ简介
MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言(一种适用于高并发的语言)开发,基于AMQP(Advanced MessageQueue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开
发中应用非常广泛。
使用场景
- 任务异步处理。
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。 - 应用程序解耦合
MQ在整个框架中相当于c2风格的中间件,可以达到解耦的作用,MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
使用优势
1、使得简单,功能强大。
2、基于AMQP协议。
3、社区活跃,文档完善。
4、高并发性能好,这主要得益于Erlang语言。
5、Spring Boot默认已集成RabbitMQ
名词解释
AMQP
AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式,为的就是解决MQ市场上协议不统一的问题。RabbitMQ就是遵循AMQP标准协议开发的MQ服务。
JMS
JMS是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么 不同,jms是java语言专属的消息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的 。
RabbitMQ工作原理
由图可以知道这种模式是生产者对应多个消费者的模式:
组成部分说明如下:
Broker :消息队列服务进程,此进程包括两个部分:Exchange和Queue。
Exchange :消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue :消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Producer :消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer :消息消费者,即消费方客户端,接收MQ转发的消息。
消息发布接收流程:
-
发送消息
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列) -
接收消息
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
测试
先用rabbitMQ官方提供的java client测试。
创建生产者工程和消费者工程,分别加入RabbitMQ java client的依赖。
test-rabbitmq-producer:生产者工程
test-rabbitmq-consumer:消费者工程
在pom.xml中添加使用的相关依赖:
<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-logging -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
</dependencies>
生产者:
使用代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private static final String QUEUE = "helloworld";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//使用默认的虚拟机,虚拟机相当于一个独立的mq服务器
connectionFactory.setVirtualHost("/");
connection = connectionFactory.newConnection();
channel = connection.createChannel();
/**
* 声明队列,如果Rabbit中没有此队列将自动创建
* param1:队列名称
* param2:是否持久化
* param3:队列是否独占此连接
* param4:队列不再使用时是否自动删除此队列
* param5:队列参数
*/
channel.queueDeclare(QUEUE, true, false, false, null);
String message = "helloworld小明"+System.currentTimeMillis();
/**
* 消息发布方法
* param1:Exchange的名称,如果没有指定,则使用Default Exchange
* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
* param3:消息包含的属性
* param4:消息体
*/
channel.basicPublish("",QUEUE,null,message.getBytes());
System.out.println("Producer发布了消息"+message);
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭资源
if(channel != null)
{
channel.close();
}
if(connection != null)
{
connection.close();
}
}
}
}
操作流程:
-
使用Connection创建与RabbitMQ服务的TCP连接
-
而获取Connection对象则需要使用Connection工厂ConnectionFactory来获取Connection。
-
ConnectionFactory在使用的时候需要设置相关的参数:
- factory.setHost(“localhost”); //mq服务器的IP地址
- factory.setPort(5672); //端口号(默认)
- factory.setUsername(“guest”); //用户名(默认)
- factory.setPassword(“guest”);//密码(默认)
-
使用工厂创建连接connection
-
使用connection创建channel
-
channel声明队列,相关参数声明队列,如果Rabbit中没有此队列将自动创建
* param1:队列名称 * param2:是否持久化 * param3:队列是否独占此连接 * param4:队列不再使用时是否自动删除此队列 * param5:队列参数如下:channel.queueDeclare(QUEUE, true, false, false, null);
-
创建一个消息体:
-
使用channel的basicPublish()发送消息,相关参数如下:
* 消息发布方法 * param1:Exchange的名称,如果没有指定,则使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * param3:消息包含的属性 * param4:消息体
-
关闭相关的资源:channel和connection。
最终得到的数据在管理页面中:
消费者
使用代码:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
private static final String QUEUE = "helloworld";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connection = connectionFactory.newConnection();
channel = connection.createChannel();
//如果确定消息队列中有就可以不用声明,声明是为了防止报错
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 消费者接收消息调用此方法
* @param consumerTag 消费者的标签,在channel.basicConsume()去指定
* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
* @param properties
* @param body 消息体
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String exchange = envelope.getExchange();
String routingKey = envelope.getRoutingKey();
String msg = new String(body,"utf-8");
System.out.println("consumer recieve msg:"+msg);
}
};
channel.queueDeclare(QUEUE, true, false, false, null);
/**
* 监听队列String queue, boolean autoAck,Consumer callback
* 参数明细
* 1、队列名称
* 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
为false则需要手动回复
* 3、消费消息的方法,消费者接收到消息后调用此方法
*/
channel.basicConsume(QUEUE, true, consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
操作流程:
前面的获取connection和channel的流程和生产者的一样;下面说一说消费者的核心代码:
- channel.basicConsume(QUEUE, true, consumer);
- 队列名称
- 是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复。
- 消费消息的方法,消费者接收到消息后调用此方法
- 消费者方法:DefaultConsumer consumer = new DefaultConsumer(channel) 重写里面的handleDelivery方法;参数如下:
- 消费者接收消息调用此方法
- @param consumerTag 消费者的标签,在channel.basicConsume()去指定
- @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
- @param properties
- @param body 消息体
- 消费者方法:DefaultConsumer consumer = new DefaultConsumer(channel) 重写里面的handleDelivery方法;参数如下:
结果如下: