大数据笔试真题集锦---第三章:KafKa面试题

第三章目录

第三章 KafKa

Kafka是一个分布式、高吞吐的消息发布与订阅系统

3.1 Kafka 原理

3.1.1 数据一致性

3.1.1.1 producer端:ACK机制

0:不等待节点同步成功就发送下一条消息,消息发送失败就会直接丢失

1:等待leader副本同步成功才发送下一条消息,如果leader在备份数据到follow前宕机就会丢失数据

-1 : 等待kafka认为follow同步成功才发送下一条消息,不可能丢失数据。

3.1.1.2 kafka端:LEO-HW机制

  1. 当数据写入leader副本后,会立刻更新分区的最新偏移量LogEndOffset
  2. 直到指定数量的follow副本同步成功后才会更新偏移量HighWatermark(最少同步数量可以在参数中设置)
  3. 消费者只能读取到HW的位置,不能读到LEO的位置
  4. 发生重新选举后,所有broker都会将数据截断到HW的位置,宕机恢复的leader也会截断到HW的位置。

截断机制保证了kafka内部数据的一致性,但这也是ack=1有可能导致数据丢失的原因。

--以上所有偏移量均存储在zk中。

3.1.1.3 consumer端:使用"恰好一次"的一致性策略

  1. direct消费数据并手动维护偏移量的存储
  2. 确保数据消费完成后才更新偏移量(使用事务)

3.1.2 存储机制

3.1.2.1 分区存储

kafka通过topic来分主题存放数据,主题内有分区,分区有多个segment,每个segment存储多条消息,消息id由位置决定,可以直接通过消息id定位到消息存储的位置。

3.1.2.2 顺序写入

生产者发送的消息默认会被均匀分布到多个partition上,broker将消息往对应partition的最后一个segment上添加该消息,定时刷写到磁盘。segment达到阀值时会创建新的segement。

3.1.2.3 可靠性

kafka为每个分区都维护了N个副本repli,确保kafka数据的可靠性,并通过LEO-HW机制确保了数据的一致性。

3.2 其他

3.2.1 kafka扩容

将服务器添加到Kafka集群非常简单,只需为其分配唯一的 broker ID并在您的新服务器上启动Kafka即可。但是,这些新的服务器不会自动分配到任何数据分区,除非将分区移动到这些分区,否则直到创建新 topic 时才会提供服务。所以通常当您将机器添加到群集中时,您会希望将一些现有数据迁移到这些机器上。迁移数据的过程是手动启动的,但是完全自动化。在迁移数据时,Kafka会将新服务器添加为正在迁移的分区的 follower,并允许它完全复制该分区中的现有数据。当新服务器完全复制了此分区的内容并加入了同步副本时,其中一个现有副本将删除其分区的数据。分区重新分配工具可用于跨 broker 移动分区。理想的分区分布将确保所有 broker 的数据负载和分区大小比较均衡。分区重新分配工具不具备自动分析Kafka集群中的数据分布并移动分区以获得均匀负载的功能。因此,管理员必须找出哪些 topic 或分区应该移动。

分区重新分配工具可以以3种互斥方式运行:

--generate: 在此模式下,给定一个 topic 列表和一个 broker 列表,该工具会生成一个候选重新分配,以将指定的 topic 的所有分区移动到新的broker。此选项仅提供了一种便捷的方式,可以根据 tpoc 和目标 broker 列表生成分区重新分配计划。

--execute: 在此模式下,该工具基于用户提供的重新分配计划启动分区重新分配。(使用--reassignment-json-file选项)。这可以是由管理员制作的自定义重新分配计划,也可以是使用--generate选项提供的自定义重新分配计划。

--verify: 在此模式下,该工具将验证最近用 --execute 模式执行间的所有分区的重新分配状态。状态可以是成功完成,失败或正在进行

