一.上文
http://zy19982004.iteye.com/blog/2037549从整体上描述了Job执行的过程,大致分为三步
- 准备数据
- map reduce
- 清理
其中最主要的当然是map reduce的过程,map由MapTask主导完成,reduce由ReduceTask主导完成。先看看官方给的一个图
二.MapTask
MapTask分为以下几步
- Read:从InputSplit中读取数据,解析出一个个key/value。
- Map:由自定义的Mapper类处理上述key/value。处理结果也是key/value形式。
- Write或者叫Collect:MapContext将处理结果写入环形内存缓冲区kvbuffer。
- Combine(如果有Combine的话) & SortAndSpill:当MapTask的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写,字面意思很直观。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spillper。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(kvbuffer.length * spillper = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。可以参考类MapTask init方法
final float spillper =job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100); int maxMemUsage = sortmb << 20;//100M kvbuffer = new byte[maxMemUsage]; softLimit = (int)(kvbuffer.length * spillper); bufferRemaining = softLimit;
另外补充一点,如果有Combine,在这一步也会起作用,参考sortAndSpill方法//如果combiner为空,则直接写入文件 if (null == combinerClass) { …… writer.append(key, value); ++spindex; } else { …… //如果combiner不为空,则先combine,调用combiner.reduce(…)函数后再写入文件 combineAndSpill(kvIter, combineInputCounter); }
在http://zy19982004.iteye.com/blog/2037549的基础上补充一张图,这个图就是spill过程。 - Combine(如果有Combine的话) & Merge:当一个MapTask即将完成时,可能生成多个spill文件,需要将这些spill文件进行合并,生成一个file.out文件。可以参考MapTask mergeParts方法。
三.ReduceTask
ReduceTask分为以下几步
- Copy(也称Shuffle):如果是Local模式,直接mv & rename;如果是集群模式,则通过HTTP请求copy数据,ReduceTask从各个MapTask上远程拷贝属于自己的那一片数据,放到内存里,如果超过阀值,写入磁盘。
- Merge & Sort:在远程copy的过程中,需要将多个属于自己的那一片合并成一块数据,并排序;因为每片已经是排序的,所以只需要一次归并排序即可。
- Reduce:由自定义的Reduce类,将数据写入Hdfs。
四.自己画了其中几个步骤
- MapTask Spill
- MapTask Merge & Combine
- ReduceTask Copy Merge & Sort
五.对spill的进一步理解
test-data.txt是145M,被分成两个InputSplit(仅仅是逻辑划分)。
MapTask1处理hdfs://192.168.1.200:9000/user/root/input/test-data.txt:0+134217728,MapTask2处理hdfs://192.168.1.200:9000/user/root/input/test-data.txt:134217728+18093772。
按理80M就开始spill的话,MapTask最多只有两个spill啊,为什么现在出现了四个(三个如上图,还有一个是在RecordWriter.close(mapperContext)时将内存flush到硬盘)。因为kvbuffer的结构并不只是存放了key/value,kvbuffer中一部分是存储诸如partitioner这样的kvmeta。
kvmeta.put(kvindex + PARTITION, partition); kvmeta.put(kvindex + KEYSTART, keystart); kvmeta.put(kvindex + VALSTART, valstart); kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
对于kvbuffer的结构,我就不深究了。可以参考http://caibinbupt.iteye.com/blog/401374的几篇博客,虽然他读的不是Hadoop2.2.0。
另外也可以参考这个图,来自http://www.cnblogs.com/forfuture1978/archive/2010/11/19/1882268.html
以上就是我对MapTask和ReduceTask的一些肤浅认识。我写的每一篇文章都希望在保证正确的基础上更加通俗易懂。如有错误之处措辞不当等,欢迎指出。
一. 上文 二. MapTask 三. ReduceTask 四.自己画了其中几个步骤 五.对spill的进一步理解