以WordCount为例,研究一下MapReduce的内部工作机制。
统计某个输入目录下所有文件中每个单词以及出现的次数。
map阶段:读取数据每一行,切分数据,单词作为key,次数1作为value,输出到context中。
reduce阶段:接受来自map阶段的输出,按照相同key来聚合分组,每一组执行一次reduce方法,累加,将结果写入到context中。
最终MapReduce运行过程如下所示:
首先客户端设定Job对象参数,根据输入切片大小以及输入目录来划分输入切片FileSplit,具体来说每个文件最少一个切片,如果大于切片大小则继续划分,因此输入文件决定了输入切片的多少,进而决定了mapTask的数量。
每个切片由一个mapTask来负责执行
由FileInputFormat来决定读取切片数据方式:默认为TextInputFormat。
通过控制FileInputFormat可以控制读取输入切片的格式
TextInputFormat中getRecordReader方法返回LineRecordReader
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job,
Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
String delimiter = job.get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter) {
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
}
return new LineRecordReader(job, (FileSplit) genericSplit,
recordDelimiterBytes);
}
查看一下LineRecordReader,其中next方法
/** Read a line. */
public synchronized boolean next(LongWritable key, Text value)
throws IOException {
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
key.set(pos);
int newSize = 0;
if (pos == 0) {
newSize = skipUtfByteOrderMark(value);
} else {
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
pos += newSize;
}
if (newSize == 0) {
return false;
}
if (newSize < maxLineLength) {
return true;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
}
return false;
}
由上述代码可知,next方法会将每一行的起始偏移量赋值给key 每一行的内容赋值给value。然后将k,v输入到map方法中,在map方法中执行处理逻辑,将key与value写入到context中。
MapOutputCollector负责将context中内容序列化到环形缓冲区中,从某一位置开始写入,当写满大约环形缓冲区80%时,会启动Spiller来读取已经写入的部分数据,按照Partition中getPartition方法来决定每个key-value所在的分区,并且按照key compareTo方法来排序。将结果存储到本地文件--溢出文件。溢出文件中key-value是分区排序存储的。
控制Partition方法可以控制key-value所在的分区
控制key compareTo方法可以控制key-value排序顺序
得到多个溢出文件,可以使用Combiner 来局部聚合合并Merge,最终得到分区索引文件,保存整个mapTask处理的数据时分区有序的,将结果保存在NodeManager web Document目录中,以供reduceTask来下载。
reduceTask启动,假如其为redcueTask0,则reduceTask会通过http协议来下载所有0号分区文件到本地。
将下载的所有分区文件执行一次合并排序,排序顺序按照key compareTo方法,最终合并成一个文件,按照key有序。
可以通过控制key compareTo方法来决定某个key-value在某个reduceTask输入数据中的位置
然后调用reduceTask中reduce方法,读取合并文件的每一项key-value,输入到reduce方法中,迭代器每迭代一次,key值改变,value值改变,每迭代一次调用分组比较器GroupingComparator来决定是否属于同一组kv,若不属于,则继续进行下一组迭代。
通过控制分组比较器可以控制reduceTask分组机制,即哪些数据作为一组来调用reduce方法。
执行相应逻辑,结果写入到context中,此时调用TextOutputFormat中getRecordWriter方法返回LineRecordWriter将key-value写入到目标文件中HDFS,即完成最终结果。
shuffle过程: mapTask产生的数据传输给reduceTask过程