自动将数据迁移到新机器分区重新分配工具可用于将当前一组 broker 的一些 topic 移至新增的topic。这在扩展现有群集时通常很有用,因为将整个 topic 移动到新 broker 集比移动一个分区更容易。当这样做的时候,用户应该提供需要移动到新的 broker 集合的 topic 列表和新的目标broker列表。该工具然后会均匀分配新 broker 集中 topic 的所有分区。在此过程中,topic 的复制因子保持不变。实际上,所有输入 topic 的所有分区副本都将从旧的broker 组转移到新 broker中。例如,以下示例将把名叫foo1,foo2的 topic 的所有分区移动到新的 broker 集5,6。最后,foo1和foo2的所有分区将只在<5,6> broker 上存在。由于该工具接受由 topic 组成的输入列表作为json文件,因此首先需要确定要移动的 topic 并创建 json 文件,如下所示:

  > cat topics-to-move.json
  {"topics": [{"topic": "foo1"},
              {"topic": "foo2"}],
  "version":1
  }
  ​

一旦json文件准备就绪,就可以使用分区重新分配工具来生成候选分配:

  > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
  //当前分区副本分配
   
  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
                {"topic":"foo1","partition":0,"replicas":[3,4]},
                {"topic":"foo2","partition":2,"replicas":[1,2]},
                {"topic":"foo2","partition":0,"replicas":[3,4]},
                {"topic":"foo1","partition":1,"replicas":[2,3]},
                {"topic":"foo2","partition":1,"replicas":[2,3]}]
  }
   
  //建议的分区重新分配配置
   
  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
                {"topic":"foo1","partition":0,"replicas":[5,6]},
                {"topic":"foo2","partition":2,"replicas":[5,6]},
                {"topic":"foo2","partition":0,"replicas":[5,6]},
                {"topic":"foo1","partition":1,"replicas":[5,6]},
                {"topic":"foo2","partition":1,"replicas":[5,6]}]
  }
  ​

该工具会生成一个候选分配,将所有分区从topic foo1,foo2移动到brokers 5,6。但是,请注意,这个时候,分区操作还没有开始,它只是告诉你当前的任务和建议的新任务。应该保存当前的分配,以防您想要回滚到它。新的任务应该保存在一个json文件(例如expand-cluster-reassignment.json)中,并用--execute选项输入到工具中,如下所示:

  > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
  //当前分区副本分配
   
  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
                {"topic":"foo1","partition":0,"replicas":[3,4]},
                {"topic":"foo2","partition":2,"replicas":[1,2]},
                {"topic":"foo2","partition":0,"replicas":[3,4]},
                {"topic":"foo1","partition":1,"replicas":[2,3]},
                {"topic":"foo2","partition":1,"replicas":[2,3]}]
  }
   
  //保存这个以在回滚期间用作--reassignment-json-file选项
  //成功开始重新分配分区
  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
                {"topic":"foo1","partition":0,"replicas":[5,6]},
                {"topic":"foo2","partition":2,"replicas":[5,6]},
                {"topic":"foo2","partition":0,"replicas":[5,6]},
                {"topic":"foo1","partition":1,"replicas":[5,6]},
                {"topic":"foo2","partition":1,"replicas":[5,6]}]
  }
  ​

最后,可以使用--verify选项来检查分区重新分配的状态。请注意,相同的expand-cluster-reassignment.json(与--execute选项一起使用)应与--verify选项一起使用:

  > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
  Status of partition reassignment:
  Reassignment of partition [foo1,0] completed successfully
  Reassignment of partition [foo1,1] is in progress
  Reassignment of partition [foo1,2] is in progress
  Reassignment of partition [foo2,0] completed successfully
  Reassignment of partition [foo2,1] completed successfully
  Reassignment of partition [foo2,2] completed successfully
  ​

自定义分区分配和迁移分区重新分配工具也可用于选择性地将分区的副本移动到特定的一组 broker。当以这种方式使用时,假定用户知道重新分配计划并且不需要该工具产生候选的重新分配,有效地跳过 --generate 步骤并直接到 --execute步骤例如,以下示例将 topic foo1的分区0 移到 broker 5,6中和将 topic foo2的分区1移到 broker 2,3中:第一步是在json文件中定义重新分配计划:

  > cat custom-reassignment.json
  {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
  ​

然后,使用带有 --execute 选项的 json 文件来启动重新分配过程:

   > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
  当前分区副本分配情况
   
   {"version":1,
   "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
                 {"topic":"foo2","partition":1,"replicas":[3,4]}]
   }
   
   保存这个以在回滚期间用作 --reassignment-json-file 选项
   成功开始重新分配分区
   {"version":1,
   "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
                 {"topic":"foo2","partition":1,"replicas":[2,3]}]
   }
  ​

