在这篇http://bit1129.iteye.com/blog/2186325博文中,分析了hash based shuffle write开启consolidationFiles选项的过程。本文,则关注将Iteratable
1. 如下代码是HashShuffleWriter.write方法
在将partition的数据写入到磁盘前,进行map端的shuffle
/** Write a bunch of records to this task's output */ override def write(records: Iterator[_ <: Product2[K, V]]): Unit = { ///对输入的partition对应Iteratable集合进行map端combine val iter = if (dep.aggregator.isDefined) { if (dep.mapSideCombine) { //如果定义了dep.aggregator以及dep.mapSideCombine则进行map端combine dep.aggregator.get.combineValuesByKey(records, context) } else { records } } else { require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") records } for (elem <- iter) { val bucketId = dep.partitioner.getPartition(elem._1) shuffle.writers(bucketId).write(elem) } }
2. 调用dep.aggregator.get.combineValuesByKey(records, context)进行map端combine
其中aggregator是Aggregator类型的对象,它在构造时需要传入如下参数:
case class Aggregator[K, V, C] ( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) { //代码体..... }
3. Aggregator.combineValuesByKey方法体:
@deprecated("use combineValuesByKey with TaskContext argument", "0.9.0") def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = combineValuesByKey(iter, null) //iter:是map端输入的可遍历的数据集合 def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext): Iterator[(K, C)] = { if (!isSpillEnabled) { //不启用spill到磁盘,那么数据集合中的所有数据将在内存中进行combine val combiners = new AppendOnlyMap[K,C] //combiners是一个AppendOnlyMap, 可以想象成内存内的HashMap var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (iter.hasNext) { //遍历集合 kv = iter.next() //kv._1是Key,changeValue接收两个参数,kv键以及update combiners.changeValue(kv._1, update) } combiners.iterator //返回 } else { //构造参数哪来的? val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners) combiners.insertAll(iter) // Update task metrics if context is not null // TODO: Make context non optional in a future release Option(context).foreach { c => c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled } combiners.iterator } }
4. 只在内存内combine(AppendOnlyMap)
4.1 使用AppendOnlyMap,代码的关键是update函数,以及combiner.changeValue(kv._1, update)
val combiners = new AppendOnlyMap[K,C] var kv: Product2[K, V] = null ///通过闭包特性,update函数内可以访问kv val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (iter.hasNext) { kv = iter.next() //kv键值对的key以及update函数作为入参 combiners.changeValue(kv._1, update) } combiners.iterator
4.2 AppendOnlyMap的changeValue方法
/** * Set the value for key to updateFunc(hadValue, oldValue), where oldValue will be the old value * for key, if any, or null otherwise. Returns the newly updated value. */ //key:kv键值对的key, //updateFunc的函数类型是(Boolean,V)=>V def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { assert(!destroyed, destructionMessage) val k = key.asInstanceOf[AnyRef] if (k.eq(null)) { ///如果键值null,对键为空的处理 if (!haveNullValue) { incrementSize() } nullValue = updateFunc(haveNullValue, nullValue) // haveNullValue = true return nullValue } //通过对k进行rehash算出它Hash值 var pos = rehash(k.hashCode) & mask var i = 1 while (true) { //data是数组 //kv期望放到2*pos位置,其中k放到2*pos的位置,v放到2*pos+1的位置 val curKey = data(2 * pos) //if-else if-else做寻址探测 //如果data(2*pos)位置上的key已经存在,且与k相同,那么表示它们需要聚合 if (k.eq(curKey) || k.equals(curKey)) { //调用updateFunc,入参是值已经存在(true),那个位置上的值(old value) val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V]) //赋心智 data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] //返回新值 return newValue } else if (curKey.eq(null)) { //HashMap上data(2 * pos)的值为null, //将kv的v写入,updateFunc的入参是值不存在,同时旧值为null val newValue = updateFunc(false, null.asInstanceOf[V]) //将data(2*pos)的值由null改为k data(2 * pos) = k //设置新值 data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] //AppendOnlyMap的长度增1,表示新元素加入 incrementSize() return newValue } else { //重新寻址,寻址的算法,每次的步长是平方探测,1,2,4,8? val delta = i pos = (pos + delta) & mask i += 1 } } null.asInstanceOf[V] // Never reached but needed to keep compiler happy }
4.3 incrementSize肩负着resize table的职责
/** Increase table size by 1, rehashing if necessary */ private def incrementSize() { curSize += 1 if (curSize > growThreshold) { growTable() } }
5. 内存+磁盘combine(ExternalAppendOnlyMap)
特点:
1. 内存放不下可以放到磁盘
2. 放到磁盘前,首先进行排序
3. 最后对所有spill到磁盘的文件做归并排序
6. 向MapOutputTrackerMaster汇报写入数据的位置以及文件中FileSegment的位置
http://www.cnblogs.com/fxjwind/p/3522219.html