点赞多大胆,就有多大产!开源促使进步,献给每一位技术使用者和爱好者!
干货满满,摆好姿势,点赞发车
路漫漫其修远兮,吾将上下而求索
说在前边
开始一个新的技术篇章,现在带的这个班级要讲消息队列,结合市面上主流的消息队列,包括之前做大数据时使用过的Kafka而言,在Java领域为保障消息可靠性RabbitMQ是最具优势,这篇文章主要说一下消息队列概念和RabbitMQ的安装,常用指令,Java整合,RabbitMQ中的一些核心概念,如有疑问欢迎大家评论区留言讨论,如有帮助点赞支持一下小添吧
安装包链接
这里使用的rabbitmq是3.7.15,使用源码包编译安装在Centos7上,以下是安装包下载链接
链接:https://pan.baidu.com/s/1ymbIPT_JqUB0gVARilCrQg
提取码:qbun
本章参考
链接:消息队列其实很简单
参考书:《RabbitMQ实战指南》
认识消息队列
介绍
我们可以把消息队列比作是一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ,我们后面会一一对比这些消息队列。
另外,我们知道队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。比如生产者发送消息1,2,3…对于消费者就会按照1,2,3…的顺序来消费。但是偶尔也会出现消息被消费的顺序不对的情况,比如某个消息消费失败又或者一个 queue 多个consumer 也会导致消息被消费的顺序不对,我们一定要保证消息被消费的顺序正确。
除了上面说的消息消费顺序的问题,使用消息队列,我们还要考虑如何保证消息不被重复消费?如何保证消息的可靠性传输(如何处理消息丢失的问题)?…等等问题。所以说使用消息队列也不是十全十美的,使用它也会让系统可用性降低、复杂度提高,另外需要我们保障一致性等问题
为什么使用消息队列
使用消息队列主要有两点好处:
-
通过异步处理提高系统性能(削峰、减少响应所需时间);
-
降低系统耦合性。
使用消息队列带来的一些问题
- 系统可用性降低: 系统可用性在某种程度上降低,为什么这样说呢?在加入MQ之前,你不用考虑消息丢失或者说MQ挂掉等等的情况,但是,引入MQ之后你就需要去考虑了!
- 系统复杂性提高: 加入MQ之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!
- 一致性问题: 我上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了!
JMS VS AMQP
JMS
JMS 简介
JMS(JAVA Message Service,java消息服务)是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。JMS(JAVA Message Service,Java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
ActiveMQ 就是基于 JMS 规范实现的。
JMS两种消息模型
①点到点(P2P)模型
点到点(P2P)模型
使用队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送100条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)
② 发布/订阅(Pub/Sub)模型
发布/订阅(Pub/Sub)模型
发布订阅模型(Pub/Sub) 使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。
JMS 五种不同的消息正文格式
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
- StreamMessage – Java原始值的数据流
- MapMessage–一套名称-值对
- TextMessage–一个字符串对象
- ObjectMessage–一个序列化的 Java对象
- BytesMessage–一个字节的数据流
AMQP
AMQP介绍
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。
AMQP模型
AMQP核心概念
- Server:又称Broker,接受客户端连接,实现AMQP实体服务
- Connection:连接,应用程序与Broker的网络连接
- Channel:网络信道,几乎所有操作都在Channel中进行,Channel是进行消息读写的通道,客户端可建立多个Channel,每个Channel代表一个会话任务
- Message:消息,服务器与应用之间传递的实体数据,由Properties和Body组成,Properties可以用来修饰消息,比如优先级,延迟等高级特征,Body则是消息体内容
- Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个Virtual host里面可以有若干个Exchange和Queue,同一个Virtual host里面不能有相同名称的Exchange或Queue
- Exchange:交换机,接收消息,根据路由键转发给消息绑定的队列
- Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
- Routing key:一个路由规则,虚拟机可用来确定如何路由一个特定消息
- Queue:也称为Message Queue,消息队列,提供了FIFO的处理机制,保存消息并将它们转发给消费者
RabbitMQ 就是基于 AMQP 协议实现的。
JMS vs AMQP
对比方向 | JMS | AMQP |
---|---|---|
定义 | Java API | 协议 |
跨语言 | 否 | 是 |
跨平台 | 否 | 是 |
支持消息类型 | 提供两种消息模型:①Peer-2-Peer;②Pub/sub | 提供了五种消息模型:①direct exchange;②fanout exchange;③topic change;④headers exchange;⑤system exchange。本质来讲,后四种和JMS的pub/sub模型没有太大差别,仅是在路由机制上做了更详细的划分; |
支持消息类型 | 支持多种消息类型 ,我们在上面提到过 | byte[](二进制) |
总结:
- AMQP 为消息定义了线路层(wire-level protocol)的协议,而JMS所定义的是API规范。在 Java 体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而AMQP天然具有跨平台、跨语言特性。
- JMS 支持TextMessage、MapMessage 等复杂的消息类型;而 AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送)。
- 由于Exchange 提供的路由算法,AMQP可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅 方式两种。
常见的消息队列对比
对比方向 | 概要 |
---|---|
吞吐量 | 万级的 ActiveMQ 和 RabbitMQ 的吞吐量(ActiveMQ 的性能最差)要比 十万级甚至是百万级的 RocketMQ 和 Kafka 低一个数量级。 |
可用性 | 都可以实现高可用。ActiveMQ 和 RabbitMQ 都是基于主从架构实现高可用性。RocketMQ 基于分布式架构。 kafka 也是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
时效性 | RabbitMQ 基于erlang开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。其他三个都是 ms 级。 |
功能支持 | 除了 Kafka,其他三个功能都较为完备。 Kafka 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准 |
消息丢失 | ActiveMQ 和 RabbitMQ 丢失的可能性非常低, RocketMQ 和 Kafka 理论上不会丢失。 |
总结:
- ActiveMQ 的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用。
- RabbitMQ 在吞吐量方面虽然稍逊于 Kafka 和 RocketMQ ,但是由于它基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做erlang源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ 一定是你的首选。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
- RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。RocketMQ 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准 JMS 规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ 挺好的
- kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。
RabbitMQ
介绍
RabbitMQ是基于Erlang语言并且基于AMQP协议实现的一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据
特点
- 开源、性能优秀,提供稳定性保障,持久化支持,保证了消息的稳定性;
- 提供可靠性消息投递模式、返回模式
- 基于Erlang编写使得集群部署简单
- 社区活跃
高性能的原因
- Erlang语言最初在于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能非常优秀
- Erlang有着与原生Socket一样的延迟
RabbitMQ架构图
RabbitMQ流程图
RabbitMQ实战
官网:https://www.rabbitmq.com/
RabbitMQ安装
安装依赖
yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget gtk2-devel binutils-devel
安装Erlang
由于rabbitmq是基于erlang语言开发,所以必须先安装erlang,一定要注意对应版本
官网:https://www.erlang.org/downloads
# 下载
wget http://erlang.org/download/otp_src_22.0.tar.gz
# 解压
tar -zxvf otp_src_22.0.tar.gz
# 配置安装路径
cd otp_src_22.0
mkdir ../erlang
./configure --prefix=/usr/local/src/erlang
# 安装
make install
# 配置环境变量
略
# 查看是否成功,输入一个erl查看是否进入erlang的控制页面
erl
# 退出erl
halt().
安装RabbitMQ
# 下载
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.15/rabbitmq-server-generic-unix-3.7.15.tar.xz
# 由于是tar.xz格式的所以需要用到xz,没有的话就先安装
yum install -y xz
# 解压
/bin/xz -d rabbitmq-server-generic-unix-3.7.15.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.7.15.tar
# 重命名
mv rabbitmq_server-3.7.15/ rabbitmq
# 配置环境变量
略
修改配置文件
修改/usr/local/src/rabbitmq/ebin目录下的rabbit.app文件
{loopback_users, [<<"guest">>]},------------>{loopback_users, [guest]},
启动访问
启动
# 启动
rabbitmq-server start &
# 停止
rabbitmqctl stop
# 查看状态
rabbitmqctl status
访问
# 启动管理插件
rabbitmq-plugins enable rabbitmq_management
# 浏览器输入
ip:15672
默认用户名密码为guest
命令行与管控台
强调:任何技术都一定要学好命令再使用管控台,比如SQL,Redis,MQ,Java等等
应用操作
启动应用
rabbitmq-server start
或
rabbitmqctl start_app
关闭应用
rabbitmq-server stop
或
rabbitmqctl stop_app
节点状态
rabbitmqctl status
敲黑板:对于RabbitMQ节点的操作有三个命令rabbitmqctl、rabbitmq-server、rabbitmq-plugins,rabbitmq-server主要针对server本身相关操作,比如启停服务,查看服务状态,rabbitmq-plugins可以添加插件,管理插件,而rabbitmqctl命令功能更多
用户操作
查看所有用户
rabbitmqctl list_users
添加一个用户
rabbitmqctl add_user username password
修改密码
rabbitmqctl change_password username newpassword
配置权限
rabbitmqctl set_permissions -p "/" username ".*" ".*" ".*"
查看用户权限
rabbitmqctl list_user_permissions username
清除用户权限
rabbitmqctl clear_permissions -p vhostpath username
设置tag
rabbitmqctl set_user_tags zhaobl administrator
删除用户
rabbitmqctl delete_user username
虚拟主机操作
创建虚拟主机
rabbitmqctl add_vhost vhostpath
列出所有虚拟主机
rabbitmqctl list_vhosts
列出虚拟主机上所有权限
rabbitmqctl list_permissions -p vhostpath
删除虚拟主机
rabbitmqctl delete_vhost vhostpath
队列操作
查看所有队列信息
rabbitmqctl list_queues
清除队列消息
rabbitmqctl -p vhostpath purge_queue blue
高级操作
移除所有数据
# 该操作只能在服务停止之后才能使用
rabbitmqctl reset
组成集群命令
# ram:数据存储在内存,disc:数据存储在磁盘
rabbitmqctl join_cluster <clusternode> [--ram]
查看集群状态
rabbitmqctl cluster_status
修改集群节点存储形式
# 忘记制定数据存储模式,可以手动修改
rabbitmqctl change_cluster_node_type disc|ram
忘记节点(摘除节点)
# 如果10个节点组成一个集群,运行时有节点宕机或者启动不起来,通过这个命令,将问题节点忘记掉,启动时输入问题节点名称将他们摘除掉,实现失败转移,
# offline是针对与主节点宕机情况使用
rabbitmqctl forget_cluster_node [--offline]
修改节点名称
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2 newnode2......]
高级操作的特点就是影响范围较大,偏向集群或运维
Java集成RabbitMQ
pom依赖
生产者
package com.stt.rabbitdemo.demo1;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
/**
* @author stt
* Description:RabbitMQ生产者
*/
public class ProducerDemo1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//2、配置
connectionFactory.setHost("192.168.109.201");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//3、创建Connection对象,这里有个异常为了方便抛出去
Connection connection = connectionFactory.newConnection();
//4、创建Channel发送消息
Channel channel = connection.createChannel();
String msg = "Hello RabbitMQ!";
channel.basicPublish("", "test001", null, msg.getBytes());
//6、关闭连接
channel.close();
connection.close();
}
}
消费者
package com.stt.rabbitdemo.demo1;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
/**
* @author stt
* Description:RabbitMQ消费者
*/
public class ConsumerDemo1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//2、配置
connectionFactory.setHost("192.168.109.201");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//3、创建Connection对象,这里有个异常为了方便抛出去
Connection connection = connectionFactory.newConnection();
//4、创建Channel发送消息
Channel channel = connection.createChannel();
//5、声明一个队列
/**
*queue: the name of the queue
* 翻译:队列名字
* durable true if we are declaring a durable queue (the queue will survive a server restart)
* 翻译:设置为true,即使服务重启,这个队列也不会被删除
* exclusive true if we are declaring an exclusive queue (restricted to this connection)
* 翻译:独占,设置为true,这个队列只能有一个连接
* autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* 翻译:设置为true这个队列没有别的引用就会自动删除
* arguments other properties (construction arguments) for the queue
* 翻译:扩展参数
*/
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
//6、创建消费者
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag);
System.out.println(envelope.toString());
System.out.println(properties.toString());
System.out.println("消息内容:" + new String(body));
}
});
}
}
先启动消费者,因为队列是由Consumer创建,再启动生产者,查看控制台输出和管控台
敲黑板:我们发现上边写的代码消费端指明了queueName,而生产者端没有指定也同样可以将数据发送过去完成正常的消费,我们在生产消息时调用basicPublish方法没有指明exchange会使用AMQP default,会根据routingKey找同名的queue,找到则路由过去
Exchange
交换机架构图
交换机属性
- name:交换机名字
- Type:交换机类型:direct、topic、fanout、headers
- Durability:是否需要持久化,true为持久化
- Auto Delete:当最后一个绑定到该Exchange上的队列删除后,自动删除该Exchange
- Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false
- Arguments:扩展参数,用于扩展AMQP协议自制定化使用
交换机类型
direct
介绍
所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue,Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则就丢弃该消息
生产者
package com.stt.rabbitdemo.demo2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author stt
*/
public class ProducerDirectDemo {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.109.201");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//声明属性
String exchangeName = "stt_direct_exchange";
String routingKey = "stt.direct";
String msg = "测试direct类型的Exchange";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
}
}
消费者
package com.stt.rabbitdemo.demo2;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author stt
*/
public class ConsumerDirectDemo {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.109.201");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//声明
String exchangeName = "stt_direct_exchange";
String queueName = "stt_direct_queue";
String routingKey = "stt.direct";
//声明exchange(交换机名, 交换机类型, 是否持久化, 是否自动删除, 是否是内部交换机, 交换机属性)
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//声明队列(队列名, 是否持久化, 是否排他, 是否自动删除, 队列属性)
channel.queueDeclare(queueName, true, false, false, null);
//将队列绑定到交换机上
channel.queueBind(queueName, exchangeName, routingKey);
//获取消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息数据==》"+new String(body));
}
});
}
}
topic
介绍
所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,队列需要绑定一个Topic,Exchange将RouteKey和某个Topic进行模糊匹配,可以使用通配符进行模糊匹配
- “#”匹配一个或多个词
- “*”不多不少只匹配一个词
- “log.#”匹配到“login.info.aa”
- "log.*"匹配到“login.error”
生产者
package com.stt.rabbitdemo.demo3;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author stt
*/
public class ProducerTopicDemo {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.109.201");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//声明
String exchangeName = "stt_topic_exchange";
String routingKey1 = "stt.save";
String routingKey2 = "stt.update";
String routingKey3 = "stt.delete.1";
String msg = "Topic类型Exchange 测试";
channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
channel.close();
connection.close();
}
}
消费者
package com.stt.rabbitdemo.demo3;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author stt
*/
public class ConsumerTopicDemo {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.109.201");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//声明
String exchangeName = "stt_topic_exchange";
String queueName = "stt_topic_queue";
String routingKey = "stt.#";
//String routingKey = "stt.*";
//声明exchange(交换机名, 交换机类型, 是否持久化, 是否自动删除, 是否是内部交换机, 交换机属性)
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//声明队列(队列名, 是否持久化, 是否排他, 是否自动删除, 队列属性)
channel.queueDeclare(queueName, true, false, false, null);
//将队列绑定到交换机上
channel.queueBind(queueName, exchangeName, routingKey);
//获取消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息数据==》"+new String(body));
}
});
}
}
Fanout Exchange
介绍
- 不处理routeKey,只需要简单的将队列绑定到交换机上
- 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
- Fanout交换机转发消息是最快的,因为不需要做匹配这些操作
生产者
package com.stt.rabbitdemo.demo4;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author stt
*/
public class ProducerFanoutDemo {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.109.201");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//声明
String exchangeName = "stt_fanout_exchange";
String msg = "Fanout 类型 Exchange 测试";
channel.basicPublish(exchangeName, "sss", null, msg.getBytes());
channel.close();
connection.close();
}
}
消费者
package com.stt.rabbitdemo.demo4;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author stt
*/
public class ConsumerFanoutDemo {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.109.201");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//声明
String exchangeName = "stt_fanout_exchange";
String queueName = "stt_fanout_queue";
String routingKey = "";
//声明exchange(交换机名, 交换机类型, 是否持久化, 是否自动删除, 是否是内部交换机, 交换机属性)
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//声明队列(队列名, 是否持久化, 是否排他, 是否自动删除, 队列属性)
channel.queueDeclare(queueName, true, false, false, null);
//将队列绑定到交换机上
channel.queueBind(queueName, exchangeName, routingKey);
//获取消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息数据==》"+new String(body));
}
});
}
}
Binding-绑定
介绍
- 绑定是Exchange和Exchange,以及Exchange和Queue之间的连接关系
- Binding中可以包含RoutingKey或者参数
Queue-消息队列
- 消息队列,实际存储消息数据
- Durability代表是否持久化,Durable是持久化,Transient是不持久化
- Auto delet:选择yes或者no,选择yes代表当最后一个监听被移除之后,该Queue自动删除
Message-消息
- 服务器和应用程序之间传送的数据
- 本质上是一段数据,有Properties和Payload(body)组成
- 常用属性:delivery mode,header(自定义属性)
- content_type:
- content_encoding:
- priority:
- correlation_id:
- reply_to:
- expiration:
- message_id:
- timestamp:
- type:
- user_id:
- app_id:
- cluster_id:
生产者
package com.stt.rabbitdemo.demo5;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author stt
*
* Description:RabbitMQ生产者
*/
public class ProducerDemo1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//2、配置
connectionFactory.setHost("192.168.109.201");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//3、创建Connection对象,这里有个异常为了方便抛出去
Connection connection = connectionFactory.newConnection();
//4、创建Channel发送消息
Channel channel = connection.createChannel();
// 声明交换机 (交换机名, 交换机类型, 是否持久化, 是否自动删除, 是否是内部交换机, 交换机属性);
channel.exchangeDeclare("exchangeName", BuiltinExchangeType.DIRECT, true, false, false, null);
Map<String, Object> headers = new HashMap<>();
headers.put("my1", "1111");
headers.put("my2", "2222");
// 设置消息属性 发布消息 (交换机名, Routing key, 可靠消息相关属性 后续会介绍, 消息属性, 消息体);
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.deliveryMode(2)//设置是否持久化,2持久化,1不持久化
.contentType("UTF-8")//设置编码格式
.expiration("10000")//设置过期时间
.headers(headers)//其他信息Map集合
.build();
//发送消息
String msg = "带附加信息的Message!";
channel.basicPublish("", "test001", basicProperties, msg.getBytes());
//6、关闭连接
channel.close();
connection.close();
}
}
消费者
package com.stt.rabbitdemo.demo5;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author stt
* Description:RabbitMQ消费者
*/
public class ConsumerDemo1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//2、配置
connectionFactory.setHost("192.168.109.201");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//3、创建Connection对象,这里有个异常为了方便抛出去
Connection connection = connectionFactory.newConnection();
//4、创建Channel发送消息
Channel channel = connection.createChannel();
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag);
System.out.println(envelope.toString());
System.out.println(properties.toString());
System.out.println(properties.getHeaders().get("my1")+"===>"+properties.getHeaders().get("my2"));
System.out.println("消息内容:" + new String(body));
}
});
}
}
虚拟主机
- 虚拟地址,用于进行逻辑隔离,由上层的消息路由
- 一个Virtual Host里面可以由若干个Exchange和Queue
- 同一个Virtual Host里面不能有相同名称的Exchange和Queue
总结
- 目前大多数企业都在使用RabbitMQ来做解耦,削峰等业务
- RabbitMQ发送消息将消息发送到Exchange中,然后Exchange根据routing Key规则将消息路由到指定的Queue中,我们从Queue中获取消息即可
- 一个虚拟主机可以包含若干个Exchange