介绍
- 背景 : 流应用成型必须全天候运行,因此必须适应与应用程序逻辑无关的故障(如系统故障,JVM崩溃等),为此Spark Streaming 需要将足够的信息检测点容错到存储系统,以便能够从故障中恢复
- 检测点包括两种类型 :
- 元数据检测点 : 将定义的流计算的信息保存到HDFS等容错存储
包括:
配置——用于创建流应用程序的配置。
DStream操作——定义流应用程序的DStream操作集。
不完整批处理—作业已排队但尚未完成的批处理。 - 数据点检测 : 将生成的RDDs保存到可靠的存储中
在一些跨多个批次组合数据的有状态转换中,这是必要的。在这样的转换中,生成的RDDs依赖于前几个批次的RDDs,这会导致依赖链的长度随时间不断增加。为了避免恢复时间的无限制增加(与依赖链成正比),有状态转换的中间rdd会定期检查到可靠的存储(例如HDFS),以切断依赖链
另外
无法从Spark Streaming中的检查点恢复累加器和广播变量。如果启用检查点并同时使用累加器或广播变量,则必须为累加器和广播变量创建延迟实例化的单例实例,以便在驱动程序在失败时重新启动后对它们进行重新实例化。
Demo
def main(args: Array[String]) {
// 准备工作
val conf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaApp")
val checkpointPath = "hdfs://hadoop000:8020/offset_g5/checkpoint"
val topic = "g5spark"
val interval = 10
val kafkaParams = Map[String, String]("metadata.broker.list"->"hadoop000:9092","auto.offset.reset"->"smallest")
val topics = topic.split(",").toSet
def function2CreateStreamingContext():StreamingContext = {
val ssc = new StreamingContext(conf, Seconds(10))
val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams, topics)
ssc.checkpoint(checkpointPath)
messages.checkpoint(Duration(8*10.toInt*1000))
messages.foreachRDD(rdd => {
if(!rdd.isEmpty()){
println("---数据统计记录为:---" + rdd.count())
}
})
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointPath, function2CreateStreamingContext)
ssc.start()
ssc.awaitTermination()
}
官方文档查看:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing