目录
【坑1:persist() + show()不能缓存全部数据】
【坑2:unpersist()一个rdd时会同时unpersist()子RDD】
【坑1:persist() + show()不能缓存全部数据】
对于一个有100个分区的数据,假如你对其persist()后进行show(10),那么spark只会把第一个分区缓存起来,show前10条数据。如果你show的数据量超过了一个分区的大小,那么spark会多缓存一些分区。
因此persist()后如果希望数据全部都缓存到内存中,应对每个分区都执行action操作,如进行count()。
例子:
package high_quality._history
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
object demo {
def main(args: Array[String]) {
Logger.getRootLogger.setLevel(Level.ERROR)
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10000)
.toDF("value")
.repartition(100)
df.persist()
df.show(10)
Thread.sleep(10000) // 在此时查看Spark UI的storage页面
println(df.count())
Thread.sleep(10000) // 在此时再次查看Spark UI的storage页面
}
}
在第一次查看Storage时,可以见到这个DataFrame只有其中一个分区被缓存起来。而在第二次查看Storage时,由于执行了count()操作,100个分区全被缓存起来了。
【坑2:unpersist()一个rdd时会同时unpersist()子RDD】
在spark 2.4之前的版本,当你创建了一个DataFrame a,同时由a得到b,并且把2个DataFrame都缓存起来了,如果执行a.unpersist(),会把b也自动unpersist()。
这个潜在的坑会使得你的spark程序增加了大量的计算量,鄙人之前由于计算数据量大,把代码分成了八步,每一步结束后都对当前df进行了persist() + count(),然后对父rdd进行了unpersist,这样导致了第一步重复计算了8次、第二步重复计算了7次、...、第七步重复计算了2次,通过查看spark UI才发现了此问题。
例子如下:
package high_quality._history
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
object demo {
def main(args: Array[String]) {
Logger.getRootLogger.setLevel(Level.ERROR)
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
val a = Seq("1").toDF("x")
val b = a.withColumn("y", $"x" + 1)
a.persist()
println(a.count())
b.persist()
println(b.count())
Thread.sleep(10000) // 在此时查看Spark UI的Storage
a.unpersist()
Thread.sleep(10000) // 在此时再次查看Spark UI的Storage
}
}
第一次查看spark UI,两个df都成功缓存:
第二次查看spark UI,可见当a.unpersist()后,b也被移出Storage了
解决方法是不进行unpersist(),或者内存不足时通过checkpoint()等方式把数据保存到hadoop或外部文件中。