1. With N partitions, N MapTasks are launched. If the number of ReduceTasks is not specified, it also defaults to N.
2. With N partitions (hence N MapTasks) and N ReduceTasks (the default when the count is not specified), at most N .data files and at most N .index files are produced.
That is, the number of files is not affected by the number of ReduceTasks.
As the listing below shows, there are 5 index files but only 4 data files. These counts have nothing to do with the number of reducers; they are determined by the number of mappers. So why are there not 5 data files? How do the mappers write these files, and what do they contain? That is what we analyze next.
hadoop$ pwd
/tmp/spark-local-20150129214306-5f46
hadoop$ cd spark-local-20150129214306-5f46/
hadoop$ ls
0c  0d  0e  0f  11  13  15  29  30  32  36  38
hadoop$ find . -name "*.index" | xargs ls -l
-rw-rw-r-- 1 hadoop hadoop  48 1月 29 21:43 ./0c/shuffle_0_4_0.index
-rw-rw-r-- 1 hadoop hadoop  48 1月 29 21:43 ./0d/shuffle_0_3_0.index
-rw-rw-r-- 1 hadoop hadoop  48 1月 29 21:43 ./0f/shuffle_0_1_0.index
-rw-rw-r-- 1 hadoop hadoop  48 1月 29 21:43 ./30/shuffle_0_0_0.index
-rw-rw-r-- 1 hadoop hadoop  48 1月 29 21:43 ./32/shuffle_0_2_0.index
hadoop$ find . -name "*.data" | xargs ls -l
-rw-rw-r-- 1 hadoop hadoop 884 1月 29 21:43 ./0c/shuffle_0_0_0.data
-rw-rw-r-- 1 hadoop hadoop 685 1月 29 21:43 ./15/shuffle_0_1_0.data
-rw-rw-r-- 1 hadoop hadoop 710 1月 29 21:43 ./29/shuffle_0_3_0.data
-rw-rw-r-- 1 hadoop hadoop 693 1月 29 21:43 ./36/shuffle_0_2_0.data
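For reference, a job along the following lines could reproduce a layout like the one above. This is a minimal sketch, not the job that produced this listing; the object name, key function, and partition count are illustrative, and it assumes Spark 1.x with the default sort-based shuffle writing under spark.local.dir:

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleFileDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("shuffle-files"))
    // 5 input partitions -> 5 map tasks -> shuffle_0_0_0 .. shuffle_0_4_0 index files
    val pairs = sc.parallelize(1 to 1000, 5).map(i => (i % 7, i))
    pairs.reduceByKey(_ + _).collect() // triggers shuffle 0
    sc.stop()
  }
}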
ShuffleMapTask execution logic
Since the output of N partitions is written into just a few data files, a reducer that comes to fetch its data must be able to locate the slice inside each data file that belongs to it; recording those positions is the job of the index file.
The reduceId in both the data and index file names is always 0, because the same shuffleId and mapId always resolve to the same file (see IndexShuffleBlockManager):
def getDataFile(shuffleId: Int, mapId: Int): File = {
  // The same shuffleId and mapId always resolve to the same data file
  blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0))
}

private def getIndexFile(shuffleId: Int, mapId: Int): File = {
  // The same shuffleId and mapId always resolve to the same index file
  blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0))
}
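Concretely, the .index file is just a sequence of (numPartitions + 1) big-endian longs holding cumulative offsets into the companion .data file; each 48-byte index file in the listing above therefore holds 6 longs, i.e. offsets for 5 reduce partitions. The helper below is a hedged sketch of reading them back (readOffsets is illustrative, not a Spark API):

import java.io.{DataInputStream, File, FileInputStream}

def readOffsets(indexFile: File): Array[Long] = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    // One long per offset; the file length tells us how many there are
    val numOffsets = (indexFile.length() / 8).toInt
    Array.fill(numOffsets)(in.readLong())
  } finally {
    in.close()
  }
}

Reducer r then reads the byte range [offsets(r), offsets(r + 1)) from the matching .data file, which is why a single data file per map task suffices.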
def writePartitionedFile(
    blockId: BlockId,
    context: TaskContext,
    outputFile: File): Array[Long] = {
  // Track location of each range in the output file
  val lengths = new Array[Long](numPartitions)

  if (bypassMergeSort && partitionWriters != null) {
    // We decided to write separate files for each partition, so just concatenate them. To keep
    // this simple we spill out the current in-memory collection so that everything is in files.
    spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
    partitionWriters.foreach(_.commitAndClose())
    var out: FileOutputStream = null
    var in: FileInputStream = null
    try {
      out = new FileOutputStream(outputFile, true)
      for (i <- 0 until numPartitions) {
        in = new FileInputStream(partitionWriters(i).fileSegment().file)
        val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
        in.close()
        in = null
        lengths(i) = size
      }
    } finally {
      if (out != null) {
        out.close()
      }
      if (in != null) {
        in.close()
      }
    }
  } else {
    // Either we're not bypassing merge-sort or we have only in-memory data; get an iterator by
    // partition and just write everything directly.
    // this.partitionedIterator yields (partitionId, elements) pairs for the whole data set
    for ((id, elements) <- this.partitionedIterator) {
      if (elements.hasNext) { // only write partitions that actually contain elements
        val writer = blockManager.getDiskWriter(
          blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get)
        for (elem <- elements) { // iterate over the partition's elements
          writer.write(elem) // append each record; the writer is closed only after the loop
        }
        writer.commitAndClose() // flush and close the file
        // FileSegment records the file, offset, and length of the bytes just written
        val segment = writer.fileSegment()
        lengths(id) = segment.length
      }
    }
  }

  context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
  context.taskMetrics.diskBytesSpilled += diskBytesSpilled
  context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m =>
    if (curWriteMetrics != null) {
      m.shuffleBytesWritten += curWriteMetrics.shuffleBytesWritten
      m.shuffleWriteTime += curWriteMetrics.shuffleWriteTime
    }
  }

  lengths
}
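The lengths array returned above is what ultimately becomes the index file: the per-partition segment lengths are accumulated into the cumulative offsets described earlier. The following is a simplified sketch of that step, modeled on IndexShuffleBlockManager.writeIndexFile (the real method resolves the file via getIndexFile and buffers the output stream):

import java.io.{DataOutputStream, File, FileOutputStream}

def writeIndexFile(indexFile: File, lengths: Array[Long]): Unit = {
  val out = new DataOutputStream(new FileOutputStream(indexFile))
  try {
    var offset = 0L
    out.writeLong(offset) // the first segment always starts at offset 0
    for (length <- lengths) {
      offset += length
      out.writeLong(offset) // cumulative end offset of each partition's segment
    }
  } finally {
    out.close()
  }
}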