可以使用--verify选项来检查分区重新分配的状态。 请注意,相同的expand-cluster-reassignment.json(与--execute选项一起使用)应与--verify选项一起使用:

  > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
  Status of partition reassignment:
  Reassignment of partition [foo1,0] completed successfully
  Reassignment of partition [foo2,1] completed successfully
  ​

3.2.2 kafka的数据堆积问题?是如何解决的?

数据堆积是怎么产生的?其一是消费者消费能力不足,这种情况,可以通过增加消费者,提升消费能力;其二是使用Kafka时,消费者每次poll的数据业务处理时间不能超过kafka的max.poll.interval.ms,该参数在kafka0.10.2.1中的默认值是300s,所以要综合业务处理时间和每次poll的数据数量。

3.2.3 kafka的消费者组怎么理解,说了一个情景题:用两个同名的topic,两个消费者组去拉取数据的时候,会发生什么

关于消费者消费数据的一些规则:任何Consumer必须属于一个Consumer Group同一Consumer Group中的多个Consumer实例,不同时消费同一个partition。不同Consumer Group的Consumer实例可以同时消费同一个partition。partition内消息是有序的,Consumer通过pull方式消费消息。Kafka不删除已消费的消息

3.2.4 kafka了解多少,kafka的一致性怎么保证的,kafka消费者如何消费分区,分区分配策略

