MapReduce计数器是什么?
计数器是用来记录job的执行进度和状态的。它的作用可以理解为日志。我们可以在程序的某个位置插入计数器,记录数据或者进度的变化情况。
MapReduce计数器能做什么?
MapReduce 计数器(Counter)为我们提供一个窗口,用于观察 MapReduce Job 运行期的各种细节数据。对MapReduce性能调优很有帮助,MapReduce性能优化的评估大部分都是基于这些 Counter 的数值表现出来的。
MapReduce 都有哪些内置计数器?
MapReduce 自带了许多默认Counter,现在我们来分析这些Counter 的含义,方便大家观察 Job 结果,如输入的字节数、输出的字节数、Map端输入/输出的字节数和条数、Reduce端的输入/输出的字节数和条数等。下面我们只需了解这些内置计数器,知道计数器组名称(groupName)和计数器名称(counterName),以后使用计数器会查找groupName和counterName即可。
任务计数器
在任务执行过程中,任务计数器采集任务的相关信息,每个作业的所有任务的结果会被聚集起来。例如,MAP_INPUT_RECORDS 计数器统计每个map任务输入记录的总数,并在一个作业的所有map任务上进行聚集,使得最终数字是整个作业的所有输入记录的总数。任务计数器由其关联任务维护,并定期发送给TaskTracker,再由TaskTracker发送给 JobTracker。因此,计数器能够被全局地聚集。下面我们分别了解各种任务计数器。
- MapReduce 任务计数器
- MapReduce 任务计数器的groupName为org.apache.hadoop.mapreduce.TaskCounter,它包含的计数器如下表所示:
- MapReduce 任务计数器的groupName为org.apache.hadoop.mapreduce.TaskCounter,它包含的计数器如下表所示:
- 文件系统计数器**
- 文件系统计数器的 groupName为org.apache.hadoop.mapreduce.FileSystemCounter,它包含的计数器如下表所示:
- FileInputFormat (输入文件任务)计数器
- FileInputFormat 计数器的 groupName为org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter,它包含的计数器如下表所示,计数器名称列的括号()内容即为counterName:
- FileInputFormat 计数器的 groupName为org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter,它包含的计数器如下表所示,计数器名称列的括号()内容即为counterName:
- FileOutputFormat(输出文件任务) 计数器
- FileOutputFormat 计数器的 groupName为org.apache.hadoop.mapreduce.lib.input.FileOutputFormatCounter,它包含的计数器如下表所示:
作业计数器
- FileOutputFormat 计数器的 groupName为org.apache.hadoop.mapreduce.lib.input.FileOutputFormatCounter,它包含的计数器如下表所示:
- 作业计数器由 JobTracker(或者 YARN)维护,因此无需在网络间传输数据,这一点与包括 “用户定义的计数器” 在内的其它计数器不同。这些计数器都是作业级别的统计量,其值不会随着任务运行而改变。
- 作业计数器计数器的 groupName为org.apache.hadoop.mapreduce.JobCounter,它包含的计数器如下表所示:
- 作业计数器计数器的 groupName为org.apache.hadoop.mapreduce.JobCounter,它包含的计数器如下表所示:
计数器的该如何使用? 下面我们来介绍如何使用计数器。
- 定义计数器
- 枚举声明计数器
// 自定义枚举变量Enum
Counter counter = context.getCounter(Enum enum)
- 自定义计数器
// 自己命名groupName和counterName
Counter counter = context.getCounter(String groupName,String counterName)
- 为计数器赋值
- 初始化计数器
counter.setValue(long value);// 设置初始值
- 计数器自增
counter.increment(long incr);// 增加计数
- 获取计数器的值
- 获取枚举计数器的值
Configuration conf = new Configuration();
Job job = new Job(conf, "MyCounter");
job.waitForCompletion(true);
Counters counters=job.getCounters();
Counter counter=counters.findCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_LONG);// 查找枚举计数器,假如Enum的变量为BAD_RECORDS_LONG long value=counter.getValue();//获取计数值
- 获取自定义计数器的值
Configuration conf = new Configuration();
Job job = new Job(conf, "MyCounter");
job.waitForCompletion(true);
Counters counters = job.getCounters();
Counter counter=counters.findCounter("ErrorCounter","toolong");// 假如groupName为ErrorCounter,counterName为toolong long value = counter.getValue();// 获取计数值
- 获取内置计数器的值
Configuration conf = new Configuration();
Job job = new Job(conf, "MyCounter");
job.waitForCompletion(true);
Counters counters=job.getCounters(); // 查找作业运行启动的reduce个数的计数器,groupName和counterName可以从内置计数器表格查询(前面已经列举有)
Counter counter=counters.findCounter("org.apache.hadoop.mapreduce.JobCounter","TOTAL_LAUNCHED_REDUCES");// 假如groupName为org.apache.hadoop.mapreduce.JobCounter,counterName为TOTAL_LAUNCHED_REDUCES long value=counter.getValue();// 获取计数值
- 获取所有计数器的值
Configuration conf = new Configuration();
Job job = new Job(conf, "MyCounter");
Counters counters = job.getCounters();
for (CounterGroup group : counters) {
for (Counter counter : group) {
System.out.println(counter.getDisplayName() + ": " + counter.getName() + ": "+ counter.getValue());
}
}
自定义计数器
自定义计数器用的比较广泛,特别是统计无效数据条数的时候,我们就会用到计数器来记录错误日志的条数。下面我们自定义计数器,统计输入的无效数据。
数据集
假如一个文件,规范的格式是3个字段,“\t”作为分隔符,其中有2条异常数据,一条数据是只有2个字段,一条数据是有4个字段。其内容如下所示:
实现
public class MyCounter {
// \t键
private static String TAB_SEPARATOR = "\t";
public static class MyCounterMap extends Mapper<LongWritable, Text, Text, Text> {
// 定义枚举对象
public static enum LOG_PROCESSOR_COUNTER {
BAD_RECORDS_LONG, BAD_RECORDS_SHORT
};
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String arr_value[] = value.toString().split(TAB_SEPARATOR);
if (arr_value.length > 3) {
/* 自定义计数器 */
context.getCounter("ErrorCounter", "toolong").increment(1);
/* 枚举计数器 */ context.getCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_LONG).increment(1);
} else if (arr_value.length < 3) {
// 自定义计数器
context.getCounter("ErrorCounter", "tooshort").increment(1);
// 枚举计数器 context.getCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_SHORT).increment(1);
}
}
}
@SuppressWarnings("deprecation")
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
String[] args0 = {
"hdfs://hadoop2:9000/buaa/counter/counter.txt",
"hdfs://hadoop2:9000/buaa/counter/out/"
};
// 读取配置文件
Configuration conf = new Configuration();
// 如果输出目录存在,则删除
Path mypath = new Path(args0[1]);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}
// 新建一个任务
Job job = new Job(conf, "MyCounter");
// 主类
job.setJarByClass(MyCounter.class);
// Mapper
job.setMapperClass(MyCounterMap.class);
// 输入目录
FileInputFormat.addInputPath(job, new Path(args0[0]));
// 输出目录
FileOutputFormat.setOutputPath(job, new Path(args0[1]));
// 提交任务,并退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
运行结果,在输出日志中,查看计数器的值
从日志中可以看出,通过枚举声明和自定义计数器两种方式,统计出的不规范数据是一样的。