文章目录
一、MapReduce概述
1.1 MapReduce 定义
-
MapReduce 是一个分布式运算程序的编程框架,是用户开发基于Hadoop的数据分析应用的核心框架。
-
MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
1.2 MapReduce 优缺点
1.2.1 优点
(1)MapReduce 易于编程
- 它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得 MapReduce 编程变得非常流行
(2)良好的扩展性
- 当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
(3)高容错性
- MapReduce 设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。
(4)适合 PB 级别以上海量数据的离线处理
- 可以实现上千台服务器集群并发工作,提供数据处理能力。
1.2.1 缺点
(1)不擅长实时计算
- MapReduce 无法像 MySQL 一样,在毫秒或者秒级内返回结果。
(2)不擅长流式计算
- 流式计算的输入数据是动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。
(3)不擅长DAG(有向无环图)计算
- 多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce 并不是不能做,而是使用后,每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
1.3 MapReduce 核心思想
(1)分布式的运算程序往往需要分成至少2个阶段。
(2)第一个阶段的 MapTask 并发实例,完全并行运行,互不相干。
(3)第二个阶段的 ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段的所有 MapTask 并发实例的输出。
(4)MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。
总结:分析 WordCount 数据流走向深入理解 MapReduce 核心思想。
1.4 MapReduce 进程
-
一个完整的 MapReduce 程序在分布式运行时有三类实例进程:
(1)MrAppMaster:负责整个程序的过程调度及状态协调。
(2)MapTask:负责Map阶段的整个数据处理流程。
(3)ReduceTask:负责 Reduce 阶段的整个数据处理流程。
1.5 官方 WordCount 源码
- 采用反编译工具反编译源码,发现 WordCount 案例有 Map 类、Reduce 类和 驱动(Driver)类。且数据的类型是 Hadoop 自身封装的序列化类型。
1.6 常用数据序列化类型
Java类型 | Hadoop Writable类型 |
---|---|
Boolean | BooleanWritable |
Byte | ByteWritable |
Int | IntWritable |
Float | FloatWritable |
Long | LongWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
Null | NullWritable |
1.7 MapReduce 编程规范
- 用户编写的程序分成三个部分:Mapper、Reducer 和 Driver。
Mapper 阶段
① 用户自定义的 Mapper 要继承自己的父类(Mapper);
② Mapper 的输入数据是 KV 键值对的形式(KV类型可自定义);
③ Mapper 中的业务逻辑写在 map() 方法中;
④ Mapper 的输出数据是 KV 键值对的形式(KV类型可自定义);
⑤ map() 方法(MapTask进程)对每一个<K,V>调用一次
Reduce 阶段
① 用户自定义的 Reducer 要继承自己的父类(Reducer);
② Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是KV;
③ Mapper 中的业务逻辑写在 reduce() 方法中;
④ ReduceTask 进程对每一组相同 K 的 <K,V> 组调用一次 reduce() 方法;
Driver 阶段
- 相当于 YARN 集群的客户端,用于提交我们整个程序到 YARN 集群,提交的是封装了 MapReduce 程序相关运行参数的 job 对象
1.8 WordCount 案例练习
(1)需求
在给定的文本文件中统计输出每一个单词出现的总次数
输入数据:hello.txt
-
java java hadoop hadoop shell shell idea idea idea xiao datenode namenode hdfs yarn mapreduce
期望输出数据:
-
datenode 1 hadoop 2 hdfs 1 idea 3 java 2 mapreduce 1 namenode 1 shell 2 xiao 1 yarn 1
(2)需求分析
- 按照 MapReduce 编程规范,分别编写 Mapper,Reducer,Driver。
(3)环境准备
① 创建 Maven 工程
② 在在pom.xml文件中添加如下依赖
-
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.1.3</version> </dependency> </dependencies>
③ 在项目的src/main/resources目录下,新建一个文件,命名为“log4j2.xml”,在文件中填入。
-
<?xml version="1.0" encoding="UTF-8"?> <Configuration status="error" strict="true" name="XMLConfig"> <Appenders> <!-- 类型名为Console,名称为必须属性 --> <Appender type="Console" name="STDOUT"> <!-- 布局为PatternLayout的方式, 输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here --> <Layout type="PatternLayout" pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" /> </Appender> </Appenders> <Loggers> <!-- 可加性为false --> <Logger name="test" level="info" additivity="false"> <AppenderRef ref="STDOUT" /> </Logger> <!-- root loggerConfig设置 --> <Root level="info"> <AppenderRef ref="STDOUT" /> </Root> </Loggers> </Configuration>
(4)编写程序
- 编写Mapper类
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
//设置需要输出的键值对类型
private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);//指定键值对的value值为1,后面方便计算
//重写map方法 快捷键(Ctrl + o)
//参数解读:1、偏移量 2、读取输入的一行数据(atguigu atguigu) 3、上下文对象
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1.获取一行数据,并将其转换为字符串类型
String line = value.toString();
//2、将读取到的单词字符串进行切割,按照空格切割
String[] words = line.split(" ");
//3、写出
for (String word : words) {
//封装outKey,outValue
outKey.set(word);
//输出
context.write(outKey,outValue);
}
}
}
- 编写Reducer类
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
//设置输出的value的类型
private IntWritable outValue = new IntWritable();
//重写reduce方法
//参数解读:1、单词 2、相同单词的一组数据 3、上下文对象
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//遍历单词进行统计计数
int sum = 0;
for (IntWritable value : values) {
int i = value.get();
sum += i;
}
//封装outValue
outValue.set(sum);
//输出
context.write(key,outValue);
}
}
- 编写Driver驱动类
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1、获取配置信息,获取job对象实例
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2、指定本程序的jar包所在的本地路径(设置本地Driver对象)
job.setJarByClass(WordCountDriver.class);
//3、关联 mapper 和 reducer 业务类对象
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4、设置 mapper 端的输出数据的 Key,Value 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5、设置 MapReduce 程序的最终输出数据的 Key,Value 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6、设置程序(job)的输入输出路径
FileInputFormat.setInputPaths(job, new Path("F:\\input\\inputword\\hello.txt"));
FileOutputFormat.setOutputPath(job, new Path("F:\\input\\MapReduce\\output3"));
//7、提交job作业
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
(5)本地测试
(1)需要首先配置好 HADOOP_HOME 变量以及 Windows 运行依赖
(2)在 IDEA/Eclipse 上运行程序
(6)集群上测试
(1)用maven打jar包,需要添加的打包插件依赖
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
注意:如果工程上显示红叉。在项目上右键->maven->Reimport即可
(1)将程序打成jar包,然后拷贝到Hadoop集群中
步骤详情:右键->Run as->maven install。等待编译完成就会在项目的target文件夹中生成jar包。如果看不到。在项目上右键->Refresh,即可看到。修改不带依赖的jar包名称为wc.jar,并拷贝该jar包到Hadoop集群。
(2)启动Hadoop集群
(3)执行WordCount程序
[xiaoxq@hadoop105 software]$ hadoop jar wc.jar
com.xiaoxq.wordcount.WordcountDriver /user/xiaoxq/input /user/xiaoxq/output