注意:先开启程序之后再将要统计的文本文档放入input目录下
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WordCountByHDFS {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("WordCountByHDFS").setMaster("local[2]")
/**
* 第一个参数是SparkConf对象
* 第二个参数是每批数据的时间间隔
*/
val ssc=new StreamingContext(conf,Seconds(3))
//此时用到textFileStream()方法,需要HDFS文件的路径。
val lines=ssc.textFileStream("E:\\input")
val result=lines.flatMap(_.split(",")).map(word=>(word,1)).reduceByKey(_+_)
result.print() //在控制台输出内容
ssc.start() //启动,开始接收并处理实时流数据
ssc.awaitTermination() //等待程序停止
ssc.stop()
}
}