1 cache(), persist()和unpersist()
原文链接:Spark DataFrame Cache and Persist Explained
spark中DataFrame或Dataset里的cache()
方法默认存储等级为MEMORY_AND_DISK
,这跟RDD.cache()
的存储等级MEMORY_ONLY
是不一样的。理由是重新计算内存中的表的代价是昂贵的。MEMORY_AND_DISK表示如果内存中缓存不下,就存在磁盘上。
spark的dataset类中的cache()方法内部调用的是persist()方法。cache()在spark中是懒惰的方法,必须触发了action操作才会被执行。
val dfPersist = df.cache()#默认存储等级为MEMORY_AND_DISK
如果直接使用persist()方法,可以选择存储等级,存储等级有MEMORY_ONLY
,MEMORY_AND_DISK
, MEMORY_ONLY_SER
, MEMORY_AND_DISK_SER
, DISK_ONLY
, MEMORY_ONLY_2
,MEMORY_AND_DISK_2
等。
val dfPersist = df.persist()#默认存储等级为MEMORY_AND_DISK
dfPersist.show(false)
val dfPersist = df.persist(StorageLevel.MEMORY_ONLY)#设置缓存的存储等级为MEMORY_ONLY
dfPersist.show(false)
spark会自动检测每个persist()和cache()操作,它会检测各个结点的使用情况,如果数据不再使用会把持久化(persisted)的数据删掉,依据的是最近最少使用(least-recently-used LRU)算法。你也可以手动使用unpersist()将持久化的数据从内存和磁盘中删掉。
val dfPersist = dfPersist.unpersist()
2 关于spark persist()的两个坑
2.1 persist() + show()不能缓存全部数据
对于一个有100个分区的数据,假如你对其persist()后进行show(10),那么spark只会把第一个分区缓存起来,show前10条数据。如果你show的数据量超过了一个分区的大小,那么spark会多缓存一些分区。
因此persist()后如果希望数据全部都缓存到内存中,应对每个分区都执行action操作,如进行count()。
2.2 unpersist()一个rdd时会同时unpersist()子RDD
在spark 2.4之前的版本,当你创建了一个DataFrame a,同时由a得到b,并且把2个DataFrame都缓存起来了,如果执行a.unpersist(),会把b也自动unpersist()。
这个潜在的坑会使得你的spark程序增加了大量的计算量,鄙人之前由于计算数据量大,把代码分成了八步,每一步结束后都对当前df进行了persist() + count(),然后对父rdd进行了unpersist,这样导致了第一步重复计算了8次、第二步重复计算了7次、…、第七步重复计算了2次,通过查看spark UI才发现了此问题。