RDD和DataFrame
- SparkContext 通过 textFile API 把源数据转换为 RDD
- SparkSession,你可以把它理解为是 SparkContext 的进阶版,是 Spark(2.0 版本以后)新一代的开发入口。SparkSession,你可以把它理解为是 SparkContext 的进阶版,是 Spark(2.0 版本以后)新一代的开发入口
- RDD 的计算引擎是 Spark Core.
- DataFrame 背后的计算引擎是 Spark SQL.
- RDD 是弹性分布式数据集,数据集的概念比较强一点;RDD 容器可以装任意类型的可序列化元素(支持泛型)。
- DataFrame 也是弹性分布式数据集,但是本质上是一个分布式数据表,因此称为分布式表更准确。DataFrame 每个元素不是泛型对象,而是 Row 对象。
- RDD 是不带 Schema 的分布式数据集。无从知道每个元素的【内部字段】信息。意思是下图不知道 Person 对象的姓名、年龄等。如下图:
- DataFrame 就是携带数据模式(Data Schema)的结构化分布式数据集。DataFrame 却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。
DataFrame API 最大的意义在于,它为 Spark 引擎的内核优化打开了全新的空间。
首先,DataFrame 中 Schema 所携带的类型信息,让 Spark 可以根据明确的字段类型设计定制化的数据结构,从而大幅提升数据的存储和访问效率。其次,DataFrame 中标量算子确定的计算逻辑,让 Spark 可以基于启发式的规则和策略,甚至是动态的运行时信息去优化 DataFrame 的计算过程。
Spark Sql优化
spark core和spark sql 的关系
如下图所示为spark core和spark sql的关系图
spark Core特指底层执行引擎,它包括了调度系统,存储系统,内存管理,shuffle管理等核心功能模块。而spark是凌驾于spark core之上,是一层独立的优化引擎。也就是说spark core负责执行,而spark sql负责优化,spark sql优化过后的代码,依然要交付Spark core来做执行。
在开发入口来说,在 RDD 框架下开发的应用程序,会直接交付 Spark Core 运行。使用 DataFrame API 开发的应用,则会先过一遍 Spark SQL,由 Spark SQL 优化过后再交由 Spark Core 去做执行。
基于Dataframe,spark sql 如何优化(Catalyst 优化器和 Tungsten)
Catalyst 优化器的职责在于创建并优化执行计划,其包含3个功能模块,分别是创建语法树并生成执行计划、逻辑阶段优化和物理阶段优化。Tungsten 用于衔接 Catalyst 执行计划与底层的 Spark Core 执行引擎,它主要负责优化数据结果与可执行代码。如图所示
结合如下代码说明其优化过程,
import org.apache.spark.sql.DataFrame
val rootPath: String = _
// 申请者数据
val hdfs_path_apply: String = s"${rootPath}/apply"
// spark是spark-shell中默认的SparkSession实例
// 通过read API读取源文件
val applyNumbersDF: DataFrame = spark.read.parquet(hdfs_path_apply)
// 中签者数据
val hdfs_path_lucky: String = s"${rootPath}/lucky"
// 通过read API读取源文件
val luckyDogsDF: DataFrame = spark.read.parquet(hdfs_path_lucky)
// 过滤2016年以后的中签数据,且仅抽取中签号码carNum字段
val filteredLuckyDogs: DataFrame = luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum")
// 摇号数据与中签数据做内关联,Join Key为中签号码carNum
val jointDF: DataFrame = applyNumbersDF.join(filteredLuckyDogs, Seq("carNum"), "inner")
// 以batchNum、carNum做分组,统计倍率系数
val multipliers: DataFrame = jointDF.groupBy(col("batchNum"),col("carNum"))
.agg(count(lit(1)).alias("multiplier"))
// 以carNum做分组,保留最大的倍率系数
val uniqueMultipliers: DataFrame = multipliers.groupBy("carNum")
.agg(max("multiplier").alias("multiplier"))
// 以multiplier倍率做分组,统计人数
val result: DataFrame = uniqueMultipliers.groupBy("multiplier")
.agg(count(lit(1)).alias("cnt"))
.orderBy("multiplier")
result.collect
复制代码
Catalyst
先说Catalyst 的优化过程。基于代码中的DataFrame之间确切的转换逻辑,Catalyst会先使用第三方的Sql解析器ANTLR生成抽象语法树(AST,Abstract Syntax Tree)。AST 由节点和边这两个基本元素构成,其中节点就是各式各样的操作算子,如 select、filter、agg 等,而边则记录了数据表的 Schema 信息,如字段名、字段类型,等等。
下图为上述代码的语法树,它实际上描述了从源数据到最终计算结果之间的转换过程。因此,在 Spark SQL 的范畴内,AST 语法树又叫作“执行计划”(Execution Plan)。
从上图中可以看到,由算子构成的语法树、或者说执行计划,给出了明确的执行步骤。即使不经过任何优化,Spark Core也能把这个“原始的”执行计划按部就班的运行起来。
不过,从执行效率的角度出发,这么做并不是最优的选择。以上图中的绿色节点来看,Scan 用于全量扫描并读取中签者数据,Filter 则用来过滤出摇号批次大于等于“201601”的数据,Select 节点的作用则是抽取数据中的“carNum”字段。
那么利用谓词下推”(Predicates Pushdown)和“列剪枝”(Columns Pruning)这两项特性进行优化。谓词下推指的是,利用像“batchNum >= 201601”这样的过滤条件,在扫描文件的过程中,只读取那些满足条件的数据文件。如果是列存储的文件格式(如Parquet),那么可以“剪掉”不需要的部分,读取只关心的部分“carNum”。 如下图谓词下推和列剪枝
上述的代码经过谓词下推和列剪枝,Spark Core 只需要扫描图中绿色的文件部分。显然,这两项优化,都可以有效帮助 Spark Core 大幅削减数据扫描量、降低磁盘 I/O 消耗,从而显著提升数据的读取效率。
因此,经过优化的执行顺序从“Scan > Filter > Select”调整为“Filter > Select > Scan”,提高Spark Core 的执行性能。
像谓词下推、列剪枝这样的特性,都被称为启发式的规则或策略。而 Catalyst 优化器的核心职责之一,就是在逻辑优化阶段,基于启发式的规则和策略调整、优化执行计划,为物理优化阶段提升性能奠定基础。经过逻辑阶段的优化之后,原始的执行计划调整为下图所示的样子,请注意绿色节点的顺序变化。
经过逻辑阶段优化的执行计划,依然可以直接交付 Spark Core 去运行,但是还可以进一步优化,除了逻辑阶段的优化,Catalyst 在物理优化阶段还会进一步优化执行计划。与逻辑阶段主要依赖先验的启发式经验不同,物理阶段的优化,主要依赖各式各样的统计信息,如数据表尺寸、是否启用数据缓存、Shuffle 中间文件,等等。换句话说,逻辑优化更多的是一种“经验主义”,而物理优化则是“用数据说话”。
还是以上述代码为例,如上图的join节点为例,执行计划仅交代了 applyNumbersDF 与 filteredLuckyDogs 这两张数据表需要做内关联,但是,它并没有交代清楚这两张表具体采用哪种机制来做关联。按照实现机制来分类,数据关联有 3 种实现方式,分别是嵌套循环连接(NLJ,Nested Loop Join)、排序归并连接(Sort Merge Join)和哈希连接(Hash Join)。而按照数据分发方式来分类,数据关联又可以分为 Shuffle Join 和 Broadcast Join 这两大类。因此,在分布式计算环境中,至少有 6 种 Join 策略供 Spark SQL 来选择。
在物理优化阶段,Catalyst 优化器需要结合 applyNumbersDF 与 filteredLuckyDogs 这两张表的存储大小,来决定是采用运行稳定但性能略差的 Shuffle Sort Merge Join,还是采用执行性能更佳的 Broadcast Hash Join。
不论Catalyst决定采用哪种 Join 策略,优化过后的执行计划,都可以丢给 Spark Core 去做执行。当Catalyst 优化器完成其任务后,Tungsten在 Catalyst 输出的执行计划之上,继续打磨、精益求精,力求把最优的执行代码交付给底层的 SparkCore 执行引擎。
Tungsten
在Catalyst的基础上,Tungsten 主要是在数据结构和执行代码这两个方面,做进一步的优化。数据结构优化指的是 Unsafe Row 的设计与实现,执行代码优化则指的是全阶段代码生成(WSCG,Whole Stage Code Generation)。
- 为什么要有Unsafe Row。
对于 DataFrame 中的每一条数据记录,Spark SQL 默认采用 org.apache.spark.sql.Row 对象来进行封装和存储。如果使用 Java Object 来存储数据会引入大量额外的存储开销。
所以Tungsten 设计并实现了一种叫做 Unsafe Row 的二进制数据结构。**Unsafe Row 本质上是字节数组,它以极其紧凑的格式来存储 DataFrame 的每一条数据记录,大幅削减存储开销,从而提升数据的存储与访问效率。 **
- 什么是WSCG:全阶段代码生成。
所谓全阶段,其实就是我们在调度系统中学过的 Stage。以图中的执行计划为例,标记为绿色的 3 个节点,在任务调度的时候,会被划分到同一个 Stage。
而代码生成,指的是 Tungsten 在运行时把算子之间的“链式调用”捏合为一份代码。以上图 3 个绿色的节点为例,在默认情况下,Spark Core 会对每一条数据记录都依次执行 Filter、Select 和 Scan 这 3 个操作。
经过了 Tungsten 的 WSCG 优化之后,Filter、Select 和 Scan 这 3 个算子,会被“捏合”为一个函数 f。这样一来,Spark Core 只需要使用函数 f 来一次性地处理每一条数据,就能消除不同算子之间数据通信的开销,一气呵成地完成计算。
至此,分别完成 Catalyst 和 Tungsten 这两个优化环节之后,Spark SQL 把优化过的执行计划、以及生成的执行代码,交付给老大哥Spark Core。Spark Core 拿到计划和代码,在运行时利用 Tungsten Unsafe Row 的数据结构,完成分布式任务计算。