版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wenzhou1219/article/details/88219625
MR常用需求
在进一步讲解MR各个组件前,先详细说下编写常用MR需要考虑的点。之前讲过最简单MR如何编写,而在实际应用中,为了工程考虑和复用性,还需要考虑配置解析、执行环境准备、任务参数设置、目录清理。
- 配置解析:默认hadoop jar执行时使用集群当前配置文件配置。如下图,输入hadoop fs时出现如下通用选项,可以通过命令行-conf和-D输入调整集群配置,可以通过-files、-archives和-libjars分发文件到项目中使用,还可以输入自定义配置。MR提供了常用类,让我们自己的程序也实现这些参数解析的功能,也可实现自定义参数解析。
- 执行环境准备:需要在本次集群环境基础上程序中调整配置,比如输出压缩、任务优先级等。此外,对于集群中不存在的jar包引用,可以使用DistributedCache分发。
- 任务参数设置:首先任务需要设置一个易于识别的任务名,还需要按照需要设置输入输出格式和目录位置。
- 目录清理:具体任务执行时需要先将输出目录清空,而且为了保证执行成功,一般都是将数据输出到临时目录,执行成功后才用临时目录数据替换输出目录数据。有时候甚至需要先将输出目录数据备份一次后再替换,这些都是必要的数据安全手段。
MR应用程序模板
上述所讲的配置参数都在主入口文件中设置,如下为一个最通用的模板,程序继承Configured,使用GenericOptionsParser完成参数解析。
首先这里,了解下配置参数的生效顺序如下:
- 默认hadoop配置
- -conf 指定配置
- -D 指定配置
- 程序中conf.set设置
因此程序中conf.set中设置时是最终参数,对于常见的输出我们指定Gzip压缩和中间过程压缩减少数据传输量,增加并发;如果任务特别重要,也可conf.set设置任务优先级。
程序读入文件每行为json的单词和权重,要求解析后相同键值合并累加,这一过程依赖fastjson,因此先用DistributedCache分发下fastjson包文件,fastjson包需要提前上传到hdfs。也可以使用-libjars指定分发文件,同样-files、-archives和-libjars指定的文件需要提前上传到hdfs。
public class App extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//获取和调整配置
Configuration conf = getConf();
//解析参数
// System.out.printf("Input params:\n");
// for (Map.Entry<String, String> entry: conf){
// System.out.printf("%s=%s\n", entry.getKey(), entry.getValue());
// }
String[] otherargs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherargs.length != 2 ){
System.err.println("ERROR:Wrong number of parameters:" + args.length);
System.err.println("Usage: hadoop jar xxx.jar <input path> <out path>");
System.exit(-1);
}
String inputs = otherargs[0];
String output = otherargs[1];
String outputReal = output + "/wordcount" ;
String outputTmp = output + "/wordcount_tmp" ;
//执行环境准备
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
conf.setClass("mapreduce.output.fileoutputformat.compress.codec", GzipCodec.class, CompressionCodec.class);
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec", GzipCodec.class, CompressionCodec.class);
conf.set("mapreduce.job.priority", JobPriority.VERY_HIGH.toString());
DistributedCache.addFileToClassPath(new Path("/home/hdp-mobi/data/extend_jar/fastjson-1.2.30.jar"), conf);
//设置执行参数和内容
Job job = new Job(conf);
job.setJobName("jimwen_wordcount");
job.setJarByClass(App.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPaths(job, inputs);
FileOutputFormat.setOutputPath(job, new Path(outputTmp));
//执行前清理
FileSystem fs = FileSystem.get(conf);
if(fs.exists(new Path(outputTmp))) {
fs.delete(new Path(outputTmp), true);
}
//等待执行完成
boolean success = job.waitForCompletion(true);
//执行后清理
if(success && fs.exists(new Path(outputTmp)){
if(fs.exists(new Path(outputReal))){
fs.delete(new Path(outputReal),true);
}
if (!fs.rename(new Path(outputTmp), new Path(outputReal))) {
System.out.println("ERROR::::Rename fail!!! ");
}
}
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int eCode = ToolRunner.run(new App(), args);
System.exit(eCode);
}
}
程序演示和下载
输入文件内容
{"wenzhou":1,"wenwei":1,"wen":1}
{"wenzhou":1,"wenwei":2}
{"wen":3}
{"wenwei":4}
{"wenwei":1,"wen":3}
执行命令
hadoop jar new_wordcount-1.0-SNAPSHOT.jar -D color=yellow /home/hdp-jiagu/wenzhou/test /home/hdp-jiagu/wenzhou/testout
这里指定输入目录和输出目录,-D color=yellow只是为了演示命令行解析的功能,可以打开解析参数注释,grep color 看到这个命名参数,conf.get(‘color’)可以获取它的值。
输出如下
wen 7
wenwei 8
wenzhou 2
源码下载地址
原创,转载请注明来自