文章目录
1. MapReduce介绍
1.1 MapReduce的基本思想
MapReduce思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。
MapReduce分为Map和Reduce两个阶段,Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理,这些小任务可以并行计算,彼此间几乎没有依赖关系。Reduce负责“合”,即对map阶段的结果进行全局汇总。MapReduce运行在yarn集群
下图用介绍了类似MapReduce思想的一个实现过程
在实际的MapReduce执行过程中,可能有多个Reduce
1.2 MapReduce的设计
MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。
MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。为程序员提供一个抽象和高层的编程接口和框架。程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:
Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现.Map和Reduce,MapReduce处理的数据类型是**<key,value>键值对**。
- Map:
(k1; v1) → [(k2; v2)]
- Reduce:
(k2; [v2]) → [(k3; v3)]
1.3 MapReduce在yarn集群上的执行流程
-
MRAppMaster 负责整个程序的过程调度及状态协调
-
MapTask负责map阶段的整个数据处理流程
-
ReduceTask负责reduce阶段的整个数据处理流程
2. MapReduce运行机制详解
2.1 Map阶段
Map阶段又可以分为Map Task阶段和Map shuffle阶段,如下图所示
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zqLW66Jf-1581582983148)(7\1581407538401.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WL9H2P5l-1581582983149)(7\1581407566690.png)]
-
读取数据组件 InputFormat (默认 TextInputFormat) 会通过
getSplits
方法对输入目录中文件进行逻辑切片规划得到 block, 有多少个block
就对应启动多少个MapTask
. -
将输入文件切分为
block
之后, 由 RecordReader对象 (默认是LineRecordReader) 进行读取, 以\n
作为分隔符, 读取一行数据, 返回<key,value>
. Key 表示每行首字符偏移值, Value 表示这一行文本内容(K1,V1) -
读取
block
返回<key,value>
, 进入用户自己继承的 Mapper 类中,执行用户重写的 map 函数, RecordReader 读取一行这里调用一次 -
Mapper 逻辑结束之后, 将 Mapper 的每条结果通过 context.write进行collect数据收集. 在 collect 中, 会先对其进行分区处理,默认使用 HashPartitioner
-
MapReduce 提供
Partitioner
接口, 它的作用就是根据Key
或Value
及Reducer
的数量来决定当前的这对输出数据最终应该交由哪个Reduce task
处理, 默认对 Key Hash 后再以 Reducer 数量取模. 默认的取模方式只是为了平均 Reducer 的处理能力, 如果用户自己对 Partitioner 有需求, 可以订制并设置到 Job 上
-
-
接下来, 会将数据写入内存, 内存中这片区域叫做环形缓冲区(临时数据收集), 缓冲区的作用是批量收集 Mapper 结果, 减少磁盘 IO 的影响. 我们的 Key/Value 对以及 Partition 的结果都会被写入缓冲区. 当然, 写入之前,Key 与 Value 值都会被序列化成字节数组
-
环形缓冲区其实是一个数组, 数组中存放着 Key, Value 的序列化数据和 Key, Value 的元数据信息, 包括 Partition, Key 的起始位置, Value 的起始位置以及 Value 的长度. 环形结构是一个抽象概念
-
缓冲区是有大小限制, 默认是 100MB. 当 Mapper 的输出结果很多时, 就可能会撑爆内存, 所以需要在一定条件下将缓冲区中的数据临时写入磁盘, 然后重新利用这块缓冲区. 这个从内存往磁盘写数据的过程被称为 Spill, 中文可译为溢写. 这个溢写是由单独线程来完成, 不影响往缓冲区写 Mapper 结果的线程. 溢写线程启动时不应该阻止 Mapper 的结果输出, 所以整个缓冲区有个溢写的比例
spill.percent
. 这个比例默认是 0.8, 也就是当缓冲区的数据已经达到阈值buffer size * spill percent = 100MB * 0.8 = 80MB
, 溢写线程启动, 锁定这 80MB 的内存, 执行溢写过程. Mapper 的输出结果还可以往剩下的 20MB 内存中写, 互不影响
-
-
当溢写线程启动后, 需要对这 80MB 空间内的 Key 做排序 (Sort). 排序是 MapReduce 模型默认的行为, 这里的排序也是对序列化的字节做的排序
-
如果 Job 设置过 Combiner, 那么现在就是使用 Combiner 的时候了. 将有相同 Key 的 Key/Value 对的 Value 加起来, 减少溢写到磁盘的数据量. Combiner 会优化 MapReduce 的中间结果, 所以它在整个模型中会多次使用
-
那哪些场景才能使用 Combiner 呢? 从这里分析, Combiner 的输出是 Reducer 的输入, Combiner 绝不能改变最终的计算结果. Combiner 只应该用于那种 Reduce 的输入 Key/Value 与输出 Key/Value 类型完全一致, 且不影响最终结果的场景. 比如累加, 最大值等. Combiner 的使用一定得慎重, 如果用好, 它对 Job 执行效率有帮助, 反之会影响 Reducer 的最终结果
-
-
合并溢写文件, 每次溢写会在磁盘上生成一个临时文件 (写之前判断是否有 Combiner), 如果 Mapper 的输出结果真的很大, 有多次这样的溢写发生, 磁盘上相应的就会有多个临时文件存在. 当整个数据处理结束之后开始对磁盘中的临时文件进行 Merge 合并, 因为最终的文件只有一个, 写入磁盘, 并且为这个文件提供了一个索引文件, 以记录每个reduce对应数据的偏移量
配置hadoop时可以对溢写进行配置,在hadoop/mapre-site.xml
中进行配置
配置 | 默认值 | 解释 |
---|---|---|
mapreduce.task.io.sort.mb |
100 | 设置环型缓冲区的内存值大小 |
mapreduce.map.sort.spill.percent |
0.8 | 设置溢写的比例 |
mapreduce.cluster.local.dir |
${hadoop.tmp.dir}/mapred/local |
溢写数据目录 |
mapreduce.task.io.sort.factor |
10 | 设置一次合并多少个溢写文件 |
2.1 Reduce阶段
Reduce阶段分为Reduce shuffle阶段和Reduce Task阶段,如下图所示
- Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。
- Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。
- 合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
- 对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。
3. MapReduce编程规范
MapReduce 的开发一共有八个步骤, 其中 Map 阶段分为 2 个步骤,Shuffle 阶段 4 个步骤,Reduce 阶段分为 2 个步骤
Map 阶段 2 个步骤
- 设置 InputFormat 类, 将数据切分为 Key-Value**(K1和V1)** 对, 输入到第二步
- 自定义 Map 逻辑, 将第一步的结果转换成另外的 Key-Value(K2和V2) 对, 输出结果
Shuffle 阶段 4 个步骤
- 对输出的 Key-Value 对进行分区
- 对不同分区的数据按照相同的 Key 排序
- 对数据进行分组, 相同 Key 的 Value 放入一个集合中
- (可选) 对分组过的数据初步规约, 降低数据的网络拷贝
-
在 MapReduce 中, 通过我们指定分区, 会将同一个分区的数据发送到同一个 Reduce 当中进行处理Reduce默认分区只有一个,但是在实际应用中可能有多个结果文件输出,因此往往需要重新设置分区,分区的简单流程如下图所示
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XmzoAfEU-1581582983155)(7\1581044894242.png)]
-
规约(Combiner) 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce 的一种优化手段之一
- combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件
- combiner 组件的父类就是 Reducer
- combiner 和 reducer 的区别在于运行的位置
- Combiner 是在每一个 maptask 所在的节点运行
- Reducer 是接收全局所有 Mapper 的输出结果
- combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量
Reduce 阶段 2 个步骤
- 对多个 Map 任务的结果进行排序以及合并, 编写 Reduce 函数实现自己的逻辑, 对输入的 Key-Value 进行处理, 转为新的 Key-Value(K3和V3)输出
- 设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据
5. MapReduce中的计数器
计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统计。计数器还可辅助诊断系统故障。如果需要将日志信息传输到 map 或 reduce 任务, 更好的方法通常是看能否用一个计数器值来记录某一特定事件的发生。对于大型分布式作业而言,使用计数器更为方便。除了因为获取计数器值比输出日志更方便,还有根据计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。
- hadoop内置计数器列表
MapReduce任务计数器 | org.apache.hadoop.mapreduce.TaskCounter |
---|---|
文件系统计数器 | org.apache.hadoop.mapreduce.FileSystemCounter |
FileInputFormat计数器 | org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter |
FileOutputFormat计数器 | org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter |
作业计数器 | org.apache.hadoop.mapreduce.JobCounter |
每次mapreduce执行完成之后,我们都会看到一些日志记录出来,其中包括一些重要的日志记录
-
除内置计数器之外,我们还可以自定义计数器
-
通过context上下文对象,在map端使用计数器进行统计
public class PartitionMapper extends Mapper<LongWritable,Text,Text,NullWritable>{ //map方法将K1和V1转为K2和V2 @Override protected void map(LongWritable key, Text value, Context context) throws Exception{ Counter counter = context.getCounter("MR_COUNT", "MyRecordCounter"); counter.increment(1L); context.write(value,NullWritable.get()); } }
-
通过enum枚举类型来定义计数器,例如统计reduce端数据的输入的key有多少个
public class PartitionerReducer extends Reducer<Text,NullWritable,Text,NullWritable> { public static enum Counter{ MY_REDUCE_INPUT_RECORDS,MY_REDUCE_INPUT_BYTES } @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.getCounter(Counter.MY_REDUCE_INPUT_RECORDS).increment(1L); context.write(key, NullWritable.get()); } }
-
-
序列化 (Serialization) 是指把结构化对象转化为字节流
-
反序列化 (Deserialization) 是序列化的逆过程. 把字节流转为结构化对象. 当要在进程间传递对象或持久化对象的时候, 就需要序列化对象成字节流, 反之当要将接收到或从磁盘读取的字节流转换为对象, 就要进行反序列化
-
Java 的序列化 (Serializable) 是一个重量级序列化框架, 一个对象被序列化后, 会附带很多额外的信息 (各种校验信息, header, 继承体系等), 不便于在网络中高效传输. 所以, Hadoop 自己开发了一套序列化机制(Writable), 精简高效. 不用像 Java 对象类一样传输多层的父子关系, 需要哪个属性就传输哪个属性值, 大大的减少网络传输的开销
-
Writable 是 Hadoop 的序列化接口. 一个类要支持可序列化只需实现这个接口即可
-
另外 Writable 有一个子接口是 WritableComparable, WritableComparable 是既可实现序列化, 也可以对key进行比较, 我们这里可以通过自定义 Key 实现 WritableComparable 来实现我们的排序功能