Entry point: SQLContext.sql
def sql(sqlText: String): DataFrame = {
  if (conf.dialect == "sql") {
    /**
     * The sqlParser component turns the SQL text into an Unresolved LogicalPlan,
     * which is wrapped together with this SQLContext instance into a DataFrame
     * and returned to the user. At this point only the Unresolved LogicalPlan of
     * the SQL statement has been wrapped; the rest of the Spark SQL pipeline --
     * Analyzer, Optimizer, SparkPlanner / physical plan, execution -- is only
     * triggered when the user later invokes an action such as show or collect.
     *
     * TODO parseSql: parse the SQL and obtain the Unresolved LogicalPlan
     */
    DataFrame(this, parseSql(sqlText))
  } else {
    sys.error(s"Unsupported SQL dialect: ${conf.dialect}")
  }
}
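A minimal usage sketch (assuming a SQLContext instance named sqlContext and a registered table "people" -- both names are illustrative): sql() only parses and wraps the plan; the analyze/optimize/plan/execute pipeline runs when an action is called.

val df = sqlContext.sql("SELECT name, age FROM people WHERE age > 20")  // only builds an Unresolved LogicalPlan
df.show()  // action: triggers Analyzer -> Optimizer -> SparkPlanner -> physical execution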
=> parseSql
protected[sql] def parseSql(sql: String): LogicalPlan = {
  // The DDL parser is tried first; if the statement is not a DDL command,
  // sqlParser's apply method (inherited from AbstractSparkSQLParser) parses it
  // into an Unresolved LogicalPlan.
  // TODO sqlParser.apply -> AbstractSparkSQLParser.apply
  ddlParser(sql, false).getOrElse(sqlParser(sql))
}
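A minimal sketch of the try-then-fall-back pattern used here (the two inner parsers are hypothetical stand-ins, not the real ddlParser/sqlParser): the first parser returns an Option, and getOrElse falls through to the full SQL parser only when it declines the statement.

def parse(sql: String): String = {
  // hypothetical stand-in for ddlParser: only claims DDL-looking statements
  def tryDdl(s: String): Option[String] =
    if (s.trim.toUpperCase.startsWith("CREATE")) Some(s"DDL($s)") else None
  // hypothetical stand-in for sqlParser
  def fullParse(s: String): String = s"PLAN($s)"

  tryDdl(sql).getOrElse(fullParse(sql))
}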
==> sqlParser
AbstractSparkSQLParser.apply
/**
 * When sqlParser's apply method is called to parse the SQL statement into a
 * LogicalPlan, the call goes through the parent class AbstractSparkSQLParser's
 * apply method.
 */
def apply(input: String): LogicalPlan = {
  // Initialize the Keywords.
  lexical.initialize(reservedWords)
  /**
   * lexical.Scanner first performs lexical analysis on the SQL text, breaking it
   * into tokens; phrase(start) then runs syntactic analysis over that token
   * stream and, on success, produces an Unresolved LogicalPlan. This plan
   * reflects only the SQL syntax itself -- it is not yet bound to any data
   * source or catalog information.
   */
  phrase(start)(new lexical.Scanner(input)) match {
    case Success(plan, _) => plan
    case failureOrError => sys.error(failureOrError.toString)
  }
}
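To make the lexer/parser split concrete, here is a tiny, self-contained parser built on the same scala.util.parsing.combinator machinery AbstractSparkSQLParser uses (StandardTokenParsers, lexical.Scanner, phrase). It is only an illustrative sketch of the pattern, not Spark code; the grammar and names are made up.

import scala.util.parsing.combinator.syntactical.StandardTokenParsers

object MiniSqlParser extends StandardTokenParsers {
  lexical.reserved ++= Seq("SELECT", "FROM")
  lexical.delimiters ++= Seq(",", "*")

  // start rule: SELECT <cols> FROM <table>
  def start: Parser[(Seq[String], String)] =
    "SELECT" ~> repsep(ident | "*", ",") ~ ("FROM" ~> ident) ^^ {
      case cols ~ table => (cols, table)
    }

  def parse(input: String): (Seq[String], String) =
    phrase(start)(new lexical.Scanner(input)) match {
      case Success(result, _) => result
      case failureOrError => sys.error(failureOrError.toString)
    }
}

// MiniSqlParser.parse("SELECT name, age FROM people")  // => (List(name, age), people)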
Executing the SQL
SQLContext.executeSql
/**
 * The method that actually drives SQL execution: it wraps the parsed LogicalPlan
 * in a QueryExecution, whose lazy vals trigger the whole downstream pipeline.
 * TODO executePlan
 */
protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)
protected[sql] class QueryExecution(val logical: LogicalPlan) {
  def assertAnalyzed(): Unit = checkAnalysis(analyzed)
  // TODO RuleExecutor.apply -- the Analyzer resolves the logical plan
  lazy val analyzed: LogicalPlan = analyzer(logical)
  lazy val withCachedData: LogicalPlan = {
    assertAnalyzed()
    cacheManager.useCachedData(analyzed)
  }
  /**
   * Optimize the SQL: the Optimizer's apply method is invoked on the Resolved
   * LogicalPlan to produce an Optimized LogicalPlan, i.e. the optimized logical
   * execution plan.
   *
   * TODO SqlContext.optimizer -> DefaultOptimizer
   */
  lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData)
  // TODO: Don't just pick the first one...
  /**
   * From the Optimized LogicalPlan produced by the Optimizer, the SparkPlanner
   * creates a SparkPlan (a physical plan candidate).
   */
  lazy val sparkPlan: SparkPlan = {
    SparkPlan.currentContext.set(self)
    // TODO planner
    planner(optimizedPlan).next()
  }
  /**
   * executedPlan should not be used to initialize any SparkPlan. It should be
   * only used for execution.
   *
   * Produces the final physical plan (PhysicalPlan). At this point the plan is
   * bound to the physical data sources and the join strategy for each table is
   * known; for joins, Spark will by default broadcast a sufficiently small table.
   */
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
  /** Internal version of the RDD. Avoids copies and has no schema */
  // Execute the physical plan and return an RDD[Row], i.e. an RDD whose elements are Rows
  lazy val toRdd: RDD[Row] = executedPlan.execute()
  protected def stringOrError[A](f: => A): String =
    try f.toString catch { case e: Throwable => e.toString }

  def simpleString: String =
    s"""== Physical Plan ==
       |${stringOrError(executedPlan)}
    """.stripMargin.trim

  override def toString: String =
    // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)})
    // however, the `toRdd` will cause the real execution, which is not what we want.
    // We need to think about how to avoid the side effect.
    s"""== Parsed Logical Plan ==
       |${stringOrError(logical)}
       |== Analyzed Logical Plan ==
       |${stringOrError(analyzed)}
       |== Optimized Logical Plan ==
       |${stringOrError(optimizedPlan)}
       |== Physical Plan ==
       |${stringOrError(executedPlan)}
       |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
       |== RDD ==
    """.stripMargin.trim
}
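The whole QueryExecution is just a chain of lazy vals, so each stage is computed at most once and only when it (or a later stage) is first needed. A minimal, self-contained sketch of that pattern (not Spark code; all names are made up):

class MiniQueryExecution(sql: String) {
  lazy val parsed: String      = { println("parse");    s"Parsed($sql)" }
  lazy val analyzed: String    = { println("analyze");  s"Analyzed($parsed)" }
  lazy val optimized: String   = { println("optimize"); s"Optimized($analyzed)" }
  lazy val physical: String    = { println("plan");     s"Physical($optimized)" }
  lazy val result: Seq[String] = { println("execute");  Seq(physical) }
}

// new MiniQueryExecution("SELECT ...").result
// prints parse, analyze, optimize, plan, execute -- each exactly once, in order

On a real DataFrame, df.explain(true) prints the same parsed/analyzed/optimized/physical plans that toString above assembles, without triggering execution.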
==> the optimizer: SqlContext.optimizer -> DefaultOptimizer
object DefaultOptimizer extends Optimizer {
  /**
   * Optimization rule batches, applied to improve performance.
   */
  val batches =
    Batch("Combine Limits", FixedPoint(100),
      /**
       * Combine nested LIMIT clauses into a single one -> when writing SQL,
       * try to use a single LIMIT in the first place.
       */
      CombineLimits) ::
    Batch("ConstantFolding", FixedPoint(100),
      /**
       * NULL-related optimization: fold expressions whose result is already
       * determined by NULL literals. As a practical tip, try to avoid NULL-heavy
       * data, since NULL keys easily lead to data skew.
       */
      NullPropagation,
      ConstantFolding,
      LikeSimplification,
      BooleanSimplification,
      SimplifyFilters,
      SimplifyCasts,
      SimplifyCaseConversionExpressions,
      OptimizeIn) ::
    Batch("Decimal Optimizations", FixedPoint(100),
      DecimalAggregates) ::
    Batch("Filter Pushdown", FixedPoint(100),
      /**
       * Push predicates and projections down through a UNION into its children,
       * so that WHERE-style filtering runs as early as possible instead of being
       * applied in the outer query to the full output of two large tables.
       */
      UnionPushdown,
      /**
       * Combine adjacent filters, i.e. merge WHERE clauses: for example, when a
       * subquery filters on a column and the outer query filters on the same
       * column, the two conditions are merged into a single Filter (their conjunction).
       */
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughJoin,
      PushPredicateThroughGenerate,
      /**
       * Column pruning
       */
      ColumnPruning) ::
    Batch("LocalRelation", FixedPoint(100),
      ConvertToLocalRelation) :: Nil
}
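As a concrete example of what one of these rules looks like, below is a simplified sketch (not the actual Spark source) of a CombineFilters-style rule: it pattern-matches two adjacent Filter nodes and collapses them into a single Filter whose condition is the conjunction of both predicates.

import org.apache.spark.sql.catalyst.expressions.And
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object CombineFiltersSketch extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Filter(outer, Filter(inner, child)) becomes Filter(inner AND outer, child)
    case Filter(outerCondition, Filter(innerCondition, child)) =>
      Filter(And(innerCondition, outerCondition), child)
  }
}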