Spak Streaming算子:checkpoint

介绍
  1. 背景 : 流应用成型必须全天候运行,因此必须适应与应用程序逻辑无关的故障(如系统故障,JVM崩溃等),为此Spark Streaming 需要将足够的信息检测点容错到存储系统,以便能够从故障中恢复
  2. 检测点包括两种类型 :
  • 元数据检测点 : 将定义的流计算的信息保存到HDFS等容错存储
    包括:
    配置——用于创建流应用程序的配置。
    DStream操作——定义流应用程序的DStream操作集。
    不完整批处理—作业已排队但尚未完成的批处理。
  • 数据点检测 : 将生成的RDDs保存到可靠的存储中
    在一些跨多个批次组合数据的有状态转换中,这是必要的。在这样的转换中,生成的RDDs依赖于前几个批次的RDDs,这会导致依赖链的长度随时间不断增加。为了避免恢复时间的无限制增加(与依赖链成正比),有状态转换的中间rdd会定期检查到可靠的存储(例如HDFS),以切断依赖链
另外

无法从Spark Streaming中的检查点恢复累加器和广播变量。如果启用检查点并同时使用累加器或广播变量,则必须为累加器和广播变量创建延迟实例化的单例实例,以便在驱动程序在失败时重新启动后对它们进行重新实例化

Demo

def main(args: Array[String]) {
    // 准备工作
    val conf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaApp")

    val checkpointPath = "hdfs://hadoop000:8020/offset_g5/checkpoint"
    val topic = "g5spark"
    val interval = 10
    val kafkaParams = Map[String, String]("metadata.broker.list"->"hadoop000:9092","auto.offset.reset"->"smallest")
    val topics = topic.split(",").toSet

    def function2CreateStreamingContext():StreamingContext = {
      val ssc = new StreamingContext(conf, Seconds(10))
      val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams, topics)
      ssc.checkpoint(checkpointPath)
      messages.checkpoint(Duration(8*10.toInt*1000))

      messages.foreachRDD(rdd => {
        if(!rdd.isEmpty()){
          println("---数据统计记录为:---" + rdd.count())
        }
      })
      ssc
    }

    val ssc = StreamingContext.getOrCreate(checkpointPath, function2CreateStreamingContext)
    ssc.start()
    ssc.awaitTermination()
  }

官方文档查看:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

猜你喜欢

转载自blog.csdn.net/huonan_123/article/details/86680894
今日推荐