1. Hadoop序列化
1.1 什么是序列化及反序列化?
序列化: 将内存中的对象以字节序列的方式写到文件中
反序列化: 将文件中的字节序列读取到内存,并构造成对应的对象.
1.2 为什么需要有序列化和反序列化?
对象中的数据需要持久化 或者 网路中传输等 ...
1.3 java中如何实现序列化和反序列化?
实现序列化接口: Serializable , 提供序列化标识号: serialVersionUID
对象流: ObjectInputStream ObjectOutputStream
1.4 为什么hadoop不用java的序列化?
对于hadoop来说, Java的序列化框架是重量级的. 一个对象在进行序列化时,除了
要关心对象的数据之外(最重要的) ,还要附带很多额外的数据(继承结构等....)
因为Hadoop本身就是处理大量数据的, 当涉及到要序列化和反序列化时,在能保证
重要数据序列化的前提下,尽可能减少无关紧要的数据. 因此hadoop提供了自己的序列化框架(Writable).
1.5 Writable接口:
write(): 序列化方法
readFields(): 反序列化方法
1.6 一个类实现hadoop序列化的步骤:
1). 实现writable接口
2). 实现write() 和 readFields 两个抽象方法
注意: write写出的属性的顺序(A->B->C) 与 readFields读回来的顺序要一致(A->B->C).
3). 提供无参数构造器(反序列的时候会反射调用无参数构造器来构造对象)
4). 建议重写toString方法.
如果该类的对象会作为最终MR输出的结果来用的话,会调用到该类的对象的toString进行打印.
5). 如果说该类会作为MR过程中的key来使用, 需要实现Comparable接口,并重写compreTo方法.
2. MapReduce的过程:
2.1 数据流
输入数据 ---> InputFormat ---> Mapper ---> shuffle --> Reducer ---> OutputFormat ---> 输出数据
2.2 过程:
简单划分: Map阶段 shuffle阶段 Reduce阶段
2.3 过程
代码角度划分:
1) MapTask类中的 run方法: MapTask : map(67%) sort(33%)
if (isMapTask()) {
// If there are no reducers then there won't be any sort. Hence the map
// phase will govern the entire attempt's progress.
if (conf.getNumReduceTasks() == 0) {
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
// If there are reducers then the entire attempt's progress will be
// split between the map phase (67%) and the sort phase (33%).
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}
}
2) ReduceTask类中的run方法: ReduceTask : copy sort reduce
if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
3) MapReduce的代码角度得到的过程结果为:
map --> sort --> copy --> sort --> reduce
3. InputFormat : 数据输入
3.1 重要的方法:
getSplits(): 生成切片的方法.
createRecordReader(): 创建RecordReader对象,真正负责数据读取的对象.
3.2 重要的子抽象类 FileInputFormat
getSplits(): 做出了具体的实现.
createRecordReader(): 没有做任何的改动.
isSplitable(): 当前输入的数据集是否可切分.
3.3 FileInputFormat 的具体实现类
TextInputFormat : MapReduce默认使用的InputFormat
CombineTextInputFormat
4. 切片
4.1 相关两个概念
块: HDFS存数据的单位. 是把要存储到HDFS的文件以设置好的块的大小,从物理上将文件切成N个块.
切片: MapReduce计算数据的单位. 是把要在MR中计算的数据从逻辑上按照切片的大小,划分成N个切片.
4.2 切片的大小
切片的大小默认情况下等于块的大小.
4.3 切片的源码解读:
FileInputFormat类中的getSplits方法:
1)
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); // 1
相关配置项: "mapreduce.input.fileinputformat.split.minsize"="0"
long maxSize = getMaxSplitSize(job); // Long.MAX_VALUE
相关配置项: "mapreduce.input.fileinputformat.split.maxsize" 默认没有配置这一项
2)
long blockSize = file.getBlockSize(); //获取文件的块大小
如果是集群环境,获取到的就是集群中设置的块大小,如果是本地环境,本地默认的块大小32M (33554432)
3) 32M 1 Long.MAX_VALUE
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
==>return Math.max(minSize, Math.min(maxSize, blockSize));
4)
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) // SPLIT_SLOP = 1.1
如果剩余待切片的数据除以 块大小,大于1.1 ,才会继续切片,如果不大于,则直接将剩余的数据
生成一个切片.
4.4 切片的总结:
1) 每个切片都需要由一个MapTask来处理 , 也就意味着在一个MR中,有多少个切片,就会有多少个MapTask。
2) 切片的大小默认情况下等于块的大小
3) 切片的时候每个文件单独切片,不会整体切片.
4) 切片的个数不是越多越好,也不是越少越少,按照实际情况,处理成合适的切片数.
5. TextInputFormat : 默认使用的InputFormat类
切片的规则: 用的就是父亲FileInputFormat中的切片规则.
读取数据: LineRecordReader 按行读取数据
6. CombineTextInputFormat
- 用于小文件过多的场景,解决过多的小文件最终生成太多的切片的问题
- 在Driver中设置
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job,20971520); //20M - 会按照设置的虚拟存储大小进行输入数据的逻辑上的规划,
如果文件的大小小于MaxInputSplitSize , 则文件规划成一个
如果文件的大小 大于MaxInputSplitSize ,但是小于 MaxInputSplitSize2 , 则文件规划成两个(对半分)
如果文件的大小大于 MaxInputSplitSize2 ,先按照MaxInputSplitSize的大小规划一个, 剩余的再进行规划.
最终按照 MaxInputSplitSize 大小来生成切片,
将规划好的每个虚拟文件逐个累加,只要不超过 MaxInputSplitSize大小,则都是规划到一个切片中的。
7. shuffle关键点:
1). map方法之后, reduce方法之前的处理过程就是shuffle过程.
2). map方法写出去的kv, 会被一个收集线程收集到缓冲区中.
3). 缓冲区的大小默认是100M,达到80%发生溢写.
4). 缓冲区中记录的是 kv 、 kv的下标 、 kv的分区 等.
5). 溢写的时候,是按照kv的分区进行排序(快排,只排索引) ,再按照分区溢写. -->map端的第一次排序
6). 每个MapTask有可能发生多次溢写,最终需要将多次溢写的文件归并成一个大的文件. --> map端的第二次排序
7). 在溢写 和归并过程中,都可以采用combiner。
8). 每个ReduceTask按照所要处理的分区, 到每个MapTask中拷贝对应的分区的数据.
拷贝过程中,先放内存,放不下写磁盘. 等数据全部都拷贝过来后,进行归并排序。 --> reduce端的排序
9). reduce端排好序的数据进行分组,然后进入reduce方法进行业务处理.
8. 分区
8.1 分区的概念:
将数据按照条件输出到多个文件中.
8.2 为什么设置reduce的个数可以实现分区的效果
1) 在MapTask类中的NewOutputCollector()方法中:
partitions = jobContext.getNumReduceTasks(); // 获取reduce的个数, reduce的个数在driver中设置的
if (partitions > 1) {
//如果说reducce的个数大于1, 会尝试获取一个分区器类,通过mapreduce.job.partitioner.class参数获取 ,
// 默认mapreduce.job.partitioner.class没有配置,则直接返回HashPartitioner.class 。
// 当然还有一种可能就是我们自己设置过分区器类,则此处会获取到我们自己设置的分区器类。
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
//如果reduce的个数不大于1, 最终的分区号就是固定的0号分区。
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
}
2) 获取到的分区器在哪里使用?
在MapTask的中的NewOutputCollector内部类中的write方法:
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
//将kv收集到缓冲区的时候,要计算出来kv对应的分区号
}
8.3 分区的数据是如何分的?
1). 数据的分区由分区器(Partitioner)来决定.
2). Hadoop有默认的分区器对象 HashPartitioner .
HashPartitioner会按照k的hash值对Reduce的个数进行取余操作得到k所对应的分区.
3) hadoop也支持用户自定义分区器
8.4 默认的分区器 HashPartitioner
1). 默认的分区器就会按照key的hashcode值 先对Integer的最大值做&运算,在对reduce的个数取余,得到分区号。
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
8.5 自定义分区器
重点:
-
Hadoop序列化
-
InputFormat
切片源码 FileInputFormat–>getSplits()
TextInputFormat
CombineTextInputFormat测试 -
熟悉Shuffle的流程
-
分区
测试分区的效果 通过设置reduce个数
熟悉分区的原理
默认分区器 HashPartitioner