I. From SQL to RDD
1. A Simple Example
Sample data (test.json):
{"name":"上海滩","singer":"叶丽仪","album":"香港电视剧主题歌","path":"mp3/shanghaitan.mp3"},
{"name":"一生何求","singer":"陈百强","album":"香港电视剧主题歌","path":"mp3/shanghaitan.mp3"},
{"name":"红日","singer":"李克勤","album":"怀旧专辑","path":"mp3/shanghaitan.mp3"},
{"name":"爱如潮水","singer":"张信哲","album":"怀旧专辑","path":"mp3/airucaoshun.mp3"},
{"name":"红茶馆","singer":"陈惠嫻","album":"怀旧专辑","path":"mp3/redteabar.mp3"}
Read it with Spark SQL to get a DataFrame:
scala> spark.read.json("test.json")
res1: org.apache.spark.sql.DataFrame = [album: string, name: string ... 2 more fields]
scala> res1.show(false)
+--------+----+-------------------+------+
|album |name|path |singer|
+--------+----+-------------------+------+
|香港电视剧主题歌|上海滩 |mp3/shanghaitan.mp3|叶丽仪 |
|香港电视剧主题歌|一生何求|mp3/shanghaitan.mp3|陈百强 |
|怀旧专辑 |红日 |mp3/shanghaitan.mp3|李克勤 |
|怀旧专辑 |爱如潮水|mp3/airucaoshun.mp3|张信哲 |
|怀旧专辑 |红茶馆 |mp3/redteabar.mp3 |陈惠嫻 |
+--------+----+-------------------+------+
Run a simple query:
scala> res1.createOrReplaceTempView("test")
scala> spark.sql("SELECT * FROM test WHERE album='香港电视剧主题歌'").show()
+--------+----+-------------------+------+
| album|name| path|singer|
+--------+----+-------------------+------+
|香港电视剧主题歌| 上海滩|mp3/shanghaitan.mp3| 叶丽仪|
|香港电视剧主题歌|一生何求|mp3/shanghaitan.mp3| 陈百强|
+--------+----+-------------------+------+
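2. Execution Process
SQL query -> [logical plan] -> [physical plan]
The logical planning phase turns the SQL statement into a tree of logical operators. The logical plan is only an intermediate stage and is never submitted for execution directly. The physical planning phase then converts the logical operator tree into a physical operator tree, whose nodes either create RDDs directly or apply transformations to RDDs.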
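More precisely, logical planning consists of three stages:
- SQL query -> Unresolved logical plan;
- Unresolved logical plan -> Analyzed logical plan;
- Analyzed logical plan -> Optimized logical plan.
Likewise, physical planning also has three stages:
- Optimized logical operator tree -> Iterator[PhysicalPlan];
- Iterator[PhysicalPlan] -> SparkPlan;
- SparkPlan -> Prepared SparkPlan.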
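The stages above can be made concrete with a toy model in plain Scala that does not depend on Spark. This is a minimal sketch under stated assumptions. Every name in it (UnresolvedRelation, Relation, Filter, Project, ScanExec, FilterExec, ProjectExec, analyze, planPhysical, Table, catalog) is hypothetical and is not Spark's real API. The model only mimics the shape of the pipeline. An unresolved tree is resolved against a catalog, and the resolved tree is then mapped to physical operators that produce an iterator of rows, which stands in for RDD[InternalRow]. Parsing and optimization are skipped, so the unresolved tree is built by hand.

```scala
// Hypothetical toy model of Catalyst's stages; none of these names are Spark classes.
type Row = Map[String, String]

// Logical operators: describe *what* to compute
sealed trait LogicalPlan
case class UnresolvedRelation(table: String) extends LogicalPlan
case class Relation(table: String, output: Seq[String]) extends LogicalPlan
case class Filter(column: String, value: String, child: LogicalPlan) extends LogicalPlan
case class Project(columns: Seq[String], child: LogicalPlan) extends LogicalPlan

// Physical operators: each produces an iterator of rows (a stand-in for RDD[InternalRow])
sealed trait PhysicalPlan { def execute(): Iterator[Row] }
case class ScanExec(rows: Seq[Row]) extends PhysicalPlan {
  def execute(): Iterator[Row] = rows.iterator
}
case class FilterExec(column: String, value: String, child: PhysicalPlan) extends PhysicalPlan {
  def execute(): Iterator[Row] = child.execute().filter(_.get(column).contains(value))
}
case class ProjectExec(columns: Seq[String], child: PhysicalPlan) extends PhysicalPlan {
  def execute(): Iterator[Row] = child.execute().map(r => columns.map(c => c -> r(c)).toMap)
}

// A toy catalog holding part of the sample data
case class Table(columns: Seq[String], rows: Seq[Row])
val catalog: Map[String, Table] = Map(
  "test" -> Table(
    Seq("album", "name", "singer"),
    Seq(
      Map("album" -> "香港电视剧主题歌", "name" -> "上海滩", "singer" -> "叶丽仪"),
      Map("album" -> "怀旧专辑", "name" -> "红日", "singer" -> "李克勤"),
      Map("album" -> "怀旧专辑", "name" -> "爱如潮水", "singer" -> "张信哲"))))

// "Analyzer": replace unresolved table references with relations that know their columns
def analyze(p: LogicalPlan): LogicalPlan = p match {
  case UnresolvedRelation(t) =>
    val table = catalog.getOrElse(t, throw new IllegalArgumentException(s"Table or view not found: $t"))
    Relation(t, table.columns)
  case Filter(c, v, child) => Filter(c, v, analyze(child))
  case Project(cs, child)  => Project(cs, analyze(child))
  case r: Relation         => r
}

// "Planner": map each resolved logical operator to a physical operator
def planPhysical(p: LogicalPlan): PhysicalPlan = p match {
  case Relation(t, _)        => ScanExec(catalog(t).rows)
  case Filter(c, v, child)   => FilterExec(c, v, planPhysical(child))
  case Project(cs, child)    => ProjectExec(cs, planPhysical(child))
  case u: UnresolvedRelation =>
    throw new IllegalStateException(s"unresolved relation ${u.table} reached the planner")
}

// SELECT name FROM test WHERE album = '怀旧专辑', built by hand instead of parsed
val unresolved = Project(Seq("name"), Filter("album", "怀旧专辑", UnresolvedRelation("test")))
val analyzed   = analyze(unresolved)
val physical   = planPhysical(analyzed)
val result     = physical.execute().toList
println(analyzed)
println(physical)
println(result)
```

The planner refuses unresolved nodes. This mirrors the design in the text: analysis must finish before physical planning, and only the physical operators actually touch data.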
Open the Spark Web UI. On the SQL tab you can see what the SQL statement compiles to:
== Parsed Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
   +- AnalysisBarrier
      +- Project [cast(album#163 as string) AS album#215, cast(name#164 as string) AS name#216, cast(path#165 as string) AS path#217, cast(singer#166 as string) AS singer#218]
         +- Project [album#163, name#164, path#165, singer#166]
            +- Filter (album#163 = 香港电视剧主题歌)
               +- SubqueryAlias test
                  +- Relation[album#163,name#164,path#165,singer#166] json
== Analyzed Logical Plan ==
album: string, name: string, path: string, singer: string
GlobalLimit 21
+- LocalLimit 21
   +- Project [cast(album#163 as string) AS album#215, cast(name#164 as string) AS name#216, cast(path#165 as string) AS path#217, cast(singer#166 as string) AS singer#218]
      +- Project [album#163, name#164, path#165, singer#166]
         +- Filter (album#163 = 香港电视剧主题歌)
            +- SubqueryAlias test
               +- Relation[album#163,name#164,path#165,singer#166] json
== Optimized Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
   +- Filter (isnotnull(album#163) && (album#163 = 香港电视剧主题歌))
      +- Relation[album#163,name#164,path#165,singer#166] json
== Physical Plan ==
CollectLimit 21
+- *(1) Project [album#163, name#164, path#165, singer#166]
   +- *(1) Filter (isnotnull(album#163) && (album#163 = 香港电视剧主题歌))
      +- *(1) FileScan json [album#163,name#164,path#165,singer#166] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/Users/renq/Downloads/spark-2.3.2-bin-hadoop2.6/data/test.json], PartitionFilters: [], PushedFilters: [IsNotNull(album), EqualTo(album,香港电视剧主题歌)], ReadSchema: struct<album:string,name:string,path:string,singer:string>
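That is the whole flow by which a simple SQL statement is turned into RDDs. More complex statements (for example, joins and aggregations) involve more details during conversion, but overall they follow the same steps. Note that GlobalLimit 21 and the cast(... as string) projection are not part of the SQL statement itself. They are added by show(), which casts every column to a string for display and by default fetches 20 + 1 rows; the extra row tells it whether more rows exist.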
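The Analyzed and Optimized plans above differ in two ways. First, the Projects that merely pass through every column of their child disappear. Second, an isnotnull(album) check is added next to the equality filter, which is safe because `album = literal` can never be true when album is null. (In spark-shell, the same four plan sections can also be printed by calling explain(true) on a DataFrame.) The sketch below imitates those two effects with hypothetical rules over a toy plan type. The names Expr, Col, Lit, EqualTo, IsNotNull, And, Plan, removeRedundantProject, inferIsNotNull and optimize are all invented here. Spark's real optimizer rules are more general than this; only the observable effect is reproduced.

```scala
// Hypothetical toy expressions and plans, loosely mirroring the example query's shape.
sealed trait Expr
case class Col(name: String) extends Expr
case class Lit(value: String) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr
case class IsNotNull(child: Expr) extends Expr
case class And(left: Expr, right: Expr) extends Expr

sealed trait Plan { def output: Seq[String] }
case class Relation(output: Seq[String]) extends Plan
case class Filter(condition: Expr, child: Plan) extends Plan { def output: Seq[String] = child.output }
case class Project(columns: Seq[String], child: Plan) extends Plan { def output: Seq[String] = columns }

// Apply a rule bottom-up (children first), like a simplified transformUp
def transformUp(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
  val withNewChildren: Plan = plan match {
    case Filter(c, child)   => Filter(c, transformUp(child)(rule))
    case Project(cs, child) => Project(cs, transformUp(child)(rule))
    case r: Relation        => r
  }
  rule.applyOrElse(withNewChildren, identity[Plan])
}

// Columns compared with "=" anywhere in a conjunction
def equalityColumns(e: Expr): Seq[String] = e match {
  case EqualTo(Col(n), _) => Seq(n)
  case EqualTo(_, Col(n)) => Seq(n)
  case And(l, r)          => equalityColumns(l) ++ equalityColumns(r)
  case _                  => Nil
}
def hasIsNotNull(e: Expr, col: String): Boolean = e match {
  case IsNotNull(Col(`col`)) => true
  case And(l, r)             => hasIsNotNull(l, col) || hasIsNotNull(r, col)
  case _                     => false
}
def missingNullChecks(cond: Expr): Seq[String] =
  equalityColumns(cond).distinct.filterNot(c => hasIsNotNull(cond, c))

// Rule 1: a Project that outputs exactly its child's columns does nothing, so drop it
val removeRedundantProject: PartialFunction[Plan, Plan] = {
  case Project(cs, child) if cs == child.output => child
}
// Rule 2: "x = literal" implies "x is not null", so add that check up front
val inferIsNotNull: PartialFunction[Plan, Plan] = {
  case Filter(cond, child) if missingNullChecks(cond).nonEmpty =>
    val checks = missingNullChecks(cond).map(c => IsNotNull(Col(c))).reduce[Expr]((a, b) => And(a, b))
    Filter(And(checks, cond), child)
}

def optimize(plan: Plan): Plan =
  Seq(removeRedundantProject, inferIsNotNull).foldLeft(plan)((p, rule) => transformUp(p)(rule))

val relation = Relation(Seq("album", "name", "path", "singer"))
// Shape of the analyzed plan above, minus the limit and the casts added by show()
val analyzed  = Project(relation.output, Filter(EqualTo(Col("album"), Lit("香港电视剧主题歌")), relation))
val optimized = optimize(analyzed)
println(optimized)
```

Both rules are written so that applying them again changes nothing. The real optimizer also relies on this property, because it runs its rule batches repeatedly until the plan stops changing.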
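The entire conversion happens on the Driver of the Spark cluster and involves no distributed computation. Inside Spark SQL it is carried out by components such as the SparkSqlParser, Analyzer, Optimizer and SparkPlanner classes, and the result is finally wrapped in a QueryExecution object. Further details are beyond the scope of this article.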
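In the Spark 2.x source, QueryExecution exposes each stage (analyzed, optimizedPlan, sparkPlan, executedPlan, toRdd) as a lazy val. A stage is therefore computed only when something first asks for it, and its result is then reused. sparkPlan takes the first candidate from the planner's iterator, and executedPlan applies preparation rules to it, which corresponds to the Iterator[PhysicalPlan] -> SparkPlan -> Prepared SparkPlan steps listed earlier. The sketch below imitates only that lazy chaining. The class ToyQueryExecution, its string "plans" and the candidates helper are hypothetical placeholders, not Spark code.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for QueryExecution: each stage is a lazy val built from the previous one.
// `log` records which stages actually ran, and in what order.
class ToyQueryExecution(val logical: String, log: ListBuffer[String]) {
  lazy val analyzed: String = { log += "analyze"; s"Analyzed($logical)" }
  // Force the upstream stage first so the log reflects pipeline order
  lazy val optimizedPlan: String = { val in = analyzed; log += "optimize"; s"Optimized($in)" }
  // The planner may propose several physical plans; take the first one
  lazy val sparkPlan: String = { val in = optimizedPlan; log += "plan"; candidates(in).next() }
  lazy val executedPlan: String = { val in = sparkPlan; log += "prepare"; s"Prepared($in)" }

  // Hypothetical planner output: an iterator of alternative physical plans
  private def candidates(plan: String): Iterator[String] =
    Iterator(s"PhysicalA($plan)", s"PhysicalB($plan)")
}

val stages = ListBuffer.empty[String]
val qe = new ToyQueryExecution("Filter", stages)
println(stages.isEmpty)          // constructing the object runs no stage
println(qe.executedPlan)         // forces analyze -> optimize -> plan -> prepare
println(stages.mkString(" -> "))
val again = qe.executedPlan      // cached: no stage runs a second time
println(stages.size)
```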
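II. Concepts and Data Structures
The framework that implements the process above inside Spark SQL is called Catalyst. Catalyst is built on a few foundational concepts: the InternalRow hierarchy, the TreeNode hierarchy and the Expression hierarchy.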
1. InternalRow
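InternalRow means exactly what its name suggests: one row of data. The RDDs that physical operator nodes produce and transform are of type RDD[InternalRow]. InternalRow is an abstract class that declares numFields, update, and get/set accessors for each column.
The concrete behavior is implemented by its subclasses. Currently there are three: BaseGenericInternalRow, UnsafeRow and JoinedRow.
- BaseGenericInternalRow is also an abstract class. It implements all the get methods declared by InternalRow by delegating to an abstract genericGet method that its subclasses provide;
- JoinedRow is used to implement joins. It puts two InternalRows side by side and presents them as a single new InternalRow;
- UnsafeRow is a storage format defined by Spark itself. Instead of holding each value as a separate Java object, it encodes the whole row into a compact binary layout. This makes storage more efficient and greatly reduces the number of objects the JVM garbage collector has to manage.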
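The delegation ideas in the list above can be sketched with hypothetical miniature classes. BaseGenericInternalRow funnels every typed getter through one genericGet, and JoinedRow presents two rows as one without copying their values. ToyRow, ToyBaseGenericRow, ToyGenericRow and ToyJoinedRow are invented for this sketch and cover only a few getters; Spark's real InternalRow has many more methods. The second row holds made-up values.

```scala
// Hypothetical miniature of the InternalRow hierarchy (not Spark's classes).
abstract class ToyRow {
  def numFields: Int
  def get(ordinal: Int): Any
  def isNullAt(ordinal: Int): Boolean = get(ordinal) == null
  def getInt(ordinal: Int): Int = get(ordinal).asInstanceOf[Int]
  def getString(ordinal: Int): String = get(ordinal).asInstanceOf[String]
}

// Like BaseGenericInternalRow: every getter goes through a single genericGet
abstract class ToyBaseGenericRow extends ToyRow {
  protected def genericGet(ordinal: Int): Any
  override def get(ordinal: Int): Any = genericGet(ordinal)
}

// Like GenericInternalRow: values stored as (boxed) objects in an Array[Any]
class ToyGenericRow(values: Array[Any]) extends ToyBaseGenericRow {
  override protected def genericGet(ordinal: Int): Any = values(ordinal)
  override def numFields: Int = values.length
}

// Like JoinedRow: ordinals past the left row's width are redirected to the right row
class ToyJoinedRow(left: ToyRow, right: ToyRow) extends ToyRow {
  override def numFields: Int = left.numFields + right.numFields
  override def get(ordinal: Int): Any =
    if (ordinal < left.numFields) left.get(ordinal) else right.get(ordinal - left.numFields)
}

val song   = new ToyGenericRow(Array[Any]("上海滩", "叶丽仪"))
val extra  = new ToyGenericRow(Array[Any](42, null))   // made-up values
val joined = new ToyJoinedRow(song, extra)
println((0 until joined.numFields).map(i => joined.get(i)))
```

Because the joined row only forwards calls, the join costs no data copying, which is why Spark uses this kind of wrapper when combining a left and a right row.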
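BaseGenericInternalRow has three subclasses: GenericInternalRow, SpecificInternalRow and MutableUnsafeRow. GenericInternalRow and SpecificInternalRow differ in the type of their constructor argument. GenericInternalRow takes an Array[Any] and uses it without copying, so the caller must not modify that array after construction (as its doc comment says). The row itself still implements update and setNullAt. SpecificInternalRow takes an Array[MutableValue]: one type-specialized holder per column, chosen from the column's data type, so that updating primitive columns through set operations does not create new boxed objects.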
/**
 * An internal row implementation that uses an array of objects as the underlying storage.
 * Note that, while the array is not copied, and thus could technically be mutated after creation,
 * this is not allowed.
 */
class GenericInternalRow(val values: Array[Any]) extends BaseGenericInternalRow {
  /** No-arg constructor for serialization. */
  protected def this() = this(null)
  def this(size: Int) = this(new Array[Any](size))
  override protected def genericGet(ordinal: Int) = values(ordinal)
  override def toSeq(fieldTypes: Seq[DataType]): Seq[Any] = values.clone()
  override def numFields: Int = values.length
  override def setNullAt(i: Int): Unit = { values(i) = null }
  override def update(i: Int, value: Any): Unit = { values(i) = value }
}
/**
 * A row type that holds an array specialized container objects, of type [[MutableValue]], chosen
 * based on the dataTypes of each column. The intent is to decrease garbage when modifying the
 * values of primitive columns.
 */
final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGenericInternalRow {
  def this(dataTypes: Seq[DataType]) =
    this(
      dataTypes.map {
        case BooleanType => new MutableBoolean
        case ByteType => new MutableByte
        case ShortType => new MutableShort
        // We use INT for DATE internally
        case IntegerType | DateType => new MutableInt
        // We use Long for Timestamp internally
        case LongType | TimestampType => new MutableLong
        case FloatType => new MutableFloat
        case DoubleType => new MutableDouble
        case _ => new MutableAny
      }.toArray)
  def this() = this(Seq.empty)
  def this(schema: StructType) = this(schema.fields.map(_.dataType))
  override def numFields: Int = values.length
  // Remaining code omitted
}
MutableUnsafeRow will be covered later, when UnsafeRow is discussed in detail.
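SpecificInternalRow uses MutableValue holders so that updating a primitive column writes into an existing holder's primitive field. By contrast, an Array[Any] would need a newly boxed object for each update. A hypothetical miniature with just two holder types illustrates the idea. The names ToyMutableValue, ToyMutableInt, ToyMutableAny, ToyType and ToySpecificRow are invented here; Spark's real MutableValue classes cover more types and methods.

```scala
// Hypothetical miniature of SpecificInternalRow's value holders (not Spark's classes).
abstract class ToyMutableValue {
  var isNull: Boolean = true
  def boxed: Any
  def update(v: Any): Unit
}

final class ToyMutableInt extends ToyMutableValue {
  var value: Int = 0  // primitive field: updating it allocates no Integer object
  def boxed: Any = if (isNull) null else value
  def update(v: Any): Unit = { isNull = false; value = v.asInstanceOf[Int] }
}

final class ToyMutableAny extends ToyMutableValue {
  var value: Any = null
  def boxed: Any = if (isNull) null else value
  def update(v: Any): Unit = { isNull = false; value = v }
}

sealed trait ToyType
case object IntType extends ToyType
case object StringType extends ToyType

final class ToySpecificRow(types: Seq[ToyType]) {
  // One holder per column, chosen from the column's data type
  val values: Array[ToyMutableValue] = types.map(holderFor).toArray

  private def holderFor(t: ToyType): ToyMutableValue = t match {
    case IntType => new ToyMutableInt
    case _       => new ToyMutableAny
  }

  // Fast path for ints: writes the primitive field directly, no boxing
  def setInt(i: Int, v: Int): Unit = {
    val holder = values(i).asInstanceOf[ToyMutableInt]
    holder.isNull = false
    holder.value = v
  }
  def getInt(i: Int): Int = values(i).asInstanceOf[ToyMutableInt].value
  def update(i: Int, v: Any): Unit = if (v == null) values(i).isNull = true else values(i).update(v)
  def get(i: Int): Any = values(i).boxed
}

val row = new ToySpecificRow(Seq(IntType, StringType))
row.setInt(0, 1)
for (_ <- 1 to 1000) row.setInt(0, row.getInt(0) + 1)  // in-place updates of the same holder
row.update(1, "红茶馆")
println(row.get(0))
println(row.get(1))
```

The generic path (get/update with Any) still works for every column. Only the typed setters avoid boxing, which matches the "decrease garbage" intent stated in the SpecificInternalRow doc comment.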
2. TreeNode
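As the example at the start of this article shows, both the logical plan and the physical plan are trees. In Catalyst they are represented by TreeNode, the base class of every tree structure in Spark SQL. TreeNode defines the tree traversal interface along with a set of general-purpose, collection-like operations, for example:
- collectLeaves: returns all leaf nodes under the current TreeNode
- collectFirst: traverses the nodes in pre-order and returns the result for the first node matched by the given partial function
- withNewChildren: replaces the children of the current node with new ones, returning a new node
- transformDown: applies a rule to every node in pre-order (parent before children)
- transformUp: applies a rule to every node in post-order (children before parent)
- transformChildren: applies a rule recursively to all child nodes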
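The difference between pre-order (transformDown) and post-order (transformUp) rule application, together with collectLeaves and collectFirst, can be seen on a hypothetical toy tree. ToyNode, Leaf, Branch and the helper visitOrder are invented for this sketch. Spark's TreeNode implements these operations once, generically, for all of its subclasses; the toy version hard-codes them for a single node type.

```scala
// Hypothetical toy tree mimicking a few TreeNode operations (not Spark's TreeNode).
sealed trait ToyNode {
  def children: Seq[ToyNode]
  def withNewChildren(newChildren: Seq[ToyNode]): ToyNode

  // Pre-order: apply the rule to this node first, then recurse into the (possibly new) children
  def transformDown(rule: PartialFunction[ToyNode, ToyNode]): ToyNode = {
    val afterRule = rule.applyOrElse(this, identity[ToyNode])
    afterRule.withNewChildren(afterRule.children.map(_.transformDown(rule)))
  }

  // Post-order: transform the children first, then apply the rule to the rebuilt node
  def transformUp(rule: PartialFunction[ToyNode, ToyNode]): ToyNode = {
    val afterChildren = withNewChildren(children.map(_.transformUp(rule)))
    rule.applyOrElse(afterChildren, identity[ToyNode])
  }

  def collectLeaves(): Seq[ToyNode] =
    if (children.isEmpty) Seq(this) else children.flatMap(_.collectLeaves())

  // Pre-order search for the first node on which the partial function is defined
  def collectFirst[B](pf: PartialFunction[ToyNode, B]): Option[B] =
    if (pf.isDefinedAt(this)) Some(pf(this))
    else children.iterator.map(_.collectFirst(pf)).collectFirst { case Some(b) => b }
}
case class Leaf(name: String) extends ToyNode {
  def children: Seq[ToyNode] = Nil
  def withNewChildren(newChildren: Seq[ToyNode]): ToyNode = this
}
case class Branch(name: String, children: Seq[ToyNode]) extends ToyNode {
  def withNewChildren(newChildren: Seq[ToyNode]): ToyNode = copy(children = newChildren)
}

//        root
//       /    \
//     mid     c
//    /   \
//   a     b
val tree = Branch("root", Seq(Branch("mid", Seq(Leaf("a"), Leaf("b"))), Leaf("c")))

// Record the order in which a traversal hands nodes to its rule
def visitOrder(traverse: PartialFunction[ToyNode, ToyNode] => ToyNode): List[String] = {
  val seen = scala.collection.mutable.ListBuffer.empty[String]
  traverse {
    case n @ Leaf(name)      => seen += name; n
    case n @ Branch(name, _) => seen += name; n
  }
  seen.toList
}

val down    = visitOrder(rule => tree.transformDown(rule))
val up      = visitOrder(rule => tree.transformUp(rule))
val renamed = tree.transformUp { case Leaf(n) => Leaf(n.toUpperCase) }
println(down)
println(up)
println(renamed.collectLeaves())
println(tree.collectFirst { case Branch(n, _) if n != "root" => n })
```

Nodes are never modified in place. Every transform rebuilds the affected path with withNewChildren, and the original tree stays unchanged.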
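Notably, Catalyst can map nodes back to the source text. From a TreeNode you can find the line number and start position in the SQL string it was parsed from, which helps developers quickly locate the error when SQL parsing fails.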
case class Origin(
  line: Option[Int] = None,
  startPosition: Option[Int] = None)
/**
 * Provides a location for TreeNodes to ask about the context of their origin. For example, which
 * line of code is currently being parsed.
 */
object CurrentOrigin {
  private val value = new ThreadLocal[Origin]() {
    override def initialValue: Origin = Origin()
  }
  def get: Origin = value.get()
  def set(o: Origin): Unit = value.set(o)
  def reset(): Unit = value.set(Origin())
  def setPosition(line: Int, start: Int): Unit = {
    value.set(
      value.get.copy(line = Some(line), startPosition = Some(start)))
  }
  def withOrigin[A](o: Origin)(f: => A): A = {
    set(o)
    val ret = try f finally { reset() }
    ret
  }
}
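In Spark's source, TreeNode records `CurrentOrigin.get` when a node is constructed. A parser can therefore set the current position, for example by wrapping node construction in withOrigin, and every node built inside that scope keeps the position. The sketch below reproduces the mechanism with a trimmed copy of Origin/CurrentOrigin from the listing above, plus a hypothetical ToyNode class and describeError helper. The error-message format is made up for illustration. The position 25 is the 0-based column of `albm` in the hypothetical query `SELECT * FROM test WHERE albm = 'x'`.

```scala
// Trimmed copies of Origin/CurrentOrigin from the listing above.
case class Origin(line: Option[Int] = None, startPosition: Option[Int] = None)

object CurrentOrigin {
  // Thread-local, so parsers running on different threads do not interfere
  private val value = new ThreadLocal[Origin]() {
    override def initialValue(): Origin = Origin()
  }
  def get: Origin = value.get()
  def set(o: Origin): Unit = value.set(o)
  def reset(): Unit = value.set(Origin())
  def withOrigin[A](o: Origin)(f: => A): A = {
    set(o)
    try f finally reset()
  }
}

// Hypothetical node: like TreeNode, it captures the current origin when constructed
case class ToyNode(name: String) {
  val origin: Origin = CurrentOrigin.get
}

// Hypothetical error helper that appends the node's position when it is known
def describeError(node: ToyNode, msg: String): String = {
  val where = (node.origin.line, node.origin.startPosition) match {
    case (Some(l), Some(p)) => s"; line $l pos $p"
    case _                  => ""
  }
  s"$msg: ${node.name}$where"
}

// A parser would do this for each syntax element it turns into a node
val column = CurrentOrigin.withOrigin(Origin(Some(1), Some(25))) { ToyNode("albm") }
val later  = ToyNode("noPosition")  // created outside withOrigin: empty origin
println(describeError(column, "cannot resolve column"))
println(describeError(later, "cannot resolve column"))
```

Because withOrigin resets the thread-local in a finally block, a node created afterwards does not inherit a stale position, even if the code inside the block throws.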
3. Expression
TODO
III. References
- SparkSQL内核剖析 (Spark SQL Kernel Analysis), by 朱锋 (Zhu Feng), 张韶全 (Zhang Shaoquan), 黄明 (Huang Ming)
- Spark source code