目录
1.消息队列
MQ(Message Queue,消息队列)通常用来业务解耦、流量削峰、异步等等,市面上有rabbitmq、kafka、ActiveMQ、RocketMQ等等。接下来介绍消息中间件Kafka。更详细参考
2.Kafka是什么?
Kafka是一个分布式的消息引擎。具有以下特征
-
能够发布和订阅消息流(类似于消息队列)
-
以容错的、持久的方式存储消息流
-
多分区概念,提高了并行能力
2.Kafka架构概念
2.1 集群角色
- Broker: Kafka 集群为了提高可用性,一般有多个服务实例(进程),每一个服务实例都是一个Broker,用于存储消息。
- Producer:向Kafka发送消息,生产者会根据topic分发消息到Topic上的哪一个分区。最简单的方式从分区列表中轮流选择。
- Cousumer:负责订阅和消费消息。消费者用consumerGroup来标识自己。
- CousumerGroup:同一个Consumer Group可有多个Consumers,Kafka每个消息可以发给多个组,但同组只有一个Consumer消费消息。
-
zookeeper:集群管理,管理以上三种角色的中间组件,保存kafka系统信息,例如消费进度offset,消费组Group等。
2.2 逻辑-物理数据结构
2.2.1 Topic主题
消息主题,包含多个分区Partition(队列),每一个消息都有它的topic,Kafka通过topic对消息进行归类。
Kafka中可以将Topic从物理上划分成一个或多个分区(Partition),每个分区在物理上对应一个文件夹,,该dir包含了这个分区的所有消息(.log)和索引文件(.index),这使得Kafka的吞吐率可以水平扩展。
2.2.2 Partition分区
每个分区都是一个 顺序的、不可变的消息逻辑队列,并且可以持续的添加。
每个分区在物理上对应一个文件夹,以”topicName_partitionIndex”的命名方式命名。
2.2.3 message消息
消息是最小的订阅单元,分区中的消息都被分了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。
3.设计原理
3.1 生产者发送消息
producer在发布消息的时候,可以为每条消息指定Key,这样消息被发送到broker时,会根据分区算法把消息存储到对应的分区中TopicPartition tp,如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡。
1.序列化消息&&.计算partition
根据key和value的配置(默认String)对消息进行序列化,
然后计算partition:ProducerRecord对象中如果指定了partition,就使用这个partition。
否则根据key和topic的partition数目取余,
如果key也没有的话就随机生成一个counter,使用这个counter来和partition数目取余。这个counter每次使用的时候递增。
2.发送到batch&&唤醒Sender 线程
根据topic-partition获取对应的batchs(Dueue<ProducerBatch>),然后将消息append到batch中;
如果有batch满了则唤醒Sender 线程。队列的操作是加锁执行,所以batch内消息时有序的。后续的Sender操作当前方法异步操作。
3.Sender把消息有序发到 broker(tp replia leader)
3.1 确定tp relica leader 所在的broker
- 每台broker都保存了kafka集群的metadata信息,metadata信息里包括了每个topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客户端从任一broker都可以获取到需要的metadata信息;sender线程通过metadata信息可以知道tp leader的brokerId
- producer也保存了metada信息,同时根据metadata更新策略(定期更新metadata.max.age.ms、失效检测,强制更新:检查到metadata失效以后,调用metadata.requestUpdate()强制更新
4. Sender处理broker发来的produce response
一旦broker处理完Sender的produce请求,就会发送produce response给Sender,此时producer将执行我们为send()设置的回调函数。至此producer的send执行完毕。
3.1.1 生产者配置
buffer.memory:buffer设置大了有助于提升吞吐性,但是batch太大会增大延迟,可搭配linger_ms参数使用
linger_ms:如果batch太大,或者producer qps不高,batch添加的会很慢,我们可以强制在linger_ms时间后发送batch数据
ack:producer收到多少broker的答复才算真的发送成功
0表示producer无需等待leader的确认(吞吐最高、数据可靠性最差)
1代表需要leader确认写入它的本地log并立即确认
-1/all 代表所有的ISR都完成后确认(吞吐最低、数据可靠性最高)
3.2 Consumer消费流程
如下图所示,消费者以消费者组为单位 订阅主题,一个主题的每个消息可以发给多个消费者组,一个消费者组内仅有一个消费者能够消费消息。就像团体奖项,每个组都要选个组长去领奖一样,消费者组 组员之间需要用协调器进行协调选择一个成员去拉取消息。
3.2.1 poll方法
消费者先确保协调器已经分配好分区让当前消费者拉取,然后通过fetcher拉消息(单线程)
3.2.2 coordnator协调器
每个consumer group在broker上都有一个coordnator来管理,消费者加入和退出,消息处理的负载均衡,以及消费消息的位移都由coordnator处理。
消费者通过AbstractCoordinator$HeartbeatTask心跳线程来与broker保持心跳,超时会认为挂掉。
负载均衡reblance
一个消费者组包含多个消费者,为了保证消费者的负载比较均衡,使用协调器进行均衡。协调器会负责协调组中的leader,分配哪些成员接收哪个分区的消息。
当一些原因导致consumer对partition消费不再均匀时,kafka会自动执行reblance,使得consumer对partition的消费再次平衡。
rebalance的时机
-
组订阅topic数变更
-
topic partition数变更
-
consumer成员变更: consumer 加入群组或者离开群组(consumer被检测为崩溃)
举例 consumer加入引起的reblance
-
使用join协议,表示有consumer 要加入到group中
-
使用sync 协议,根据分配规则进行分配
(上图图片摘自网络)
位移管理
consumer的消息位移代表了当前group对topic-partition的消费进度,consumer宕机重启后可以继续从该offset开始消费。
在kafka0.8之前,位移信息存放在zookeeper上,由于zookeeper不适合高并发的读写;
新版本Kafka把位移信息当成消息,主题是__consumers_offsets ,默认有50个分区。
消息的key =groupId+topic_partition,value =offset.
3.3 Broker 设计原理
Broker 是Kafka 集群中的服务实例。
主要功能:
- 处理生产者发送过来的消息,
- 消费者消费的拉取请求。
- 集群节点的管理等
(1)broker消息存储
-
Kafka的消息以二进制的方式紧凑地存储,节省空间
-
此外消息存在ByteBuffer而不是堆内存,这样broker进程挂掉时,数据不会丢失,同时避免了gc问题
-
通过零拷贝和顺序寻址,让消息存储和读取速度都非常快
-
处理fetch请求的时候通过zero-copy 加快速度
(2)broker状态数据
broker设计中,每台机器都保存了相同的状态数据。主要包括以下:
-
所有broker的信息:比如每台broker的ID、机架信息以及配置的若干组连接信息
-
所有分区的信息:所谓分区信息指的是分区的leader、ISR和AR信息以及当前处于offline状态的副本集合。这部分数据按照topic-partitionID进行分组,可以快速地查找到每个分区的当前状态。(注:AR表示assigned replicas,即创建topic时为该分区分配的副本集合)
broker负载均衡
分区数量负载:各台broker的partition数量应该均匀
partition Replica分配算法如下:
-
将所有Broker(假设共n个Broker)和待分配的Partition排序
-
将第i个Partition分配到第(i mod n)个Broker上
-
将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker上
4.Kafka特性
4.1 Kafka如何保证幂等性
不丢消息
-
首先kafka保证了对已提交消息的at least保证
-
Sender有重试机制
-
producer业务方在使用producer发送消息时,注册回调函数。在onError方法中重发消息
-
consumer 拉取到消息后,处理完毕再commit,保证commit的消息一定被处理完毕
不重复
-
consumer拉取到消息先保存,commit成功后删除缓存数据
4.2 Kafka高性能特性
-
partition提升了并发
-
zero-copy
-
顺序写入
-
消息聚集batch
-
页缓存
4.3 生产者优化
-
增大producer数量
-
ack配置
-
batch