版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/Suubyy/article/details/82015365
- 前述
- 在上一章节Spark(十五)Executor执行Task的原理与源码分析我们解析了执行
Task
之前的一些准备工作,但是还没有正真进入Task
的执行程序。在上一章节的解析中,在最后Task
里的run
方法中调用了抽象方法runTask
方法,而我们说了,Task
是一个抽象类,而runTask
方法也是一个抽象方法,所以Task
执行的具体逻辑是有Task
的子类实现的,实现类Task
抽象类的子类有两种,一个是ShuffleMapTask
,一个是ResultTask
、所以下边我们将对这两个类进行详细的解析,理解Task
执行的具体细节。
- 在上一章节Spark(十五)Executor执行Task的原理与源码分析我们解析了执行
ShuffleMapTask
原理解析:这类的作用是将RDD
的每个Partition
拆分成多个buckets(存储桶或者存储区)
(基于ShuffleDependency
中指定的分区)ShuffleMapTask
的runTask
方法://该方法的作用执行Task,然后将结果返回给调度器 //其中MapStatus封装了块管理器的地址,以及每个reduce的输出大小 //以便于传递reduce的任务 override def runTask(context: TaskContext): MapStatus = { //记录反序列化RDD的开始时间 val deserializeStartTime = System.currentTimeMillis() //创建一个序列化器 val ser = SparkEnv.get.closureSerializer.newInstance() // 反序列化广播变量来的得到RDD val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime metrics = Some(context.taskMetrics) var writer: ShuffleWriter[Any, Any] = null try { //获取ShuffleManager val manager = SparkEnv.get.shuffleManager //利用ShuffleManager获取ShuffleWriter,ShuffleWriter的功能就是将Task计算的结果 //持久化到shuffle文件中,作为子Stage的输入 writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) //通过ShuffleWriter将结果进行持久化到shuffle文件中,作为子Stage的输入 //rdd.iterator(partition, context)这个方法里就会执行我们自己编写的业务逻辑代码 writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) //关闭writer,将元数据写入MapStatus中,然后返回 writer.stop(success = true).get } catch { case e: Exception => try { if (writer != null) { writer.stop(success = false) } } catch { case e: Exception => log.debug("Could not stop writer", e) } throw e } }
RDD
的iterator
方法,final def iterator(split: Partition, context: TaskContext): Iterator[T] = { //判断存储级别,如果存储不为None,那么就会从缓存中取出Partition if (storageLevel != StorageLevel.NONE) { SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) } else { //Partition没有缓存,就调用这个方法 computeOrReadCheckpoint(split, context) } }
RDD
的computeOrReadCheckpoint
方法,private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] ={ if (isCheckpointedAndMaterialized) { firstParent[T].iterator(split, context) } else { //调用RDD的compute方法开始执行Task compute(split, context) } }
MapPartitionsRDD
的compute
方法override def compute(split: Partition, context: TaskContext): Iterator[U] = //该方法就是Spark执行了我们自己写的处理逻辑代码 //f:Spark封装了我们的逻辑代码 f(context, split.index, firstParent[T].iterator(split, context))
ResultTask runTask
原理解析ResultTask
的runTask
方法,因为ResultTask
执行的结果就是最终结果,要不返回,要不持久化,所以它的处理逻辑很简单。override def runTask(context: TaskContext): U = { //记录反序列化RDD的开始时间 val deserializeStartTime = System.currentTimeMillis() /创建一个序列化器 val ser = SparkEnv.get.closureSerializer.newInstance() // 反序列化广播变量来的得到RDD val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime metrics = Some(context.taskMetrics) //因为ResultTask执行后的结果不会给任何其他的Task用,所以就直接调用rdd的iterator方法 //执行我们自己定义的逻辑代码 func(context, rdd.iterator(partition, context)) }
RDD
的iterator
方法final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) } else { computeOrReadCheckpoint(split, context) } }
RDD
的computeOrReadCheckpoint
方法,private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = { if (isCheckpointedAndMaterialized) { firstParent[T].iterator(split, context) } else { compute(split, context) } }
ShuffleRDD
的compute
方法override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) .read() .asInstanceOf[Iterator[(K, C)]] }