Spark Join处理流程分析

为了更好的分析Spark Join处理流程,我们选择具有Shuffle操作的示例来进行说明,这比没有Shuffle操作的处理流程要复杂一些。本文主要通过实现一个Join操作的Spark程序,提交运行该程序,并通过Spark UI上的各种运行信息来讨论Spark Join处理流程。

Spark Join示例程序

我们先给出一个简单的Spark Application程序代码,这里处理的数据使用了MovieLens数据集。其中,小表movies(约1.4m)、大表genome-scores(约323.5m),对这两个表进行Join操作。具体的实现代码,如下所示:

def main(args: Array[String]): Unit = {
  val sc = new SparkContext()
  // movieId,title,genres
  val movieRdd = sc.textFile("/data/ml-20m/movies.csv")
        .filter(line => !line.startsWith("movieId"))
        .map(line =>  line.split(","))
        .map(a => (a(0), a(1)))
  // movieId,tagId,relevance
  val scoreRdd = sc.textFile("/data/ml-20m/genome-scores.csv")
        .filter(line => !line.startsWith("movieId"))
        .map(line =>  line.split(","))
        .map(a => (a(0), (a(1), a(2))))
  // movieId  title  tagId  relevance
  val finalRdd = movieRdd.join(scoreRdd)
        .map(r => Seq(r._1, r._2._1, r._2._2._1, r._2._2._2)
        .mkString("\t"))
  finalRdd.toDebugString
  finalRdd.saveAsTextFile("/temp/join")
}

上面代码,我们直接将两个表进行Join操作(实际更优的做法是:将小表进行Broadcast后,再进行连接操作,能够避免昂贵低效的Shuffle操作)。我们这么做,主要是为了使程序处理过程中能够进行Shuffle操作,从而更深入地理解Join操作的内部处理流程。
我们通过如下命令,查看一下输入数据表genome-scores表在HDFS上存储的Block情况,执行如下命令:

hdfs fsck /data/ml-20m/movies.csv -files -racks -locations -blocks
hdfs fsck /data/ml-20m/genome-scores.csv -files -racks -locations -blocks

可以看到输入数据集的存储情况,如下所示:

/data/ml-20m/movies.csv 1397542 bytes, 1 block(s):  OK
0. BP-893796349-172.16.117.62-1504161181581:blk_1074121675_380924 len=1397542 Live_repl=3 [/default/172.16.117.64:50010, /default/172.16.117.63:50010, /default/172.16.117.65:50010]
 
/data/ml-20m/genome-scores.csv 323544381 bytes, 3 block(s):  OK
0. BP-893796349-172.16.117.62-1504161181581:blk_1074121670_380919 len=134217728 Live_repl=3 [/default/172.16.117.63:50010, /default/172.16.117.65:50010, /default/172.16.117.64:50010]
1. BP-893796349-172.16.117.62-1504161181581:blk_1074121671_380920 len=134217728 Live_repl=3 [/default/172.16.117.65:50010, /default/172.16.117.64:50010, /default/172.16.117.63:50010]
2. BP-893796349-172.16.117.62-1504161181581:blk_1074121672_380921 len=55108925 Live_repl=3 [/default/172.16.117.64:50010, /default/172.16.117.65:50010, /default/172.16.117.63:50010]

通过RDD的toDebugString()方法,打印调试信息:

res1: String = 
(3) MapPartitionsRDD[40] at map at <console>:38 []
 |  MapPartitionsRDD[39] at join at <console>:38 []
 |  MapPartitionsRDD[38] at join at <console>:38 []
 |  CoGroupedRDD[37] at join at <console>:38 []
 +-(2) MapPartitionsRDD[31] at map at <console>:34 []
 |  |  MapPartitionsRDD[30] at map at <console>:34 []
 |  |  MapPartitionsRDD[29] at filter at <console>:34 []
 |  |  /data/ml-20m/movies.csv MapPartitionsRDD[28] at textFile at <console>:34 []
 |  |  /data/ml-20m/movies.csv HadoopRDD[14] at textFile at <console>:34 [] 
 +-(3) MapPartitionsRDD[27] at map at <console>:36 [] 
 |  |  MapPartitionsRDD[26] at map at <console>:36 []
 |  |  MapPartitionsRDD[25] at filter at <console>:36 []
 |  |  /data/ml-20m/genome-scores.csv MapPartitionsRDD[24] at textFile at <console>:36 []
 |  |  /data/ml-20m/genome-scores.csv HadoopRDD[13] at textFile at <console>:36 [] 

