package com.SparkStreaming_Flume_Poll

import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Pull-based Spark Streaming + Flume example: Spark polls a Flume spark-sink,
 * decodes each event body as a text line, and maintains a running word count
 * across batches with `updateStateByKey`.
 */
object SparkStreaming_Flume_Poll {

  /**
   * State-update function for `updateStateByKey`.
   *
   * @param newValues    all the 1s emitted for one word in the current batch
   * @param runningCount accumulated total for that word from earlier batches
   * @return the new running total; always `Some`, so the key's state is retained
   */
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
    Some(runningCount.getOrElse(0) + newValues.sum)

  def main(args: Array[String]): Unit = {
    // Local mode with two threads: one for the receiver, one for processing.
    val conf: SparkConf = new SparkConf()
      .setAppName("SparkStreaming_Flume_Poll")
      .setMaster("local[2]")

    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // Streaming context with a 5-second batch interval.
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // A checkpoint directory is mandatory for the stateful updateStateByKey below.
    ssc.checkpoint("./")

    // Flume sink address(es) to poll; multiple hosts may be listed here.
    val addresses = Seq(new InetSocketAddress("192.168.107.144", 8888))

    // Pull data from Flume (Spark acts as the client polling the spark-sink).
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK)

    // The payload lives in the Flume event body; decode it into a text line.
    val lines: DStream[String] =
      flumeStream.map(event => new String(event.event.getBody.array()))

    // Split lines into words, pair each word with 1, and fold into running totals.
    val wordCounts: DStream[(String, Int)] = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .updateStateByKey(updateFunction)

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
SparkStreaming_Flume_Poll

Reposted from: www.cnblogs.com/xjqi/p/12817111.html