一致性保证一致性定义:若某条消息对Consumer可见,那么即使Leader宕机了,在新Leader上数据依然可以被读到1.HighWaterMark简称HW: Partition的高水位,取一个partition对应的ISR中最小的LEO作为HW,消费者最多只能消费到HW所在的位置,另外每个replica都有highWatermark,leader和follower各自负责更新自己的highWatermark状态,highWatermark <= leader. LogEndOffset2.对于Leader新写入的msg,Consumer不能立刻消费,Leader会等待该消息被所有ISR中的replica同步后,更新HW,此时该消息才能被Consumer消费,即Consumer最多只能消费到HW位置这样就保证了如果Leader Broker失效,该消息仍然可以从新选举的Leader中获取。对于来自内部Broker的读取请求,没有HW的限制。同时,Follower也会维护一份自己的HW,Folloer.HW = min(Leader.HW, Follower.offset)kafka消费者如何消费分区按照Kafka默认的消费逻辑设定,一个分区只能被同一个消费组(ConsumerGroup)内的一个消费者消费。如果消费者过多,出现了消费者的数量大于分区的数量的情况,就会有消费者分配不到任何分区。kafka分区分配策略Kafka提供了消费者客户端参数partition.assignment.strategy用来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为:org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略。除此之外,Kafka中还提供了另外两种分配策略: RoundRobinAssignor和StickyAssignor。消费者客户端参数partition.asssignment.strategy可以配置多个分配策略,彼此之间以逗号分隔。RangeAssignor分配策略RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个topic,RangeAssignor策略会将消费组内所有订阅这个topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。为了更加通俗的讲解RangeAssignor策略,我们不妨再举一些示例。假设消费组内有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有4个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t0p3、t1p0、t1p1、t1p2、t1p3。最终的分配结果为: 消费者C0:t0p0、t0p1、t1p0、t1p1 消费者C1:t0p2、t0p3、t1p2、t1p3这样分配的很均匀,那么此种分配策略能够一直保持这种良好的特性呢?我们再来看下另外一种情况。假设上面例子中2个主题都只有3个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为: 消费者C0:t0p0、t0p1、t1p0、t1p1 消费者C1:t0p2、t1p2可以明显的看到这样的分配并不均匀,如果将类似的情形扩大,有可能会出现部分消费者过载的情况。对此我们再来看下另一种RoundRobinAssignor策略的分配效果如何。RoundRobinAssignor分配策略RoundRobinAssignor策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者。RoundRobinAssignor策略对应的partition.assignment.strategy参数值为:org.apache.kafka.clients.consumer.RoundRobinAssignor。如果同一个消费组内所有的消费者的订阅信息都是相同的,那么RoundRobinAssignor策略的分区分配会是均匀的。举例,假设消费组中有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有3个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为: 消费者C0:t0p0、t0p2、t1p1 消费者C1:t0p1、t1p0、t1p2如果同一个消费组内的消费者所订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个topic,那么在分配分区的时候此消费者将分配不到这个topic的任何分区。举例,假设消费组内有3个消费者C0、C1和C2,它们共订阅了3个主题:t0、t1、t2,这3个主题分别有1、2、3个分区,即整个消费组订阅了t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区。具体而言,消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2,那么最终的分配结果为: 消费者C0:t0p0 消费者C1:t1p0 消费者C2:t1p1、t2p0、t2p1、t2p2可以看到RoundRobinAssignor策略也不是十分完美,这样分配其实并不是最优解,因为完全可以将分区t1p1分配给消费者C1。StickyAssignor分配策略我们再来看一下StickyAssignor策略,“sticky”这个单词可以翻译为“粘性的”,Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:分区的分配要尽可能的均匀;分区的分配尽可能的与上次分配的保持相同。当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂很多。我们举例来看一下StickyAssignor策略的实际效果。假设消费组内有3个消费者:C0、C1和C2,它们都订阅了4个主题:t0、t1、t2、t3,并且每个主题有2个分区,也就是说整个消费组订阅了t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1这8个分区。最终的分配结果如下: 消费者C0:t0p0、t1p1、t3p0 消费者C1:t0p1、t2p0、t3p1 消费者C2:t1p0、t2p1这样初看上去似乎与采用RoundRobinAssignor策略所分配的结果相同,但事实是否真的如此呢?再假设此时消费者C1脱离了消费组,那么消费组就会执行再平衡操作,进而消费分区会重新分配。如果采用RoundRobinAssignor策略,那么此时的分配结果如下: 消费者C0:t0p0、t1p0、t2p0、t3p0 消费者C2:t0p1、t1p1、t2p1、t3p1如分配结果所示,RoundRobinAssignor策略会按照消费者C0和C2进行重新轮询分配。而如果此时使用的是StickyAssignor策略,那么分配结果为: 消费者C0:t0p0、t1p1、t3p0、t2p0 消费者C2:t1p0、t2p1、t0p1、t3p1可以看到分配结果中保留了上一次分配中对于消费者C0和C2的所有分配结果,并将原来消费者C1的“负担”分配给了剩余的两个消费者C0和C2,最终C0和C2的分配还保持了均衡。如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。StickyAssignor策略如同其名称中的“sticky”一样,让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。到目前为止所分析的都是消费者的订阅信息都是相同的情况,我们来看一下订阅信息不同的情况下的处理。举例,同样消费组内有3个消费者:C0、C1和C2,集群中有3个主题:t0、t1和t2,这3个主题分别有1、2、3个分区,也就是说集群中有t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区。消费者C0订阅了主题t0,消费者C1订阅了主题t0和t1,消费者C2订阅了主题t0、t1和t2。如果此时采用RoundRobinAssignor策略,那么最终的分配结果如下所示(和讲述RoundRobinAssignor策略时的一样,这样不妨赘述一下):【分配结果集1】消费者C0:t0p0消费者C1:t1p0消费者C2:t1p1、t2p0、t2p1、t2p2 消费者C0:t0p0 消费者C1:t1p0 消费者C2:t1p1、t2p0、t2p1、t2p2如果此时采用的是StickyAssignor策略,那么最终的分配结果为:【分配结果集2】 消费者C0:t0p0 消费者C1:t1p0、t1p1 消费者C2:t2p0、t2p1、t2p2可以看到这是一个最优解(消费者C0没有订阅主题t1和t2,所以不能分配主题t1和t2中的任何分区给它,对于消费者C1也可同理推断)。假如此时消费者C0脱离了消费组,那么RoundRobinAssignor策略的分配结果为: 消费者C1:t0p0、t1p1 消费者C2:t1p0、t2p0、t2p1、t2p2可以看到RoundRobinAssignor策略保留了消费者C1和C2中原有的3个分区的分配:t2p0、t2p1和t2p2(针对结果集1)。而如果采用的是StickyAssignor策略,那么分配结果为: 消费者C1:t1p0、t1p1、t0p0 消费者C2:t2p0、t2p1、t2p2可以看到StickyAssignor策略保留了消费者C1和C2中原有的5个分区的分配:t1p0、t1p1、t2p0、t2p1、t2p2。从结果上看StickyAssignor策略比另外两者分配策略而言显得更加的优异,这个策略的代码实现也是异常复杂,如果读者没有接触过这种分配策略,不妨使用一下来尝尝鲜。