上面信息可以非常清晰地看到,我们的Spark程序都创建了哪些RDD,以及这些RDD之间的大致顺序关系。

分析Spark Join处理流程

Spark程序运行过程中,我们可以通过Web UI看到对应的DAG生成的Stage的情况,一共生成了3个Stage。为了更加清晰地了解各个Stage(对应的各组Task)都被调度到了哪个Executor上运行,我们通过Spark UI看一下对于该Spark程序创建Executor的具体情况,如下图所示:
在这里插入图片描述
通过上图可见,为了运行我们提交的Spark程序,在Spark集群中的3个节点上,总计启动了3个Executor和1个Driver。其中,图中ID为3和driver的Executor是在同一个物理节点上。我们知道,在同一个Spark程序运行过程中,可能会复用已经启动的Executor,这里的3个Executor在程序运行过程中都会被复用,可以在后面各个Stage运行过程中看到。
下面,分别对各个Stage的运行情况进行分析:

  • Stage 0运行过程

对数据表/data/ml-20m/movies.csv,进行filter->map->map操作,对应于Stage 0,如下图所示:
在这里插入图片描述
经过前面我们通过toDebugString()打印出DAG关系图,可以知道/data/ml-20m/movies.csv具有两个Partition,所以对应的Stage 0包含2个可调度运行的ShuffleMapTask。通过Spark Web UI可以看到,如下图所示:
在这里插入图片描述
上图中,Locality Level为NODE_LOCAL,在2个节点上各启动了1个Executor,Stage 0包含2个ShuffleMapTask。

  • Stage 1运行过程

同样,对数据表/data/ml-20m/genome-scores.csv也进行filter->map->map操作,对应于Stage 1,如下图所示:
在这里插入图片描述
/data/ml-20m/genome-scores.csv具有3个Partition,所以同样对于Stage 1具有3个可调度运行的ShuffleMapTask,如下图所示:
在这里插入图片描述
可见,在3个节点上各启动了1个Executor,用来运行Stage 1的ShuffleMapTask。其中,Locality Level也为NODE_LOCAL。

  • Stage 2运行过程

该Stage对应的DAG图表达,如下图所示:
在这里插入图片描述
可见,Join操作实际包含了一组操作的集合:cogroup、map、map。这里,我们通过Spark源码来看一下,调用join操作,其内部具体都做了哪些事情,我们实现的Spark程序中调用了带有一个RDD参数的join方法,代码如下所示:

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
  join(other, defaultPartitioner(self, other))
}

继续调用了另一个多了Partitioner参数的join方法,代码如下所示:

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}

上面代码中,调用了最核心的cogroup方法,如下所示:

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}

这里面创建了CoGroupedRDD,所以可以直接分析CoGroupedRDD的实现。根据前面我们实现的Spark程序,应该存在Shuffle过程,所以CoGroupedRDD与Stage 0和Stage 1中的两个RDD之间,必然应该存在ShuffledDependency依赖关系,通过CoGroupedRDD类的代码验证,如下所示:

override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_] =>
    if (rdd.partitioner == Some(part)) {
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency with " + rdd)
      new ShuffleDependency[K, Any, CoGroupCombiner](
        rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
    }
  }
}

在我们示例程序实际运行过程中,执行了创建ShuffleDependency的分支,在这里一共生成了2个ShuffleDependency。现在,我们需要知道CoGroupedRDD是如何计算的,也就是需要知道在调用join操作过程中如何进行Shuffle操作的,查看CoGroupedRDD的compute方法,基于依赖的ShuffleDependency进行计算,计算过程代码如下所示:

