7.4 WordCount示例编写(三)

任务目的

  • 掌握 MapReduce 程序运行模式
  • 理解 Combiner 的作用和使用方式

任务清单

  • 任务1:MapReduce 程序运行模式
  • 任务2:MapReduce 三大组件(一):Combiner

详细任务步骤

任务1:MapReduce 程序运行模式

  1. 本地运行模式(eclipse 开发环境下本地运行, 好处是方便调试和测试)

  • 要点一: MapReduce 程序是被提交给 LocalJobRunner 在本地以单进程的形式运行
  • 要点二: 数据输入输出可以在本地,也可以在 HDFS
  • 要点三: 怎么实现本地运行?
    - 在你的 MapReduce 程序当中不要带集群的配置文件(本质就是由 mapreduce.framework.name 和 yarn.resourcemanager.hostname 这两个参数决定)

  刚才有提到在本地运行模式下,输入输出的数据可以在本地,也可以在 HDFS 上。

  • 若是在本地,由以下2个参数决定:
conf.set("mapreduce.framework.name", "local");// 指定MapReduce运行时框架为本地作业运行器 
conf.set("fs.defaultFS", "file:///");  // 获取本地文件系统实例
  • 若是在HDFS上,则由以下2个参数决定:
conf.set("mapreduce.framework.name", "local");  
conf.set("fs.defaultFS", "hdfs://localhost:9000"); // HDFS集群中NameNode的URI,获取DistributedFileSystem实例

  2. 集群运行模式(打 jar 包,提交任务到集群运行)

  将 MapReduce 程序提交给 YARN 集群 ResourceManager,分发到多个节点上并发执行。处理的输入数据和输出结果都应位于 HDFS 文件系统。

  • 要点一:首先要把代码打成 jar 包放在 Linux 本地
  • 要点二:用 hadoop jar 的命令去提交代码到 YARN 集群运行
  • 要点三:处理的输入数据和输出结果都应位于 HDFS 文件系统

  集群运行模式需要配置以下参数:

conf.set("mapreduce.framework.name", "yarn");// 指定MapReduce运行时框架为YARN 
conf.set("yarn.resourcemanager.hostname", "localhost"); // 指定 ResourceManager 守护进程所在主机
conf.set("fs.defaultFS", "hdfs://localhost:9000");// HDFS集群中NameNode的URI,获取DistributedFileSystem实例

  具体打 jar 包运行的步骤如下所示:
  (1)配置完以上参数后,右键项目MyMR,选择“Export”:

Vditor

图1

 

  (2)选择“Java”——》“JAR file”,之后点击“Next”:

Vditor

图2

 

  (3)在弹出的对话框中,勾选MyMR项目中的 src,然后在JAR file中选择保存的路径名(包含最终的jar包的名字),之后点击“Finish”:

Vditor

图3

 

  (4)将Hadoop安装包中自带的README.txt文件上传到HDFS的/wordcount/input目录下,将其作为数据源。进入jar包所在的目录,使用以下命令运行jar包:

hadoop jar wordcount.jar com.hongyaa.mr.wordcount

  解析:

  • hadoop jar:用来执行一个Hadoop的jar包程序
  • wordcount.jar:执行的jar包的名字
  • com.hongyaa.mr.wordcount:包名.类名,此处的类名为我们主类的名字

  执行过程示意图:

Vditor

图4

 

  (5)在本机的浏览器上访问http://localhost:8088进入YARN集群的Web UI界面,在此界面中查看该Job是否成功提交到YARN集群。

Vditor

图5

 

  从上图可以看出该Job成功提交给YARN集群运行,并且运行成功。

  (6)使用 HDFS Shell操作查看运行结果,运行结果储存在HDFS的/wordcount/output目录下:

hadoop fs -cat /wordcount/output/part-r-00000

  运行结果如下所示:

Vditor

图6

 

任务2:MapReduce 三大组件(一):Combiner

