Spark 2.2
Phoenix 5.8.0

The code that writes a DataFrame to Phoenix from Spark is as follows:
public void save2Table(SparkSession sparkSession, Dataset<Row> df, String tableName) {
    df.write()
      .format("org.apache.phoenix.spark")
      .mode(SaveMode.Overwrite)
      .option("table", tableName)
      .option("zkUrl", phoenixConfig.getUrl())
      .save();
}
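For reference, the same write expressed directly in Scala; the table name and ZooKeeper quorum below are placeholder values, not from the original code:

import org.apache.spark.sql.{DataFrame, SaveMode}

def save2Table(df: DataFrame, tableName: String, zkUrl: String): Unit = {
  df.write
    .format("org.apache.phoenix.spark") // resolved to Phoenix's DefaultSource, as traced below
    .mode(SaveMode.Overwrite)           // the only mode phoenix-spark accepts
    .option("table", tableName)         // target Phoenix table
    .option("zkUrl", zkUrl)             // e.g. "zk1,zk2,zk3:2181" (placeholder)
    .save()
}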
- Start from the save() method. Stepping into save(), we find it calls DataFrameWriter's runCommand method, passing it a SaveIntoDataSourceCommand built via the case class's apply():
runCommand(df.sparkSession, "save") {
  SaveIntoDataSourceCommand(
    query = df.logicalPlan,
    provider = source,
    partitionColumns = partitioningColumns.getOrElse(Nil),
    options = extraOptions.toMap,
    mode = mode)
}
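For context, SaveIntoDataSourceCommand carries no logic beyond its run() method (shown further down); in Spark 2.2 it is roughly this RunnableCommand case class:

case class SaveIntoDataSourceCommand(
    query: LogicalPlan,            // the DataFrame's logical plan
    provider: String,              // here: "org.apache.phoenix.spark"
    partitionColumns: Seq[String],
    options: Map[String, String],  // table, zkUrl, ...
    mode: SaveMode) extends RunnableCommand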
- DataFrameWriter's runCommand builds a QueryExecution via session.sessionState.executePlan(command) and triggers it through qe.toRdd:
private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
  val qe = session.sessionState.executePlan(command)
  try {
    val start = System.nanoTime()
    // call `QueryExecution.toRDD` to trigger the execution of commands.
    qe.toRdd
    val end = System.nanoTime()
    session.listenerManager.onSuccess(name, qe, end - start)
  } catch {
    case e: Exception =>
      session.listenerManager.onFailure(name, qe, e)
      throw e
  }
}
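The listenerManager calls above feed the same hooks that user code can register through ExecutionListenerManager, so the outcome and duration of this save can be observed. A minimal sketch:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

val spark = SparkSession.builder().appName("save-listener").getOrCreate()
spark.listenerManager.register(new QueryExecutionListener {
  // invoked with funcName = "save" when runCommand completes
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"$funcName took ${durationNs / 1e6} ms")
  // invoked when the command throws; runCommand rethrows afterwards
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"$funcName failed: ${exception.getMessage}")
})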
- Execution continues into SessionState's executePlan():
def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan)
- In the QueryExecution class, toRdd executes the physical plan:
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
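Evaluating toRdd forces the whole chain of lazy vals inside QueryExecution, so analysis, optimization and physical planning all happen here. Abbreviated from the Spark 2.2 source:

lazy val analyzed: LogicalPlan = sparkSession.sessionState.analyzer.execute(logical)
lazy val withCachedData: LogicalPlan = sparkSession.sharedState.cacheManager.useCachedData(analyzed)
lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)
lazy val sparkPlan: SparkPlan = planner.plan(ReturnAnswer(optimizedPlan)).next()
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)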
- SparkPlan's execute():
final def execute(): RDD[InternalRow] = executeQuery {
  doExecute()
}
- SparkPlan's executeQuery():
protected final def executeQuery[T](query: => T): T = {
  RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
    prepare()
    waitForSubqueries()
    query
  }
}
- Because SaveIntoDataSourceCommand is a RunnableCommand, the planner wraps it in an ExecutedCommandExec node, so the doExecute() that runs is the one below; it executes the command eagerly on the driver and parallelizes its (empty) result:
protected override def doExecute(): RDD[InternalRow] = {
  sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
- sideEffectResult invokes cmd.run(), where cmd is our SaveIntoDataSourceCommand:
protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
  val converter = CatalystTypeConverters.createToCatalystConverter(schema)
  cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
}
- SaveIntoDataSourceCommand's run() method:
override def run(sparkSession: SparkSession): Seq[Row] = {
  DataSource(
    sparkSession,
    className = provider,
    partitionColumns = partitionColumns,
    options = options).write(mode, Dataset.ofRows(sparkSession, query))
  Seq.empty[Row]
}
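Note Dataset.ofRows: it re-wraps the command's logical plan in a DataFrame so the data source can consume it. Abbreviated from the Spark 2.2 source:

def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
  val qe = sparkSession.sessionState.executePlan(logicalPlan)
  qe.assertAnalyzed()
  new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}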
- DataSource's write() method:
def write(mode: SaveMode, data: DataFrame): Unit = {
  if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
    throw new AnalysisException("Cannot save interval data type into external storage.")
  }
  providingClass.newInstance() match {
    case dataSource: CreatableRelationProvider =>
      dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
    case format: FileFormat =>
      writeInFileFormat(format, mode, data)
    case _ =>
      sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
  }
}
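providingClass is resolved by DataSource.lookupDataSource. The string "org.apache.phoenix.spark" is a package, not a class, so it is Spark's fallback of appending ".DefaultSource" that locates Phoenix's provider. A condensed illustration of that fallback (Spark's real code also consults the DataSourceRegister service loader):

val provider = "org.apache.phoenix.spark"
// Try the provider string itself first, then provider + ".DefaultSource".
val providingClass: Class[_] = Seq(provider, provider + ".DefaultSource").flatMap { name =>
  try Some(Class.forName(name)) catch { case _: ClassNotFoundException => None }
}.head
println(providingClass) // class org.apache.phoenix.spark.DefaultSource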
- Phoenix's DefaultSource implements CreatableRelationProvider, so the first case above matches and dataSource.createRelation() is invoked.
- At last the spark-phoenix code enters the picture: execution reaches DefaultSource's createRelation():
override def createRelation(sqlContext: SQLContext, mode: SaveMode,
                            parameters: Map[String, String], data: DataFrame): BaseRelation = {
  if (!mode.equals(SaveMode.Overwrite)) {
    throw new Exception("SaveMode other than SaveMode.OverWrite is not supported")
  }
  verifyParameters(parameters)
  // Save the DataFrame to Phoenix
  data.saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"))
  // Return a relation of the saved data
  createRelation(sqlContext, parameters)
}
- data.saveToPhoenix comes from an implicit conversion (import org.apache.phoenix.spark._) that wraps the DataFrame in DataFrameFunctions; the actual write happens in DataFrameFunctions.saveToPhoenix().
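A paraphrased sketch of what saveToPhoenix does (abbreviated from the phoenix-spark source; exact signatures vary between versions): it normalizes the DataFrame's field names to Phoenix column names, builds a Phoenix output configuration, maps every Row into a PhoenixRecordWritable, and hands the pairs to PhoenixOutputFormat, which performs the UPSERTs over JDBC:

def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
                  zkUrl: Option[String] = None): Unit = {
  // Normalize DataFrame field names to Phoenix column names
  val fieldArray = data.schema.fieldNames.map(SchemaUtil.normalizeIdentifier)
  // Build the Hadoop output configuration for the target table
  val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, Some(conf))
  // Convert each Row into a PhoenixRecordWritable keyed by a null key
  // (the real code rebuilds the config inside each partition, since
  // Configuration is not serializable)
  val phxRDD = data.rdd.mapPartitions { rows =>
    val columns = PhoenixConfigurationUtil.getUpsertColumnMetadataList(outConfig).toList
    rows.map { row =>
      val rec = new PhoenixRecordWritable(columns)
      row.toSeq.foreach(rec.add)
      (null, rec)
    }
  }
  // PhoenixOutputFormat executes the UPSERTs against the cluster
  phxRDD.saveAsNewAPIHadoopFile("", classOf[NullWritable], classOf[PhoenixRecordWritable],
    classOf[PhoenixOutputFormat[PhoenixRecordWritable]], outConfig)
}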