零、本讲学习目标
- 理解RDD持久化的必要性
- 了解RDD的存储级别
- 学会如何查看RDD缓存
一、RDD持久化
(一)引入持久化的必要性
- Spark中的RDD是懒加载的,只有当遇到行动算子时才会从头计算所有RDD,而且当同一个RDD被多次使用时,每次都需要重新计算一遍,这样会严重增加消耗。为了避免重复计算同一个RDD,可以将RDD进行持久化。
- Spark中重要的功能之一是可以将某个RDD中的数据保存到内存或者磁盘中,每次需要对这个RDD进行算子操作时,可以直接从内存或磁盘中取出该RDD的持久化数据,而不需要从头计算才能得到这个RDD。
(二)案例演示持久化操作
1、RDD的依赖关系图
- 读取文件,进行一系列操作,有多个RDD,如下图所示。
![在这里插入图片描述](https://img-blog.csdnimg.cn/827c8da90cbf485c846d8a11d6399c81.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
2、不采用持久化操作
- 在上图中,对RDD3进行了两次算子操作,分别生成了RDD4和RDD5。若RDD3没有持久化保存,则每次对RDD3进行操作时都需要从textFile()开始计算,将文件数据转化为RDD1,再转化为RDD2,最终才得到RDD3。
- 查看要操作的文件
![在这里插入图片描述](https://img-blog.csdnimg.cn/6d17a4c069864685abf97dcefc8b8364.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
- 启动Spark Shell
![在这里插入图片描述](https://img-blog.csdnimg.cn/fa1fc48f857f41d9b7db889221dac58d.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
- 按照图示进行操作,得RDD4和RDD5
![在这里插入图片描述](https://img-blog.csdnimg.cn/d1e65266219d4fa3b368fc63cf6c0525.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
- 计算RDD4,会RDD1到RDD2到RDD3到RDD4跑一趟,查看结果
![在这里插入图片描述](https://img-blog.csdnimg.cn/ed2aa738daba4759baa74af4c04dd269.png)
- 计算RDD5,也会RDD1到RDD2到RDD3到RDD4跑一趟,查看结果
![在这里插入图片描述](https://img-blog.csdnimg.cn/ddd803f4d086495c8d92a88ef72772e6.png)
3、采用持久化操作
- 可以在RDD上使用persist()或cache()方法来标记要持久化的RDD(cache()方法实际上底层调用的是persist()方法)。在第一次行动操作时将对数据进行计算,并缓存在节点的内存中。Spark的缓存是容错的:如果缓存的RDD的任何分区丢失,Spark就会按照该RDD原来的转换过程自动重新计算并缓存。
- 计算到RDD3时,标记持久化
![在这里插入图片描述](https://img-blog.csdnimg.cn/d3e4cfb1c6f341c480a03c8cd1b57e97.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
- 计算RDD4,就是基于RDD3缓存的数据开始计算,不用从头到尾跑一趟
![在这里插入图片描述](https://img-blog.csdnimg.cn/11043b5bbf6b4a9788a9a02cddc8d02e.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
- 计算RDD5,就是基于RDD3缓存的数据开始计算,不用从头到尾跑一趟
![在这里插入图片描述](https://img-blog.csdnimg.cn/fbc60b2807b84ef6b75f451150cc8a87.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
二、存储级别
(一)持久化方法的参数
- 利用RDD的persist()方法实现持久化,向persist()方法中传入一个
StorageLevel
对象指定存储级别。每个持久化的RDD都可以使用不同的存储级别存储,默认的存储级别是StorageLevel.MEMORY_ONLY
。
(二)Spark RDD存储级别表
- Spark RDD有七种存储级别
![在这里插入图片描述](https://img-blog.csdnimg.cn/28db5204f56e43f187e5881e57e963cb.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
- 在Spark的Shuffle操作(例如reduceByKey())中,即使用户没有使用persist()方法,也会自动保存一些中间数据。这样做是为了避免在节点洗牌的过程中失败时重新计算整个输入。如果想多次使用某个RDD,那么强烈建议在该RDD上调用
persist()
方法。
(三)如何选择存储级别 - 权衡内存使用率和CPU效率
- 如果RDD存储在内存中不会发生溢出,那么优先使用默认存储级别(MEMORY_ONLY),该级别会最大程度发挥CPU的性能,使在RDD上的操作以最快的速度运行。
- 如果RDD存储在内存中会发生溢出,那么使用MEMORY_ONLY_SER并选择一个快速序列化库将对象序列化,以节省空间,访问速度仍然相当快。
- 除非计算RDD的代价非常大,或者该RDD过滤了大量数据,否则不要将溢出的数据写入磁盘,因为重新计算分区的速度可能与从磁盘读取分区一样快。
- 如果希望在服务器出故障时能够快速恢复,那么可以使用多副本存储级别MEMORY_ONLY_2或MEMORY_AND_DISK_2。该存储级别在数据丢失后允许在RDD上继续运行任务,而不必等待重新计算丢失的分区。其他存储级别在发生数据丢失后,需要重新计算丢失的分区。
(四)查看persist()与cache()方法源码
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
def cache(): this.type = persist()
- 从上述代码可以看出,cache()方法调用了persist()方法的无参方法,两者的默认存储级别都为MEMORY_ONLY,但cache()方法不可更改存储级别,而persist()方法可以通过参数自定义存储级别。
(五)案例演示设置存储级别
- 在
net.huawei.rdd
包里创建TestPersist
对象
![在这里插入图片描述](https://img-blog.csdnimg.cn/796ca9d922af467986071e5d99acb57b.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_9,color_FFFFFF,t_70,g_se,x_16)
package net.huawei.rdd
import org.apache.log4j.{
Level, Logger}
import org.apache.spark.{
SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object TestPersist {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("TestPersist")
.setMaster("local")
.set("spark.testing.memory", "2147480000")
val sc = new SparkContext(conf)
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("com").setLevel(Level.OFF)
System.setProperty("spark.ui.showConsoleProgress", "false")
Logger.getRootLogger().setLevel(Level.OFF)
val rdd: RDD[Int] = sc.parallelize(List(100, 200, 300, 400, 500))
rdd.persist()
val result: String = rdd.collect().mkString(", ")
println(result)
rdd.collect().foreach(println)
}
}
- 运行程序,查看结果
![在这里插入图片描述](https://img-blog.csdnimg.cn/2ce2b25a406f4be7beea99ec2b49b1df.png)
三、利用Spark WebUI查看缓存
(一)创建RDD并标记为持久化
![在这里插入图片描述](https://img-blog.csdnimg.cn/3974d9e10a4e431fa60ba2a9177d2390.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
(二)Spark WebUI查看RDD存储信息
- 浏览器中访问Spark Shell的WebUI
http://master:4040/storage/
查看RDD存储信息,可以看到存储信息为空
![在这里插入图片描述](https://img-blog.csdnimg.cn/ddda8ae277a7404781070f4e954b337a.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
- 执行命令:
rdd.collect()
,收集RDD数据
![在这里插入图片描述](https://img-blog.csdnimg.cn/9520572061f144e3b9fb1811fc6a4631.png)
- 刷新WebUI,发现出现了一个
ParallelCollectionRDD
的存储信息,该RDD的存储级别为MEMORY
,持久化的分区为8
,完全存储于内存中。
![在这里插入图片描述](https://img-blog.csdnimg.cn/138ca1e7aca041df8bc9dfbf3a82faf2.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
- 单击
ParallelCollectionRDD
超链接,可以查看该RDD的详细存储信息
![在这里插入图片描述](https://img-blog.csdnimg.cn/c2568c6a7ce946a7af959dd66e734248.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
- 上述操作说明,调用RDD的persist()方法只是将该RDD标记为持久化,当执行行动操作时才会对标记为持久化的RDD进行持久化操作。
- 执行以下命令,创建rdd2,并将rdd2持久化到磁盘
![在这里插入图片描述](https://img-blog.csdnimg.cn/12fe8c5b247a4ead884a023d9ff85c73.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
- 刷新上述WebUI,发现多了一个
MapPartitionsRDD
的存储信息,该RDD的存储级别为DISK
,持久化的分区为8
,完全存储于磁盘中。
![在这里插入图片描述](https://img-blog.csdnimg.cn/1ce769b236bd4972bf936b0477590240.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
(三)将RDD从缓存中删除
- 执行以下命令,将
rdd(ParallelCollectionRDD)
从缓存中删除
![在这里插入图片描述](https://img-blog.csdnimg.cn/4db1558016d34522ae3bfad25bdef83e.png)
- 刷新上述WebUI,发现只剩下了
MapPartitionsRDD
,ParallelCollectionRDD
已被移除。
![在这里插入图片描述](https://img-blog.csdnimg.cn/2898b79f19fc479fa8f67ef78349b645.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAaG93YXJkMjAwNQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
- Spark会自动监视每个节点上的缓存使用情况,并以最近最少使用的方式从缓存中删除旧的分区数据。如果希望手动删除RDD,而不是等待该RDD被Spark自动从缓存中删除,那么可以使用RDD的
unpersist()
方法。