对于 Kafka 消费者来说,监控它们的消费进度非常的重要,或者说是监控它们消费的滞后程度。这个滞后程度有个专门的名称:消费者 Lag 或 Consumer Lag。所谓滞后程度,就是指消费者当前落后于生产者的程度。比方说,Kafka 生产者向某主题成
功生产了 100 万条消息,你的消费者当前消费了 80 万条消息,那么我们就说你的消费者滞后了 20 万条消息,即 Lag 等于 20 万。通常来说,Lag 的单位是消息数,而且我们一般是在主题这个级别上讨论 Lag 的,但实际上,Kafka 监控 Lag 的层级是在分区上的。如果要计算主题级别的,你需要手动汇总所有主题分区的 Lag,将它们累加起来,合并成最终的 Lag 值。
我们刚刚提到过,对消费者而言,Lag 应该算是最最重要的监控指标了。它直接反映了一个消费者的运行情况。一个正常工作的消费者,它的 Lag 值应该很小,甚至是接近于 0 的,这表示该消费者能够及时地消费生产者生产出来的消息,滞后程度很小。反之,如果一个消费者 Lag 值很大,通常就表明它无法跟上生产者的速度,最终 Lag 会越来越大,从而拖慢下游消息的处理速度。更可怕的是,由于消费者的速度无法匹及生产者的速度,极有可能导致它消费的数据已经不在操作系统的页缓存中了,那么这些数据就会失去享有 Zero Copy 技术的资格。这样的话,消费者就不得不从磁盘上读取它们,这就进一步拉大了与生产者的差距,进而出现马太效应,即那些 Lag 原本就很大的消费者会越来越慢,Lag 也会越来越大。鉴于这些原因,你在实际业务场景中必须时刻关注消费者的消费进度。一旦出现 Lag 逐步增加的趋势,一定要定位问题,及时处理,避免造成业务损失。
下面主要介绍下几种常用的去监控消费者进度的方法
1. UI
可以使用雅虎开源的一个叫kafka-manager 用起来感觉挺好用的可以看官方介绍,感觉比较详细不在多描述了
https://github.com/yahoo/kafka-manager
2. java API
社区提供的 Java Consumer API 分别提供了查询当前分区最新消息位移和消费者组最新消费消息位移两组方法,我们使用它们就能计算出对应的 Lag。下面举个例子。
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient client = AdminClient.create(props)) {
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
try {
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Collections.emptyMap();
} catch (ExecutionException e) {
return Collections.emptyMap();
} catch (TimeoutException e) {
throw new TimeoutException("Timed out when getting lag for consumer group:" + groupID);
}
}
}
主要在这个地方entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
获取 Lag 值并封装进一个 Map 对象。
3. shell 脚本
kafka 提供了脚本可以实现,但是在生产环境中不推荐,感觉也不怎么方便这里简单介绍一下
$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker 连接信息 > --describe --group <group 名称 >
将消费组和kafka broker 连接信息 传递进去会将这个消费组下的所有的消费者的消费信息返回给你。
4. 小结
介绍了三种监控kafka 消费组消费进度分方式,推荐第一种UI的方式 简单易用功能也丰富