写一个wordcount:
linux命令: nc -lk 8888
package day01
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SteamingWordCount {
  /**
   * Basic per-batch word count over a TCP text stream.
   *
   * Usage: SteamingWordCount [host] [port]
   * Defaults preserve the original hard-coded values.
   *
   * Limitation: counts are computed independently per batch; nothing is
   * accumulated across batches (see StateFulWordCount for the stateful version).
   */
  def main(args: Array[String]): Unit = {
    // Allow overriding host/port from the command line; defaults keep the old behavior.
    val host = args.lift(0).getOrElse("192.168.111.101")
    val port = args.lift(1).map(_.toInt).getOrElse(8888)

    // Spark Streaming needs at least two threads: one to receive data, one to compute.
    // local[*] provides that as long as the machine has more than one core.
    val conf = new SparkConf().setAppName("SteamingWordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // StreamingContext with a 5-second batch interval.
    val ssc = new StreamingContext(sc, Seconds(5))

    // A DStream represents a continuous series of RDDs; here it is fed from a TCP socket.
    val lines = ssc.socketTextStream(host, port)

    // Classic word count, applied to each batch independently.
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val reduced: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    // Print the first elements of each batch's result to stdout.
    reduced.print()

    // Streaming computation must be started explicitly, then runs until stopped.
    ssc.start()
    // Block the main thread until the streaming context is terminated.
    ssc.awaitTermination()
  }
}
定义一个有状态(累加)版的wordcount:
package day01
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
object StateFulWordCount {
  /**
   * State-update function for updateStateByKey.
   *
   * Each element of the iterator is a tuple of:
   *  - the key (the word),
   *  - the counts observed for that word in the current batch,
   *  - the previously accumulated total (None the first time the word is seen).
   *
   * Returns (word, runningTotal) pairs.
   */
  val updateFunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
    it.map { case (word, counts, prev) => (word, counts.sum + prev.getOrElse(0)) }
  }

  /**
   * Stateful (accumulating) word count over a TCP text stream.
   *
   * Usage: StateFulWordCount [host] [port] [checkpointDir]
   * Defaults preserve the original hard-coded values.
   */
  def main(args: Array[String]): Unit = {
    // Allow overriding host/port/checkpoint directory from the command line.
    val host = args.lift(0).getOrElse("192.168.111.101")
    val port = args.lift(1).map(_.toInt).getOrElse(8888)
    // NOTE(review): default is a Windows-specific path; override in other environments.
    val checkpointDir = args.lift(2).getOrElse("E:\\大数据")

    // Spark Streaming needs at least two threads: one to receive data, one to compute.
    // App name fixed to match this object (was copy-pasted as "SteamingWordCount").
    val conf = new SparkConf().setAppName("StateFulWordCount").setMaster("local[*]")
    // StreamingContext with a 5000 ms (5 s) batch interval.
    val ssc = new StreamingContext(conf, Milliseconds(5000))
    // updateStateByKey requires a checkpoint directory to persist state across batches.
    ssc.checkpoint(checkpointDir)

    // A DStream represents a continuous series of RDDs; here it is fed from a TCP socket.
    val lines = ssc.socketTextStream(host, port)

    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    // Accumulate counts across batches via the state-update function above;
    // the boolean flag remembers the partitioner for the state RDDs.
    val reduced = wordAndOne.updateStateByKey(updateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true)

    // Print the running totals for each batch.
    reduced.print()

    // Streaming computation must be started explicitly, then runs until stopped.
    ssc.start()
    // Block the main thread until the streaming context is terminated.
    ssc.awaitTermination()
  }
}
//不同的是必须通过 ssc.checkpoint(目录) 设置检查点目录 ==> 把之前批次累加的状态保存起来
//还需要自定义一个状态更新函数(updateFunc)传给 updateStateByKey