任务目的
- 掌握 MapReduce 程序运行模式
- 理解 Combiner 的作用和使用方式
任务清单
- 任务1:MapReduce 程序运行模式
- 任务2:MapReduce 三大组件(一):Combiner
详细任务步骤
任务1:MapReduce 程序运行模式
1. 本地运行模式(eclipse 开发环境下本地运行, 好处是方便调试和测试)
- 要点一: MapReduce 程序是被提交给 LocalJobRunner 在本地以单进程的形式运行
- 要点二: 数据输入输出可以在本地,也可以在 HDFS
- 要点三: 怎么实现本地运行?
- 在你的 MapReduce 程序当中不要带集群的配置文件(本质就是由mapreduce.framework.name
和yarn.resourcemanager.hostname
这两个参数决定)
刚才有提到在本地运行模式下,输入输出的数据可以在本地,也可以在 HDFS 上。
- 若是在本地,由以下2个参数决定:
conf.set("mapreduce.framework.name", "local");// 指定MapReduce运行时框架为本地作业运行器
conf.set("fs.defaultFS", "file:///"); // 获取本地文件系统实例
- 若是在HDFS上,则由以下2个参数决定:
conf.set("mapreduce.framework.name", "local");
conf.set("fs.defaultFS", "hdfs://localhost:9000"); // HDFS集群中NameNode的URI,获取DistributedFileSystem实例
2. 集群运行模式(打 jar 包,提交任务到集群运行)
将 MapReduce 程序提交给 YARN 集群 ResourceManager,分发到多个节点上并发执行。处理的输入数据和输出结果都应位于 HDFS 文件系统。
- 要点一:首先要把代码打成 jar 包放在 Linux 本地
- 要点二:用
hadoop jar
的命令去提交代码到 YARN 集群运行 - 要点三:处理的输入数据和输出结果都应位于 HDFS 文件系统
集群运行模式需要配置以下参数:
conf.set("mapreduce.framework.name", "yarn");// 指定MapReduce运行时框架为YARN
conf.set("yarn.resourcemanager.hostname", "localhost"); // 指定 ResourceManager 守护进程所在主机
conf.set("fs.defaultFS", "hdfs://localhost:9000");// HDFS集群中NameNode的URI,获取DistributedFileSystem实例
具体打 jar 包运行的步骤如下所示:
(1)配置完以上参数后,右键项目MyMR,选择“Export”:
图1
(2)选择“Java”——》“JAR file”,之后点击“Next”:
图2
(3)在弹出的对话框中,勾选MyMR项目中的 src,然后在JAR file中选择保存的路径名(包含最终的jar包的名字),之后点击“Finish”:
图3
(4)将Hadoop安装包中自带的README.txt文件上传到HDFS的/wordcount/input目录下,将其作为数据源。进入jar包所在的目录,使用以下命令运行jar包:
hadoop jar wordcount.jar com.hongyaa.mr.wordcount
解析:
hadoop jar
:用来执行一个Hadoop的jar包程序- wordcount.jar:执行的jar包的名字
- com.hongyaa.mr.wordcount:包名.类名,此处的类名为我们主类的名字
执行过程示意图:
图4
(5)在本机的浏览器上访问http://localhost:8088进入YARN集群的Web UI界面,在此界面中查看该Job是否成功提交到YARN集群。
图5
从上图可以看出该Job成功提交给YARN集群运行,并且运行成功。
(6)使用 HDFS Shell操作查看运行结果,运行结果储存在HDFS的/wordcount/output目录下:
hadoop fs -cat /wordcount/output/part-r-00000
运行结果如下所示:
图6
任务2:MapReduce 三大组件(一):Combiner
2.1 什么是 Combiner?
Combiner 是 MapReduce 程序中 Mapper 和 Reducer 之外的一种组件,它的作用是在 MapTask 之后给 MapTask 的结果进行局部汇总,以减轻 ReduceTask 的计算负载,减少网络传输。
Combiner 最基本的是实现本地key的聚合,对 Map 输出的 key 排序,value 进行迭代,有本地 Reduce 之称 ,实际上就是继承 Reducer 类,本质上就是一个 Reducer。
2.2 数据格式的转换
map: (K1, V1) → list(K2, V2)
combiner: (K2, list(V2)) → list(K3, V3)
reduce: (K3, list(V3)) → list(K4, V4)
注:现在想想,如果在 WordCount 中不用 Combiner,那么所有的结果都是 reduce 完成,效率会相对低下。使用 Combiner 之后,先完成的 Map 会在本地聚合,提升速度。对于 WordCount 的例子,value 就是一个叠加的数字,所以 Map 一结束就可以进行 Reduce 的 value 叠加,而不必要等到所有的 Map 结束再去进行 Reduce 的 value 叠加。
2.3 添加设置 Combiner 的代码(以我们自己写的 wordCount 为例)
Combiner 的意义就是对每一个 MapTask 的输出进行局部汇总,以减小网络传输量。
具体实现步骤:
(1)自定义一个 Combiner 继承 Reducer,重写 reduce 方法
(2)在 Job 中设置:job.setCombinerClass(xxx.class)
具体代码实现:
(1)Combiner 的代码(和 Reduce 端的代码一致,本质上就是一个 Reducer)
/**
* Combiner
*/
public static class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable v : values) {
sum += v.get();
}
context.write(key, new IntWritable(sum));
}
}
(2)在 Job 中设置 Combiner 类
job.setCombinerClass(WordCountCombiner.class);
**说明: **
- “词频统计”是一个可以展示 Combiner 用处的基础例子,上面的 WordCount 程序为每一行数据生成了一个(word,1)键值对。
- 所以如果在同一个文档内“Car”出现了3次,("Car",1)键值对会被生成3次,这些键值对会被送到 Reduce 端。
- 通过使用 Combiner,这些键值对可以被压缩为一个("Car",3)送往 Reduce 端。现在每一个节点针对每一个词只会发送一个值到 Reduce 端,大大减少了 Shuffle 过程所需要的带宽并加速了作业的执行。
- 这里面最棒的就是我们不用写任何额外的代码就可以享用此功能!如果你的 Reduce 是可交换及可组合的,那么它也就可以作为一个 Combiner。你只要在 Driver 中添加
job.setCombinerClass(xxx.class);
这行代码就可以在“词频统计”程序中启用Combiner。
2.4 使用 Combiner 的注意事项
(1)Combiner 和 Reducer 的区别在于运行的位置:
- Combiner 是在每一个 MapTask 所在的节点运行
- Reducer 是接收全局所有 Mapper 的输出结果
(2)Combiner 的输出 KV 跟 Reducer 的输入 KV 类型相对应。
(3)不要以为在写 MapReduce 程序时设置了 Combiner 就认为 Combiner 一定会起作用,实际情况是这样的吗?答案是否定的。
- Hadoop 文档中也有说明 Combiner 可能被执行也可能不被执行,可能执行一次也可能执行多次。
- **那么在什么情况下不执行呢?**如果当前集群在很繁忙的情况下 Job 就是设置了也不会执行 Combiner,这时集群本身负载量很大,会尽量提早执行完 Map,空出资源。所以, Combiner 使用的原则是:有或没有都不能影响业务逻辑,都不能影响最终结果。
(4)Combiner的适用场景比如说在**“汇总统计”、”求最大值“**时,就可以使用 Combiner,但是在“求平均数”的时候就不适用了。