cache缓存的说明
- RDD通过调用cache方法将前面的计算结果缓存到内存或磁盘,默认缓存到JVM内存中。
- 如果后面的计算还有需要此RDD的计算结果,可以直接从缓存中获取,而不用重新计算。
- cache缓存不会切断原有血缘关系,只会增加血缘关系。
- cache不会立即执行,当第一次遇到action算子时才会真正执行。
- cache缓存不管存储到内存还是存储到磁盘,都会随着程序执行结束而销毁。
cache缓存的使用
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(List("hello spark","hello scala"))
val wordAndOne: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_,1))
val printRDD: RDD[(String, Int)] = wordAndOne.map(
datas => {
println("-------------------------")
datas
}
)
println(printRDD.toDebugString)
printRDD.cache()
printRDD.collect()
println(printRDD.toDebugString)
printRDD.collect()
sc.stop()
checkpoint检查点的说明
- 如果依赖血缘关系过长,容错成本会很高,可以设置checkpoint检查点存储RDD中间结果。
- checkpoint检查点可以将RDD中间结果存储到HDFS等高可用存储介质中。
- checkpoint会切断原有血缘关系。
- 为了保证checkpoint的数据可靠性,程序在执行时会从血缘关系的最开始执行一遍,所以一般和cache配合使用,以提高效率。
- checkpoint同样也是遇到action算子才会执行。
checkpoint的使用
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setCheckpointDir("d:\\testCheckpoint")
val rdd: RDD[String] = sc.makeRDD(List("hello spark","hello scala"))
val wordAndOne: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_,1))
val printRDD: RDD[(String, Int)] = wordAndOne.map(
datas => {
println("-------------------------")
datas
}
)
println(printRDD.toDebugString)
printRDD.cache()
printRDD.checkpoint()
printRDD.collect()
println(printRDD.toDebugString)
printRDD.collect()
printRDD.unpersist()
sc.stop()
cache和checkpoint的区别
- cache缓存不会切断依赖血缘关系,checkpoint会切断依赖血缘关系。
- cache缓存会将数据临时存储在内存或者磁盘中,随着程序运行结束,cache存储的数据就销毁。checkpoint可以将数据存储在HDFS等高可用,可靠性高的文件系统中。
- 为了保证checkpoint的数据可靠性,checkpoint后的job在第一次执行时,会从RDD开始的位置重新计算一次。一般配合cache缓存使用,提高执行效率。
- 缓存使用完毕之后可以通过unpersist()算子释放缓存。