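This article walks through the core metrics module in Flink: which Metric types Flink defines and how it organizes metrics into groups. It centers on the flink-metrics-core module in the Flink source code, and then lists the metrics commonly used in Flink.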
1. flink-metrics-core
- Metric
The common metric types are Gauge, Counter, Meter, and Histogram. Flink defines its own Metric interface as their common base.
Figure 1: Metric class diagram
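To make the difference between the four types concrete, here is a minimal sketch. The classes below (SimpleCounter, SimpleGauge, SimpleMeter, SimpleHistogram) are simplified stand-ins written for this article, not the Flink classes; the meter in particular takes the elapsed time as an argument instead of tracking a time window itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Simplified stand-ins for the four metric kinds; these are NOT the Flink classes. */
final class MetricKindsSketch {

    /** Counter: a value that is incremented or decremented. */
    static final class SimpleCounter {
        private long count;
        void inc() { count++; }
        void inc(long n) { count += n; }
        void dec() { count--; }
        long getCount() { return count; }
    }

    /** Gauge: reports whatever the supplier returns at the moment it is read. */
    static final class SimpleGauge<T> {
        private final Supplier<T> supplier;
        SimpleGauge(Supplier<T> supplier) { this.supplier = supplier; }
        T getValue() { return supplier.get(); }
    }

    /** Meter: counts events and derives an average rate over a caller-supplied time span. */
    static final class SimpleMeter {
        private long events;
        void markEvent() { events++; }
        void markEvent(long n) { events += n; }
        long getCount() { return events; }
        double ratePerSecond(long elapsedSeconds) {
            return elapsedSeconds <= 0 ? 0.0 : (double) events / elapsedSeconds;
        }
    }

    /** Histogram: records values and summarizes their distribution. */
    static final class SimpleHistogram {
        private final List<Long> values = new ArrayList<>();
        void update(long value) { values.add(value); }
        long getCount() { return values.size(); }
        long getMax() {
            return values.stream().mapToLong(Long::longValue).max().orElse(0L);
        }
        double getMean() {
            return values.stream().mapToLong(Long::longValue).average().orElse(0.0);
        }
    }
}
```

Roughly speaking, a Counter answers "how many so far", a Gauge answers "what is the value right now", a Meter answers "how fast", and a Histogram answers "how are the values distributed".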
- MetricConfig
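A utility class for storing and reading Flink metric configuration. It extends Properties and adds methods that read a configuration value directly as a String, int, long, float, double, or boolean.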
Figure 2: MetricConfig class diagram
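The idea of a Properties subclass with typed getters can be sketched as follows. TypedMetricConfig is a hypothetical stand-in written for illustration, not the Flink source, and the keys used with it (such as "port") are made up; each getter falls back to a default when the key is absent.

```java
import java.util.Properties;

/**
 * Sketch of a Properties subclass with typed getters.
 * TypedMetricConfig is a hypothetical stand-in, not Flink's MetricConfig source.
 */
class TypedMetricConfig extends Properties {

    String getString(String key, String defaultValue) {
        return getProperty(key, defaultValue);
    }

    int getInteger(String key, int defaultValue) {
        String raw = getProperty(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    long getLong(String key, long defaultValue) {
        String raw = getProperty(key);
        return raw == null ? defaultValue : Long.parseLong(raw.trim());
    }

    float getFloat(String key, float defaultValue) {
        String raw = getProperty(key);
        return raw == null ? defaultValue : Float.parseFloat(raw.trim());
    }

    double getDouble(String key, double defaultValue) {
        String raw = getProperty(key);
        return raw == null ? defaultValue : Double.parseDouble(raw.trim());
    }

    boolean getBoolean(String key, boolean defaultValue) {
        String raw = getProperty(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
    }
}
```

Because values are stored as strings, the parsing happens at read time; a malformed number would surface as a NumberFormatException in this sketch.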
- MetricGroup
Inside Flink, metrics are organized into groups that form a multi-level hierarchy. A metric is uniquely identified by its metric group plus its metric name.
•TaskManagerMetricGroup
    •TaskManagerJobMetricGroup
        •TaskMetricGroup
            •TaskIOMetricGroup
            •OperatorMetricGroup
                •${User-defined Group} / ${User-defined Metrics}
                •OperatorIOMetricGroup
•JobManagerMetricGroup
    •JobManagerJobMetricGroup
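The "group plus name" rule can be illustrated with a small tree of nodes. GroupNode is a hypothetical class written for this article, not a Flink class, and the group names and the "." delimiter in the usage note are assumptions chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for a metric group node; not a Flink class. */
final class GroupNode {
    private final GroupNode parent; // null for a root group
    private final String name;

    GroupNode(GroupNode parent, String name) {
        this.parent = parent;
        this.name = name;
    }

    /** Creates a child group one level below this one. */
    GroupNode addGroup(String childName) {
        return new GroupNode(this, childName);
    }

    /** Walks up to the root and joins every group name with the metric name. */
    String identifier(String metricName, String delimiter) {
        Deque<String> parts = new ArrayDeque<>();
        parts.addFirst(metricName);
        for (GroupNode n = this; n != null; n = n.parent) {
            parts.addFirst(n.name);
        }
        return String.join(delimiter, parts);
    }
}
```

With a chain such as taskmanager, job, task, operator, calling identifier("dtDirtyData", ".") on the operator node yields taskmanager.job.task.operator.dtDirtyData. The same metric name registered under a different group gets a different identifier, which is why the group path is part of the identity.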
You can instrument custom metrics as needed:
- Add a metric that counts dirty records, named flink_taskmanager_job_task_operator_dtDirtyData:
dirtyDataCounter = runtimeContext.getMetricGroup().counter(MetricConstant.DT_DIRTY_DATA_COUNTER);
- Add a consumer lag metric with two custom group levels, topic and partition, named flink_taskmanager_job_task_operator_topic_partition_dtTopicPartitionLag:
for (TopicPartition topicPartition : assignedPartitions) {
    MetricGroup metricGroup = getRuntimeContext().getMetricGroup()
            .addGroup(DT_TOPIC_GROUP, topicPartition.topic())
            .addGroup(DT_PARTITION_GROUP, String.valueOf(topicPartition.partition()));
    metricGroup.gauge(DT_TOPIC_PARTITION_LAG_GAUGE, new KafkaTopicPartitionLagMetric(subscriptionState, topicPartition));
}
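The lag metric name above contains the group keys (topic, partition) but not the concrete topic or partition values. A minimal sketch of one way such a name could be assembled is shown here. The rule it encodes (a flink prefix, group keys joined with underscores, values kept aside as labels) is an assumption inferred from the metric name in this article, and LabeledNameSketch, the key strings, and the topic "orders" are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: maps key/value group pairs to a flat name plus labels. */
final class LabeledNameSketch {
    private final List<String> scope = new ArrayList<>();
    private final Map<String, String> labels = new LinkedHashMap<>();

    LabeledNameSketch(List<String> baseScope) {
        scope.addAll(baseScope);
    }

    /** The key extends the metric name; the value is only kept as a label. */
    LabeledNameSketch addGroup(String key, String value) {
        scope.add(key);
        labels.put(key, value);
        return this;
    }

    /** Assumed naming rule: "flink_" + scope parts + metric name, joined by "_". */
    String metricName(String name) {
        return "flink_" + String.join("_", scope) + "_" + name;
    }

    Map<String, String> labels() {
        return labels;
    }
}
```

Under these assumptions, a base scope of taskmanager, job, task, operator followed by addGroup("topic", "orders") and addGroup("partition", "0") produces the name flink_taskmanager_job_task_operator_topic_partition_dtTopicPartitionLag, while topic=orders and partition=0 travel as labels. Keeping the values out of the name means one time series name covers every partition.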
MetricGroup builds this multi-level structure through inheritance:
Figure 3: MetricGroup class diagram
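2. Metrics in Flink
The metrics in Flink fall roughly into these areas: Overview, Checkpoint, Watermark, BackPressure, Kafka Connector, and JVM. Integrated into a product, they can make up a single metrics dashboard page.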
2.1 Overview
Metrics whose names start with dt are custom metrics.
Metric name |
flink_taskmanager_job_task_operator_dtNumBytesIn |
flink_taskmanager_job_task_operator_dtNumBytesInRate |
flink_taskmanager_job_task_operator_dtNumRecordsIn |
flink_taskmanager_job_task_operator_dtNumRecordsInRate |
flink_taskmanager_job_task_operator_dtNumRecordsInResolve |
flink_taskmanager_job_task_operator_dtNumRecordsInResolveRate |
flink_taskmanager_job_task_operator_dtNumRecordsOut |
flink_taskmanager_job_task_operator_dtNumRecordsOutRate |
flink_taskmanager_job_task_operator_dtDirtyData |
flink_taskmanager_job_task_operator_topic_partition_dtTopicPartitionLag |
flink_taskmanager_job_task_operator_dtEventDelay |
2.2 Checkpoint
Metric name |
flink_jobmanager_job_lastCheckpointDuration |
flink_jobmanager_job_lastCheckpointSize |
flink_jobmanager_job_numberOfFailedCheckpoints |
2.3 Watermark
Metric name |
flink_taskmanager_job_task_operator_currentInputWatermark |
flink_taskmanager_job_task_operator_currentOutputWatermark |
flink_taskmanager_job_task_operator_numLateRecordsDropped |
2.4 BackPressure
Metric name |
flink_taskmanager_job_task_buffers_inPoolUsage |
flink_taskmanager_job_task_buffers_outPoolUsage |
flink_taskmanager_job_task_buffers_inputQueueLength |
flink_taskmanager_job_task_buffers_outputQueueLength |
2.5 Kafka Connector
Metric name |
flink_taskmanager_job_task_operator_commitsFailed |
flink_taskmanager_job_task_operator_KafkaConsumer_topic_partition_currentOffsets |
flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max |
2.6 JVM
Metric name |
flink_jobmanager_Status_JVM_CPU_Load |
flink_jobmanager_Status_JVM_CPU_Time |
flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Count |
flink_jobmanager_Status_JVM_GarbageCollector_PS_MarkSweep_Time |
flink_jobmanager_Status_JVM_GarbageCollector_PS_Scavenge_Count |
flink_jobmanager_Status_JVM_GarbageCollector_PS_Scavenge_Time |
flink_jobmanager_Status_JVM_Memory_Heap_Max |
flink_jobmanager_Status_JVM_Memory_Heap_Used |
flink_jobmanager_Status_JVM_Memory_NonHeap_Max |
flink_jobmanager_Status_JVM_Memory_NonHeap_Used |
flink_jobmanager_Status_JVM_Threads_Count |
flink_taskmanager_Status_JVM_CPU_Load |
flink_taskmanager_Status_JVM_CPU_Time |
flink_taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Count |
flink_taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Time |
flink_taskmanager_Status_JVM_GarbageCollector_G1_Young_Generation_Count |
flink_taskmanager_Status_JVM_GarbageCollector_G1_Young_Generation_Time |
flink_taskmanager_Status_JVM_Memory_Heap_Max |
flink_taskmanager_Status_JVM_Memory_Heap_Used |
flink_taskmanager_Status_JVM_Memory_NonHeap_Max |
flink_taskmanager_Status_JVM_Memory_NonHeap_Used |
flink_taskmanager_Status_JVM_Threads_Count |