3.2.5 Kafka与其他消息队列的不同;

功能消息队列 RocketMQApache RocketMQ (开源)消息队列 KafkaApache Kafka (开源)RabbitMQ (开源)安全防护支持不支持支持不支持支持主子账号支持支持不支持支持不支持不支持可靠性– 同步刷盘 – 同步双写 – 超3份数据副本 – 99.99999999%– 同步刷盘 – 异步刷盘– 同步刷盘 – 同步双写 – 超3份数据副本 – 99.99999999%异步刷盘,丢数据概率高同步刷盘可用性– 非常好,99.95% – Always Writable好– 非常好,99.95% – Always Writable好好横向扩展能力– 支持平滑扩展 – 支持百万级 QPS支持– 支持平滑扩展 – 支持百万级 QPS支持– 集群扩容依赖前端 – LVS 负载均衡调度Low Latency支持不支持支持不支持不支持消费模型Push / PullPush / PullPush / PullPullPush / Pull定时消息支持(可精确到秒级)支持(只支持18个固定 Level)暂不支持不支持支持事务消息支持不支持不支持不支持不支持顺序消息支持支持暂不支持支持不支持全链路消息轨迹支持不支持暂不支持不支持不支持消息堆积能力百亿级别 不影响性能百亿级别 影响性能百亿级别 不影响性能影响性能影响性能消息堆积查询支持支持支持不支持不支持消息回溯支持支持支持不支持不支持消息重试支持支持暂不支持不支持支持死信队列支持支持不支持不支持支持性能(常规)非常好 百万级 QPS非常好 十万级 QPS非常好 百万级 QPS非常好 百万级 QPS一般 万级 QPS性能(万级 Topic 场景)非常好 百万级 QPS非常好 十万级 QPS非常好 百万级 QPS低低性能(海量消息堆积场景)非常好 百万级 QPS非常好 十万级 QPS非常好 百万级 QPS低低

