什么是消息队列
MQ全称为Message Queue,即消息队列。RabbitMQ基于AMQP协议实现消息队列,解决应用间的通信方法。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
消息队列应应用场景:
- 异步处理:
将不需要同步处理并且耗时的操作由消息队列通知接收方进行异步处理,提高应用程序的响应时间。 - 应用解耦:
MQ相当于中介,生产方通过MQ与消费方交互,使应用程序解耦。
APMQ协议:
APMQ是一种消息队列协议,基于此协议实现客户端可向消息中间件传递消息.
JMS消息服务:
JMS是消息服务应用程序接口,用来异构系统 集成通信,缓解系统瓶颈,提高系统的伸缩性增强系统用户体验,使得系统模块化和组件化变得可行并更加灵活 ,通过生产消费者模式(生产者、服务器、消费者)。
RabbitMQ工作原理:
组成部分说明:
- Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
- Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
- Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
- Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
- Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
生产者发送消息流程:
- 生产者和Broker建立TCP连接。
- 生产者和Broker建立通道。
- 生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
- Exchange将消息转发到指定的Queue(队列)
消费者接收消息流程:
- 消费者和Broker建立TCP连接
- 消费者和Broker建立通道
- 消费者监听指定的Queue(队列)
- 当有消息到达Queue时Broker默认将消息推送给消费者。
- 消费者接收到消息。
- ack回复
RabbitMQ安装:
- RabbitMQ下载地址:
Github仓库:https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.7/rabbitmq-server-3.7.7.exe
Bintray仓库:https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server-3.7.7.exe
上面展示出的是本文撰写时,RabbitMQ的最新版本3.7.7
注意:前提先安装erlang 并配置环境
命令窗口执行:
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.7\sbin>rabbitmq-plugins.bat enable rabbitmq_management (安装管理插件)
浏览器:http://localhost:15672 默认账号密码:guest
案例:
public class Produce01 { //生产者
//队列名
private static final String QUEUE = "SERVER_A";
public static void main(String[] args) {
//生产者通过会话通道将信息发送至MQ服务然后由路由器将消息转发至队列中
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); //连接地址
factory.setPort(5672); //端口
factory.setUsername("guest");
factory.setPassword("guest");
//设置虚拟机 每个虚拟机相当于独立的MQ
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//建立连接
connection = factory.newConnection();
//创建会话通道 生产者和MQ服务的通信都在通道中完成
channel = connection.createChannel();
/**
* 声明队列 如队列不存在则创建队列
* 队列参数
* String queue, 队列名
* boolean durable, 持久化
* boolean exclusive, 是否独占队列 队列只允许当前连接访问 连接关闭队列删除 true 代表临时队列
* boolean autoDelete, 是否自动删除 exclusive设置为临时队列(true) 那么连接关闭队列自动删除
* Map<String, Object> arguments 扩展参数 例如 队列存活时间
*/
channel.queueDeclare(QUEUE,true,false,false,null);
/**
* 发送消息
* String exchange, 交换机 如设定交换机则使用mq默认交换机
* String routingKey, 路由key 交换机根据路由key将消息转发到指定的队列中,默认交换机则设置队列的名
* BasicProperties props, 消息属性
* byte[] body 消息体
*/
String message = "Hello RabbitMQ"; //消息内容
channel.basicPublish("",QUEUE,null,message.getBytes());
System.out.println("produce send mq:" + message);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
channel.close(); //关闭会话通道
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
connection.close(); //关闭连接
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public class Consumer01 { //消费者
private static final String QUEUE = "SERVER_A";
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); //连接地址
factory.setPort(5672); //端口
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/"); //设置虚拟机
Connection connection = null;
try {
connection = factory.newConnection(); //建立连接
Channel channel = connection.createChannel(); //创建会话通道
/** 监听哪个队列
* 声明队列 如队列不存在则创建队列
* 队列参数
* String queue, 队列名
* boolean durable, 持久化
* boolean exclusive, 是否独占队列 队列只允许当前连接访问 连接关闭队列删除 true 代表临时队列
* boolean autoDelete, 是否自动删除 exclusive设置为临时队列(true) 那么连接关闭队列自动删除
* Map<String, Object> arguments 扩展参数 例如 队列存活时间
*/
channel.queueDeclare(QUEUE,true,false,false,null);
//实现消费方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 当接受到消息此方法就会执行消费
* @param consumerTag 消费者标签 标识消费者
* @param envelope 信封
* @param properties 消息属性
* @param body 消息体
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange(); //交换机
long deliveryTag = envelope.getDeliveryTag(); //消息ID
String message = new String(body,"UTF-8");
System.out.println("consumer :" + message);
}
};
/**
* 开始监听队列
* String queue, 队列名
* DeliverCallback deliverCallback, true 自动回复MQ消息已接收
* CancelCallback cancelCallback
*/
channel.basicConsume(QUEUE,true,defaultConsumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}