消息发送
- producer 采用 push 模式将消息发到 broker,每条消息都被 append 到 patition 中,顺序写磁盘(保障吞吐率)。
- 日志文件的删除策略:启动后台线程定期扫描log file列表,把保存时间超过阀值的文件删除,为避免删除文件时仍有read操作(consumer消费),采取copy-on-write方式。
- producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。(如一致性哈希)
- producer 向 broker 发消息时,一旦消息被 commit就不会丢。如果 producer 发数据给 broker 后,遇到网络问题造成通信中断,Producer 就无法判断消息是否 commit。producer 可以生成一种类似主键ID,发生故障时幂等的重试,这样能做到 Exactly once,但目前还并未实现。目前默认下条消息从 producer 到 broker 来确保 At least once。
broker保存消息
- 物理上把 topic 分成一或多个 patition,每个 patition 物理上对应一个文件夹(存储该 patition 的所有消息和索引)。无论消息是否消费,kafka 都会保留所有消息。不过可以设置时间和size阈值删除旧数据。
Kafka的高可用
复制
- 一个partition多个副本,只有一个为leader,上下游只与leader通信,其他副本作为follower从leader中复制数据
- 当所有follower都将一条消息保存成功,此消息才被认为是"committed",此时consumer才能消费它。即使只有一个replicas实例存活,仍可保证消息的正常发送和接收,只要zookeeper集群存活即可。(不同于其他分布式存储,比如hbase需要"多数派"存活才行)
leader的故障恢复
- 从 follower 中选举新 leader,新leader 必须拥有旧 leader commit 过的所有消息
- 当所有 replica 都不工作时,选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。
- 选择follower时需要兼顾一个问题,就是新leader已经承载的partition leader的个数,如果一个server上有过多partition leader,意味着此server将承受更多IO压力,需要考虑到"负载均衡".
broker故障恢复
- (1)controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
- (2)controller 从 /brokers/ids 节点读取可用broker
- (3)controller决定set_p,该集合包含宕机 broker 上的所有 partition
- (4)对 set_p 中的每一个 partition
(4.1)从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR
(4.2) 选举新 leader
(4.3) 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点 - (5) 通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令
controller故障恢复
- 当 controller 宕机时触发 controller failover。每个 broker 都会在 zookeeper 的 “/controller” 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 通知,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。
消费者
PUSH与PULL两种消息接收模式
consumer rebalance
- 当有 consumer 加入或退出、以及 partition 的改变(如 broker 加入或退出)时会触发 rebalance。
- 每个 consumer 都只负责调整自己所消费的 partition,为了保证整个consumer group 的一致性,当一个 consumer 触发了 rebalance 时,该 consumer group 内的其它所有其它 consumer 也应该同时触发 rebalance。
性能优化
持久性
- kafka对日志文件进行append操作,磁盘检索开支较小;为了减少磁盘写入次数,broker会将消息暂时buffer,当消息个数(或尺寸)达到一定阀值再flush磁盘,这减少了磁盘IO调用次数。
- 将Partition分为多个Segment,每个Segment对应一个物理文件;
- 通过删除整个文件的方式去删除Partition内的数据,这种方式清除旧数据的方式,也避免了对文件的随机写操作。
性能优化
- 除磁盘IO外还需要考虑网络IO,这直接关系到kafka吞吐量。
- producer端将消息buffer起来,当条数达到一定阀值时批量发给broker;* consumer端也一样,批量fetch多条消息。
- kafka broker端,有sendfile系统调用可以潜在提升网络IO性能:将文件数据映射到系统内存,socket直接读取相应内存区域无需进程再次copy和交换
- 启用消息压缩机制,压缩消耗少量CPU资源,不过对kafka而言网络IO更应考虑
生产者负载均衡
- producer会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,消息被路由到哪个partition上由producer决定。比如可以采用"random"“key-hash”"轮询"等。
利用Partition实现并行处理
- 由于不同Partition可位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。(Partition是最小并发粒度)
ISR实现可用性A与数据一致性C的动态平衡
- 只有ISR中的所有Replica都复制完,Leader才会将其置为Commit,它才能被Consumer所消费。
零拷贝
- Linux 2.4+内核通过sendfile系统调用,提供了零拷贝。数据通过DMA拷贝到内核态Buffer后,直接通过DMA拷贝到NIC Buffer,无需CPU拷贝。这也是零拷贝这一说法的来源。除了减少数据拷贝外,因为整个读文件-网络发送由一个sendfile调用完成,整个过程只有两次上下文切换,因此大大提高了性能。
consumer group
- Kafka保证一条消息在同一个consumer group中只消费一次。
- 允许不同的consumer group同时消费同一条消息。