带状态的更新是使用的updateStateByKey方法,里面传入一个函数,函数要自己写,注意需要设置checkpoint
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需要设置checkpoint
* 有状态的计算
*/
class UpdataByKey {
}
object UpdataByKey{
//自定义函数进行带状态更新
def addFunc (currValue:Seq[Int],point:Option[Int])={
Some(currValue.sum+point.getOrElse(0));
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("UpdataByKey").setMaster("local[*]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(10))
val topics = "xiaopeng";
val topicMap = topics.split(",").map((_,2)).toMap
val lines = KafkaUtils.createStream(ssc,"192.168.10.219:2181","han",topicMap)
val words = lines.flatMap(line =>line._2.split(" ")).map(word =>(word,1))
words.updateStateByKey[Int](addFunc _)
words.print()
ssc.start()
ssc.awaitTermination()
}
}