SparkStreaming 中 StreamingContext 对象的复用方式：所谓“复用”，是指程序重启后继续计算时，不重新创建新的 StreamingContext，而是通过 StreamingContext.getOrCreate 从检查点目录恢复原来的对象，从而不浪费之前的计算状态。具体操作方式如下：
/**
 * Socket word count whose StreamingContext is reused across restarts.
 *
 * `StreamingContext.getOrCreate` restores the context from `CheckpointDir`
 * when a checkpoint exists there; only when none is found does it call
 * `createFunc` to build a fresh context and define the DStream graph.
 */
object TestStream {

  /** Checkpoint directory; the same path must be used for `checkpoint` and `getOrCreate`. */
  private val CheckpointDir = "ts"

  def main(args: Array[String]): Unit = {
    // Builds a brand-new context. All DStream setup must live here, because
    // when the context is restored from CheckpointDir this function is not called.
    def createFunc(): StreamingContext = {
      // local[2]: the socket receiver occupies one core, so a second one is
      // needed for the batches to actually be processed.
      val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
      val ssc = new StreamingContext(conf, Seconds(5))
      ssc.checkpoint(CheckpointDir)
      val dataDS: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.182.147", 9999)
      val wordDS: DStream[String] = dataDS.flatMap(_.split(" "))
      val tupleDS: DStream[(String, Int)] = wordDS.map((_, 1))
      // Word counts for each 5-second batch (not accumulated across batches).
      tupleDS.reduceByKey(_ + _).print()
      ssc
    }

    // Restore from the checkpoint if present, otherwise create via createFunc.
    val ssc: StreamingContext = StreamingContext.getOrCreate(CheckpointDir, () => createFunc())
    // Set the log level here, not inside createFunc: createFunc is skipped on
    // recovery, so a level set there would be lost after a restart.
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.start()
    ssc.awaitTermination()
  }
}