val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
  case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
    val dependencyPartition = split.narrowDeps(depNum).get.split
    // Read them from the parent
    val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
    rddIterators += ((it, depNum))
 
  case shuffleDependency: ShuffleDependency[_, _, _] =>
    // Read map outputs of shuffle
    val it = SparkEnv.get.shuffleManager
      .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
      .read()
    rddIterators += ((it, depNum))
}

上述的compute方法实现了对CoGroupedRDD的某一个Partition的计算处理,实际会执行ShuffleDependency这个case分支,它会通过ShuffleManager来读取依赖的上游2个RDD的计算结果,从而得到CoGroupedRDD的某个Partition依赖于上游RDD的计算结果,通过(Iterator[Product2[K, Any]], Int)对的方式表达,表示计算CoGroupedRDD需要上游RDD结果迭代器(it)和对应的依赖编号(depNum)。
Stage 2可调度运行的是一组ResultTask,每个ResultTask处理最终结果的一个Partition,该Partition执行上述代码后,已经将Stage 1运行过程的计算结果(map输出)读取到对应的Reduce端,也就是说属于该Partition的所有map输出结果都已经读取过来了。而且,我们应该知道,具有相同的key的记录,一定会在Shuffle过程中读取到同一个Executor中(同一个ResultTask中),可以想到下面需要进行实际的Join操作流程了,具体代码如下所示:

val map = createExternalMap(numRdds)
for ((it, depNum) <- rddIterators) {
  map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
}
context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
new InterruptibleIterator(context,
  map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])

首先创建一个ExternalAppendOnlyMap存储结构,然后遍历上面得到的rddIterators,将该Partition依赖的RDD结果记录insert到ExternalAppendOnlyMap中。ExternalAppendOnlyMap能够对具有相同key的一组记录执行merge操作,最终得到一个按相同key进行连接操作的结果。上面代码,通过创建的InterruptibleIterator就可以迭代出Join后的结果。当然,这里Join完的结果还不是最终我们需要的结果,通过在RDD的join和cogroup方法中还需要对这里得到的结果进行后续处理,才得到我们Spark程序最终需要的Join结果。
下面,看一下Stage 2对应的3个ResultTask的运行情况,如下图所示:
在这里插入图片描述
示例图中的Task还在运行中,运行过程中需要进行Shuffle读取操作。
Stage 2对应DAG图中,最后一步是saveAsTextFile,上面计算得到的结果就是Join操作的最终结果,结果由Spark程序运行过程中的各个BlockManager来管理,结果或者保存在内存中,或者存储在磁盘上,我们这个示例程序结果都存放在内存中,当调用saveAsTextFile方法时,会直接将Join结果写入到HDFS指定文件中。

总结

基于提供的Spark程序示例,我们从源码的角度,将对应的RDD及其作用于RDD之上的操作,以及对应的顺序关系,通过如下RDD DAG图来更加清晰地表现出来,如下图所示:
在这里插入图片描述
根据每个RDD对应的Partition情况,将上图细化到Partition级别,我们能够看出每个RDD的各个Partition之间是如何关联的,如下图所示:
在这里插入图片描述
在我们示例程序的Join过程中,一定存在Shuffle处理,最终处理流程可以在上图中各个RDD的各个Partition之间表现出来。扩展一点思考,假如没有Join过程没有发生Shuffle处理,那么在cogroup处理中就不会创建ShuffleDependency,而是会创建对应的OneToOneDependency。那么上图中,处理完成Stage 0、Stage 1后得到的RDD的各个Partition,已经使用相同的Partitioner基于记录的Key对数据进行了Partition操作。所以,在生成Stage 2中的RDD的某个Partition时,与上游Stage 0、Stage 1中每个RDD中某个(而不是某些,不是与RDD中各个Partition都有关系)Partition是一对一的关系,这一计算起来就简单多了。

转载自
http://shiyanjun.cn/archives/1816.html

猜你喜欢

转载自blog.csdn.net/bingdianone/article/details/84965418