2.1 什么是 Combiner?

  Combiner 是 MapReduce 程序中 Mapper 和 Reducer 之外的一种组件,它的作用是在 MapTask 之后给 MapTask 的结果进行局部汇总,以减轻 ReduceTask 的计算负载,减少网络传输。

  Combiner 最基本的是实现本地key的聚合,对 Map 输出的 key 排序,value 进行迭代,有本地 Reduce 之称 ,实际上就是继承 Reducer 类,本质上就是一个 Reducer。

2.2 数据格式的转换

map: (K1, V1) → list(K2, V2) 
combiner: (K2, list(V2)) → list(K3, V3) 
reduce: (K3, list(V3)) → list(K4, V4)

  注:现在想想,如果在 WordCount 中不用 Combiner,那么所有的结果都是 reduce 完成,效率会相对低下。使用 Combiner 之后,先完成的 Map 会在本地聚合,提升速度。对于 WordCount 的例子,value 就是一个叠加的数字,所以 Map 一结束就可以进行 Reduce 的 value 叠加,而不必要等到所有的 Map 结束再去进行 Reduce 的 value 叠加。

2.3 添加设置 Combiner 的代码(以我们自己写的 wordCount 为例)

  Combiner 的意义就是对每一个 MapTask 的输出进行局部汇总,以减小网络传输量。

  具体实现步骤:

  (1)自定义一个 Combiner 继承 Reducer,重写 reduce 方法

  (2)在 Job 中设置:job.setCombinerClass(xxx.class)

  具体代码实现:

  (1)Combiner 的代码(和 Reduce 端的代码一致,本质上就是一个 Reducer)

 	/**
	 * Combiner
	 */
	public static class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable v : values) {
				sum += v.get();
			}
			context.write(key, new IntWritable(sum));
		}
	}

  (2)在 Job 中设置 Combiner 类

job.setCombinerClass(WordCountCombiner.class);

  **说明: **

  • “词频统计”是一个可以展示 Combiner 用处的基础例子,上面的 WordCount 程序为每一行数据生成了一个(word,1)键值对。
  • 所以如果在同一个文档内“Car”出现了3次,("Car",1)键值对会被生成3次,这些键值对会被送到 Reduce 端。
  • 通过使用 Combiner,这些键值对可以被压缩为一个("Car",3)送往 Reduce 端。现在每一个节点针对每一个词只会发送一个值到 Reduce 端,大大减少了 Shuffle 过程所需要的带宽并加速了作业的执行。
  • 这里面最棒的就是我们不用写任何额外的代码就可以享用此功能!如果你的 Reduce 是可交换及可组合的,那么它也就可以作为一个 Combiner。你只要在 Driver 中添加job.setCombinerClass(xxx.class);这行代码就可以在“词频统计”程序中启用Combiner。

2.4 使用 Combiner 的注意事项

  (1)Combiner 和 Reducer 的区别在于运行的位置

  • Combiner 是在每一个 MapTask 所在的节点运行
  • Reducer 是接收全局所有 Mapper 的输出结果

  (2)Combiner 的输出 KV 跟 Reducer 的输入 KV 类型相对应。

  (3)不要以为在写 MapReduce 程序时设置了 Combiner 就认为 Combiner 一定会起作用,实际情况是这样的吗?答案是否定的

  • Hadoop 文档中也有说明 Combiner 可能被执行也可能不被执行,可能执行一次也可能执行多次。
  • **那么在什么情况下不执行呢?**如果当前集群在很繁忙的情况下 Job 就是设置了也不会执行 Combiner,这时集群本身负载量很大,会尽量提早执行完 Map,空出资源。所以, Combiner 使用的原则是:有或没有都不能影响业务逻辑,都不能影响最终结果

  (4)Combiner的适用场景比如说在**“汇总统计””求最大值“**时,就可以使用 Combiner,但是在“求平均数”的时候就不适用了。

猜你喜欢

转载自blog.csdn.net/c_lanxiaofang/article/details/107836966
7.4