Background
While refactoring our Spark code, some of the phase-two account computations used Spark SQL and required several successive left joins. With shuffle efficiency in mind, we decided to repartition the data first:
val price =
  s"""
     |select
     |identify_id, weibo_type, price_info
     |from dm_account.hogwarts_account
     |where identify_id is not null AND weibo_type is not null AND price_info is not null
     |""".stripMargin
val priceTable = "industry_" + seqNum
sparkSession.sql(price)
  .withColumn("platform_type", getIdOrPlatform(col("weibo_type"), lit("platform")))
  .withColumn("price", getPriceFromPriceInfo(col("price_info")).cast(DoubleType))
  .filter(s"price is not null AND price > 0 AND platform_type is not null AND ${platformFilter}")
  .select("identify_id", "platform_type", "price")
  .repartition(20) // buggy on 2.3.x
  .createTempView(priceTable)
Note the repartition call. Checking its overloads, there should be a variant like:
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
But it flatly refused to run, failing with: At least one partition-by expression must be specified
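That message is the text of a require guard rejecting an empty varargs list. A minimal, Spark-free sketch of the same failure mode (the object name and the String stand-in for Column are illustrative; only the method name and message are copied from the Spark source quoted below):

```scala
object RequireGuardDemo {
  // Mimic of the 2.3.x guard: a varargs parameter may legally be empty,
  // in which case require aborts with an IllegalArgumentException.
  def repartitionByRange(numPartitions: Int, partitionExprs: String*): Int = {
    require(partitionExprs.nonEmpty,
      "At least one partition-by expression must be specified.")
    numPartitions
  }

  def main(args: Array[String]): Unit = {
    // With at least one expression the call succeeds.
    println(repartitionByRange(20, "identify_id"))
    // With none, we hit the guard; note that Scala's require prefixes
    // the message with "requirement failed: ".
    try {
      repartitionByRange(20)
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }
  }
}
```

Seeing this exact message therefore means an empty expression list reached a code path that insists on at least one expression.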
Looking at the 2.3.x source:
/**
 * Returns a new Dataset partitioned by the given partitioning expressions into
 * `numPartitions`. The resulting Dataset is range partitioned.
 *
 * At least one partition-by expression must be specified.
 * When no explicit sort order is specified, "ascending nulls first" is assumed.
 * Note, the rows are not sorted in each partition of the resulting Dataset.
 *
 * @group typedrel
 * @since 2.3.0
 */
@scala.annotation.varargs
def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
  require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.")
  val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match {
    case expr: SortOrder => expr
    case expr: Expression => SortOrder(expr, Ascending)
  })
  withTypedPlan {
    RepartitionByExpression(sortOrder, planWithBarrier, numPartitions)
  }
}
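Beyond the require guard, the interesting part of that implementation is the mapping step: every plain expression is wrapped in an ascending SortOrder before being handed to RepartitionByExpression. A simplified, Spark-free mimic of just that mapping (the Expr/ColumnRef/SortOrder types here are stand-ins, not Spark's Catalyst classes):

```scala
object SortOrderMappingDemo {
  // Stand-ins for Catalyst's Expression / SortOrder hierarchy.
  sealed trait Expr
  case class ColumnRef(name: String) extends Expr
  case class SortOrder(child: Expr, ascending: Boolean) extends Expr

  // Mirrors the 2.3.x logic: expressions that are already sort orders pass
  // through untouched; anything else defaults to ascending order.
  def toSortOrders(partitionExprs: Seq[Expr]): Seq[SortOrder] =
    partitionExprs.map {
      case so: SortOrder => so
      case e             => SortOrder(e, ascending = true)
    }

  def main(args: Array[String]): Unit = {
    val exprs = Seq(ColumnRef("identify_id"), SortOrder(ColumnRef("price"), ascending = false))
    println(toSortOrders(exprs))
  }
}
```

This is what the scaladoc means by "ascending nulls first is assumed" when no explicit sort order is given: the wrapping happens for you, and the resulting Dataset is range rather than hash partitioned.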
So 2.3.x introduced a new underlying implementation for repartitioning. Changing the Spark version in the pom back to 2.2.2 made the problem go away.