3.2.6 在一次leader failover过程中,kafka是如何协调各个broker的,以及处理partition和replication,这些对producer和consumer有何影响?

  1. Controller在Zookeeper的/brokers/ids节点上注册Watch。一旦有Broker宕机(本文用宕机代表任何让Kafka认为其Broker die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在Zookeeper对应的Znode会自动被删除,Zookeeper会fire Controller注册的Watch,Controller即可获取最新的幸存的Broker列表。
  2. Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition。
  3. 对set_p中的每一个Partition:3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR。   3.2 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。   3.3 将新的Leader,ISR和新的leader_epoch及controller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有Controller版本在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1。
  4. 直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。

3.2.7 kafka的producer、broker和consumer如何保持消息不重复不丢失。

kafka支持3种消息投递语义:

  • At most once——最多一次,消息可能会丢失,但不会重复
  • At least once——最少一次,消息不会丢失,可能会重复
  • Exactly once——只且一次,消息不丢失不重复,只且消费一次。但是整体的消息投递语义需要Producer端和Consumer端两者来保证。Producer 消息生产者端一个场景例子:当producer向broker发送一条消息,这时网络出错了,producer无法得知broker是否接受到了这条消息。网络出错可能是发生在消息传递的过程中,也可能发生在broker已经接受到了消息,并返回ack给producer的过程中。这时,producer只能进行重发,消息可能会重复,但是保证了at least once。0.11.0的版本通过给每个producer一个唯一ID,并且在每条消息中生成一个sequence num,这样就能对消息去重,达到producer端的exactly once。这里还涉及到producer端的acks设置和broker端的副本数量,以及min.insync.replicas的设置。比如producer端的acks设置如下:acks=0 //消息发了就发了,不等任何响应就认为消息发送成功acks=1 //leader分片写消息成功就返回响应给produceracks=all(-1) //当acks=all, min.insync.replicas=2,就要求INSRNC列表中必须要有2个副本都写成功,才返回响应给producer,如果INSRNC中已同步副本数量不足2,就会报异常,如果没有2个副本写成功,也会报异常,消息就会认为没有写成功。Broker 消息接收端上文说过acks=1,表示当leader分片副本写消息成功就返回响应给producer,此时认为消息发送成功。如果leader写成功单马上挂了,还没有将这个写成功的消息同步给其他的分片副本,那么这个分片此时的ISR列表为空,如果unclean.leader.election.enable=true,就会发生log truncation(日志截取),同样会发生消息丢失。如果unclean.leader.election.enable=false,那么这个分片上的服务就不可用了,producer向这个分片发消息就会抛异常。所以我们设置min.insync.replicas=2,unclean.leader.election.enable=false,producer端的acks=all,这样发送成功的消息就绝不会丢失。Consumer 消息消费者端所有分片的副本都有自己的log文件(保存消息)和相同的offset值。当consumer没挂的时候,offset直接保存在内存中,如果挂了,就会发生负载均衡,需要consumer group中另外的consumer来接管并继续消费。consumer消费消息的方式有以下2种;
  1. consumer读取消息,保存offset,然后处理消息。现在假设一个场景:保存offset成功,但是消息处理失败,consumer又挂了,这时来接管的consumer就只能从上次保存的offset继续消费,这种情况下就有可能丢消息,但是保证了at most once语义。
  2. consumer读取消息,处理消息,处理成功,保存offset。如果消息处理成功,但是在保存offset时,consumer挂了,这时来接管的consumer也只能从上一次保存的offset开始消费,这时消息就会被重复消费,也就是保证了at least once语义。以上这些机制的保证都不是直接一个配置可以解决的,而是你的consumer代码来完成的,只是一个处理顺序先后问题。 第一种对应的代码: List<String> messages = consumer.poll(); consumer.commitOffset(); processMsg(messages);第二种对应的代码: List<String> messages = consumer.poll(); processMsg(messages); consumer.commitOffset();Exactly Once实现原理下面详细说说exactly once的实现原理。Producer端的消息幂等性保证每个Producer在初始化的时候都会被分配一个唯一的PID,Producer向指定的Topic的特定Partition发送的消息都携带一个sequence number(简称seqNum),从零开始的单调递增的。Broker会将Topic-Partition对应的seqNum在内存中维护,每次接受到Producer的消息都会进行校验;只有seqNum比上次提交的seqNum刚好大一,才被认为是合法的。比它大的,说明消息有丢失;比它小的,说明消息重复发送了。以上说的这个只是针对单个Producer在一个session内的情况,假设Producer挂了,又重新启动一个Producer被而且分配了另外一个PID,这样就不能达到防重的目的了,所以kafka又引进了Transactional Guarantees(事务性保证)。Transactional Guarantees 事务性保证kafka的事务性保证说的是:同时向多个TopicPartitions发送消息,要么都成功,要么都失败。Consumer端以上的事务性保证只是针对的producer端,对consumer端无法保证,有以下原因:
  3. 压实类型的topics,有些事务消息可能被新版本的producer重写
  4. 事务可能跨坐2个log segments,这时旧的segments可能被删除,就会丢消息
  5. 消费者可能寻址到事务中任意一点,也会丢失一些初始化的消息
  6. 消费者可能不会同时从所有的参与事务的TopicPartitions分片中消费消息如果是消费kafka中的topic,并且将结果写回到kafka中另外的topic,可以将消息处理后结果的保存和offset的保存绑定为一个事务,这时就能保证消息的处理和offset的提交要么都成功,要么都失败。如果是将处理消息后的结果保存到外部系统,这时就要用到两阶段提交(tow-phase commit),但是这样做很麻烦,较好的方式是offset自己管理,将它和消息的结果保存到同一个地方,整体上进行绑定, 可以参考Kafka Connect中HDFS的例子。

3.2.8 说一说kafka的选举机制

Kafka中的选举大致可以分为三大类:控制器的选举、分区leader的选举以及消费者相关的选举,这里还可以具体细分为7个小类。我们一一来过一下,本文只是简单罗列下大致的内容,至于内部具体的细节逻辑就需要靠读者自己去探索啦。虐人还是被虐就靠你的自驱力了。控制器的选举在Kafka集群中会有一个或多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态等工作。比如当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。再比如当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。Kafka Controller的选举是依赖Zookeeper来实现的,在Kafka集群中哪个broker能够成功创建/controller这个临时(EPHEMERAL)节点他就可以成为Kafka Controller。分区leader的选举分区leader副本的选举由Kafka Controller 负责具体实施。当创建分区(创建主题或增加分区都有创建分区的动作)或分区上线(比如分区中原先的leader副本下线,此时分区需要选举一个新的leader上线来对外提供服务)的时候都需要执行leader的选举动作。基本思路是按照AR集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。一个分区的AR集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变。注意这里是根据AR的顺序而不是ISR的顺序进行选举的。这个说起来比较抽象,有兴趣的读者可以手动关闭/开启某个集群中的broker来观察一下具体的变化。还有一些情况也会发生分区leader的选举,比如当分区进行重分配(reassign)的时候也需要执行leader的选举动作。这个思路比较简单:从重分配的AR列表中找到第一个存活的副本,且这个副本在目前的ISR列表中。再比如当发生优先副本(preferred replica partition leader election)的选举时,直接将优先副本设置为leader即可,AR集合中的第一个副本即为优先副本。组协调器GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader,这个选举的算法也很简单,分两种情况分析。如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader。如果某一时刻leader消费者由于某些原因退出了消费组,那么会重新选举一个新的leader.

3.2.9 kafka的topic,partition,replica,message的理解

Topic:Topic可以理解为一个队列,消息根据Topic进行归类。Topic也可以理解为一个命名的消息流partition:partition:一个topic可以分为多个partition,每个partition是一个有序的队;在磁盘上以文件夹的形式存在;消息最终以文件形式保存在partition文件夹下面,分段存储。replica:replica指的是消息的备份,为了保证kafka的高可用(当leader节点挂了之后,kafka依然能提供服务)kafka提供了备份的功能。这个备份是针对partition的。可以通过 default.replication.factor 对replica的数目进行配置,默认值为1,表示不对topic进行备份。如果配置为2,表示除了leader节点,对于topic里的每一个partition,都会有一个额外的备份。message:实际写入Kafka中并可以被读取的消息记录。每个record包含了key、value和timestamp。

3.2.10 kafka生产者的具体实现,怎么保证数据不丢不重

当producer向broker发送一条消息,这时网络出错了,producer无法得知broker是否接受到了这条消息。网络出错可能是发生在消息传递的过程中,也可能发生在broker已经接受到了消息,并返回ack给producer的过程中。这时,producer只能进行重发,消息可能会重复,但是保证了at least once。0.11.0的版本通过给每个producer一个唯一ID,并且在每条消息中生成一个sequence num,这样就能对消息去重,达到producer端的exactly once。这里还涉及到producer端的acks设置和broker端的副本数量,以及min.insync.replicas的设置。比如producer端的acks设置如下:acks=0 //消息发了就发了,不等任何响应就认为消息发送成功acks=1 //leader分片写消息成功就返回响应给produceracks=all(-1) //当acks=all, min.insync.replicas=2,就要求INSRNC列表中必须要有2个副本都写成功,才返回响应给producer,如果INSRNC中已同步副本数量不足2,就会报异常,如果没有2个副本写成功,也会报异常,消息就会认为没有写成功。

3.2.11 kafka自己写api管理的情况下,ack失败,导致重复消费,你自己怎么设计解决?

幂等或者每消费一条消息都记录offset,对于少数严格的场景可能需要把offset或唯一ID,例如订单ID和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表里面同时记录消费offset,然后更新下游数据的时候用消费位点做乐观锁拒绝掉旧位点的数据更新。

原创文章 412 获赞 264 访问量 93万+

猜你喜欢

转载自blog.csdn.net/GUDUzhongliang/article/details/105723854