任务目的
- 理解 MapReduce 的概念
- 掌握 MapReduce 程序运行的步骤
- 掌握 MapReduce 编程规范
任务清单
- 任务1:MapReduce概述
- 任务2:MapReduce 程序运行演示
- 任务3:MapReduce 示例编写规范
详细任务步骤
任务1:MapReduce概述
1. 什么是MapReduce?
重温 Hadoop 四大组件:
- HDFS:分布式文件系统
- MapReduce:分布式运算编程框架
- YARN: Hadoop 的资源调度系统
- Common: 以上三大组件的底层支撑组件,主要提供基础工具包和 RPC 框架等
MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用”的核心框架。
MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
2. 为什么需要 MapReduce?
(1)海量数据在单机上处理因为硬件资源限制,无法胜任;
(2)而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度;
(3)引入 MapReduce 框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。
程序由单机版扩成分布式版时,会引入大量的复杂工作。为了提高开发效率,可以将分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。
Hadoop 当中的 MapReduce 就是这样的一个分布式程序运算框架,它把大量分布式程序都会涉及到的内容都封装起来,让用户只用专注自己的业务逻辑代码的开发。
任务2:MapReduce 程序运行演示
Hadoop 的发布包中内置了一个 hadoop-mapreduce-examples-2.7.7.jar, 这个 jar 包中有各种 MapReduce 示 例程序,其中非常有名的就是 PI 程序 和 wordcount。此 jar 包存放在 $HADOOP_HOME/share/hadoop/mapreduce/
目录里。
我们可以通过以下步骤运行:
- 启动 HDFS 和 YARN 集群
- 然后在集群的任意一台节点上启动执行程序
2.1 PI 程序
进入 $HADOOP_HOME/share/hadoop/mapreduce/
目录下,执行如下命令:
hadoop jar hadoop-mapreduce-examples-2.7.7.jar pi 10 10
如下图所示:
图1
运行结果:
图2
2.2 wordcount 程序
将$HADOOP_HOME/README.txt
文件上传到 HDFS 作为数据源:
hadoop fs -put README.txt /
运行结果:
图3
执行 wordcount 程序:
hadoop jar hadoop-mapreduce-examples-2.7.7.jar wordcount /README.txt /wordcount
如下图所示:
图4
执行完成,使用cat
命令查看运行结果:
图5
2.3 源码解析
打开“/root/software/hadoop-2.7.7-src/hadoop-mapreduce-project/hadoop-mapreduce-examples
”,如下图所示:
图6
使用vi编辑器打开“pom.xml”,找到第 127 行,它告诉了我们例子程序的主程序入口:
图7
进入该目录“src/main/java/org/apache/hadoop/examples
”,如下图所示:
图8
打开主入口程序“ExampleDriver.java”,告诉我们pi和wordcount对应的实际程序分别是QuasiMonteCarlo.class和WordCount.class:
图9
之后打开pi和wordcount对应的实际程序,查看MapReduce示例的源码编写规范,这里我们着重看一下wordcount的源码编写:
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
通过查看 WordCount 程序 MapReduce 源码编写,得出以下几点结论:
该程序有一个 main 方法,来启动任务的运行,其中 job 对象存储了该程序运行的必要信息,比如指定 Mapper 类和 Reducer 类:
job.setMapperClass(TokenizerMapper.class);
继承 Mapper 类job.setReducerClass(IntSumReducer.class);
继承 Redcuer 类
总结:
MapReduce 程序的业务编码分为两个大部分,一部分配置程序的运行信息,一部分编写该 MapReduce 程序的业务逻辑,并且业务逻辑的 map 阶段和 reduce 阶段的代码分别继承 Mapper 类和 Reducer 类。
任务3:MapReduce 示例编写规范
1. 用户编写的程序分成三个部分: Mapper, Reducer, Driver
2. Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)
3. Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)
4. Mapper 中的业务逻辑写在 map() 方法中
5. MapTask 进程对每一个<K,V>
调用一次map()方法
6. Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV
7. Reducer 的输出数据是 KV 对的形式(KV 的类型可自定义)
8. Reducer 的业务逻辑写在 reduce() 方法中
9. ReduceTask 进程对每一组相同 k 的<k,v>
组调用一次 reduce() 方法
10. 用户自定义的 Mapper 和 Reducer 都要继承各自的父类
11. 整个程序需要一个 Drvier 来进行提交,提交的是一个描述了各种必要信息的 job 对象