版权声明:YETA https://blog.csdn.net/qq_28958301/article/details/88231900
出现背景
在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都连接上来,称为消息总线。
在总线上的各个实例都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息,例如配置信息的变更或其他一些管理操作等。
消息代理
消息代理是一种消息验证、传输、路由的架构模式。
它在应用程序之间起到通信调度并最小化应用之间的依赖的作用,使得应用程序可以高效地解耦通信过程。
场景:
- 将消息路由到一个或多个目的地;
- 消息转化为其他的表现方式;
- 执行消息的聚集、消息的分解,并将结果发送给到它们的目的地,然后重新组合响应返回给消息用户;
- 调用Web服务来检索数据;
- 响应事件或错误;
- 使用发布-订阅模式来提供内容给或基于主题的消息路由。
产品:
- ActiveMQ;
- Kafka;
- RabbitMQ;
- RocketMQ。
RabbitMQ基本概念
- Broker:消息队列服务器的实体,负责接收消息生产者的消息,然后将消息发送至消息接收者或其他Broker;
- Exchange:消息交换机,是消息第一个到达的地方,消息通过它指定的路由规则,分发到不同的消息队列中去;
- Queue:消息队列,消息通过发送和路由之后最终到达的地方,到达Queue的消息即进入逻辑上等待消费的状态;
- Binding:绑定,作用是将Exchange和Queue按照路由规则绑定起来;
- Routing Key:路由关键字,Exchange通过这个关键字进行过消息投递;
- Virtual host:虚拟主机,是对Broker的虚拟划分,将消费者、生产者和它们依赖的AMQP相关结构进行隔离;
- Connection:连接,代表生产者、消费者、Broker之间进行通信的物理网络;
- Channel:消息通道,用于连接生产者和消费者的逻辑结构;
- Producer:消息生产者,制造消息并发送消息的程序;
- Consumer:消息消费者,接收消息并处理消息的程序。
消息投递到队列的过程:
- 客户端连接到消息队列服务器,打开一个Channel;
- 客户端声明一个Exchange,并设置相关属性;
- 客户端声明一个Queue,并设置相关属性;
- 客户端使用Routing Key,在Exchange和Queue之间建立好绑定关系;
- 客户端投递消息到Exchange;
- Exchange接收到消息后,该根据消息的Key和已经设置的Binding,进行消息路由,将消息投递到一个个或多个Queue里。
RabbitMQ支持消息的持久化,也就是将数据写在磁盘上,包括3个部分:
- Exchange持久化:在声明时指定durable=>1;
- Queue持久化:在声明时指定durable=>1;
- 消息持久化:在投递时指定delivery_mode=>2(1是非持久化)。
RabbitMQ实现消息总线
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件,也称为面向过消息的中间件。
- Windows安装
- 安装Erlang
-
- 安装RabbitMQ
-
- 开启Web给管理插件
-
- 访问 http://localhost:15672 并以用户名、密码 guest 登陆
- 添加用户
- Ubuntu安装(未测试)
- 安装Erlang
apt-get install erlang
-
- 新增APT仓库到/etc/apt/sources.list.d
echo 'deb http://www.rabbitmq.com/debian/ testing main' |
sudo tee /etc/apt/sources.list.d/rabbitmq.list
-
- 更新API仓库的package list
sudo apt-get update
-
- 安装Rabbit Servver
sudo apt-get install rabbitmq-server
-
- 开启Web管理插件
rabbitmq-plugggins enable rabbitmq_management
- 依赖
- 配置
- 消息生产者
- 消息消费者
- 配置类
- 单元测试
- 测试结果
【备注】在整个生产消费过程中,生产和消费是一个异步操作,这也是在分布式系统中要使用消息代理的重要原因,以此可以使用通信来解耦业务逻辑,系统间调用就没有同步调用那么高的实时性要求,同时也更容易控制处理吞吐量以保证系统的正常运行。
Spring Cloud Config整合Spring Cloud Bus AMQP
- Config Server和Config Client都引入依赖
- 配置
- 启动Config Server和两个Config Client
- 第一次获取Git配置
- 如果Config Server整合了Spring Boot Security,注意开放动态刷新接口,或者请求的时候携带Authorization
- 修改Git配置,在Config Server上动态刷新
- 第二次获取Git配置
- 指定刷新范围
Spring Cloud官方文档对destination的解释:
- 此时架构
服务实例不需要承担触发配置更新的职责,对于Git仓库配置修改只需针对Config Server即可,简化了集群上的一些维护工作。
RabbitMQ配置
Kafka基本概念
- Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker;
- Topic:逻辑上通过RabbitMQ的Queue队列相似,每条发布到Kafka集群的消息都必须有一个Topic;
- Partition:物理概念上的分区,为了提供系统吞吐率,在物理上每个Topic会分为一个或多个Partition,每个Partition对英国一个文件夹;
- Producer:消息生产者,负责生产消息并发送到Kafka Broker;
- Consumer:消息消费者,向Kafka Broker读取消息并处理的客户端;
- Consumer Group:每个Consumer属于一个特定的组,组可以用来实现一条消息被组内多个成员消费等功能。
Kafka是一个由LinkedIn开发的分布式消息系统,现在由Apache基金会维护和开发,使用Scala实现。
Kafka是基于消息发布-订阅模式实现的消息系统,设计目标:
- 消息持久化:以时间复杂度O(1)的方式停工消息持久化能力,即使对TB级以上的数据也能保证常数时间复杂度的访问性能;
- 高吞吐:在廉价的商用机器上也能够支持单机每秒10万条以上的吞吐量;
- 分布式:支持消息分区以及分布式消费,并保证分区内的消息顺序;
- 跨平台:支持不同技术平台的客户端(如Java、PHP、Python等);
- 实时性:支持事实数据处理和离线数据处理;
- 伸缩性:支持水平扩展。
Kafka实现消息总线
- 环境安装
参考:https://blog.csdn.net/u010283894/article/details/77106159
- 创建Topic:
- 创建消息生产者:
- 创建消息消费者:
Spring Cloud Config整合Spring Cloud Bus Kafka
- 启动zookeeper
C:\Users\Administrator>zkserver
- 启动kafka
F:\kafka_2.12-2.1.1\bin\windows>kafka-server-start.bat f:\kafka_2.12-2.1.1\config\server.properties
- Config Server和Config Client引入依赖
- 启动Config Server,可以看到控制台输出
- 启动Config Client,可以看到控制台输出
- 动态刷新过程如上文