kafka的receive方式实现WordCount,使用updateStateByKey函数,累加所有批次的wordCount

Spark Streaming的updateStateByKey可以把DStream中的数据按key做reduce操作,然后对各个批次的数据进行累加。注意
wordDstream.updateStateByKey[Int]每次传递给updateFunc函数两个参数,其中,
1、第一个参数是某个key(即某个单词)的当前批次的一系列值的列表(Seq[Int]形式),updateFunc函数中 val currentCount = values.foldLeft(0)(_ + _)的作用,就是计算这个被传递进来的与某个key对应的当前批次的所有值的总和,也就是当前批次某个单词的出现次数,保存在变量currentCount中。
2、传递给updateFunc函数的第二个参数是某个key的历史状态信息,也就是某个单词历史批次的词频汇总结果。实际上,某个单词的历史词频应该是一个Int类型,这里为什么要采用Option[Int]呢?
Option[Int]是类型 Int的容器,更确切地说,你可以把它看作是某种集合,这个特殊的集合要么只包含一个元素(即单词的历史词频),要么就什么元素都没有(这个单词历史上没有出现过,所以没有历史词频信息)。之所以采用 Option[Int]保存历史词频信息,这是因为,历史词频可能不存在,很多时候,在值不存在时,需要进行回退,或者提供一个默认值,Scala 为Option类型提供了getOrElse方法,以应对这种情况。 state.getOrElse(0)的含义是,如果该单词没有历史词频统计汇总结果,那么,就取值为0,如果有历史词频统计结果,就取历史结果,然后赋值给变量previousCount。最后,当前值和历史值进行求和,并包装在Some中返回。

package day01

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}

object kafkaConsumerDemo01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wc").setMaster("local[2]")
    //new StreamingContext, 设置每个批次的时间间隔是5秒
    val ssc =new StreamingContext(conf,Duration(5000))
    //zookeeper地址,通过zookeeper获取元数据
    val zkQurme = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    //使用checkPoint,保存中间的结果
    ssc.checkpoint("D:\\Yue\\chk_0")
    val groupid = "tt01"//组名
    val topics = Map("test02"->2)//要消费的topic名,和线程数,可以有多个topic
    //创建一个ReceiverInputDStream,相当于kafka的消费者
    val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkQurme,groupid,topics)
    val fm: DStream[String] = kafkaDStream.flatMap(_._2.split(" "))
    val maped: DStream[(String, Int)] = fm.map(x=>(x,1))
    //定义一个函数seq代表当前批次的某个key的所对应的value的集合
    //state是以前读取的这个key的value的总和
    val updateFunc =(seq:Seq[Int],state:Option[Int])=>{
      val crunt: Int = seq.foldLeft(0)(_+_)
      //使用foldLeft,进行累加,不能使用reduceLeft/reduce 会报 empty.reduceLeft
      val sum = crunt + state.getOrElse(0)
      Some(sum)
    }
    val update: DStream[(String, Int)] = maped.updateStateByKey(updateFunc)
    update.print()
    ssc.start()
    ssc.awaitTermination()

    
  }

}

猜你喜欢

转载自blog.csdn.net/Lu_Xiao_Yue/article/details/83958027