Spark Streaming and Kafka integration code

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkKafkaDemo {

  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf
    val conf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")
    // 2. Create the SparkContext
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    // 3. Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SparkKafkaDemo",
      // earliest: if a partition has a committed offset, consume from it; otherwise consume from the beginning
      // latest:   if a partition has a committed offset, consume from it; otherwise consume only newly produced data
      // none:     if every partition has a committed offset, consume after it; if any partition has none, throw an exception
      // "latest" is used here: resume from the committed offset if one exists, otherwise start from newly arriving data
      "auto.offset.reset" -> "latest",
      // false disables auto commit; offsets are then managed by Spark (checkpoint) or committed manually by the programmer
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // 4. Receive the Kafka data and apply the business logic
    val kafkaDatas: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent, // location strategy; PreferConsistent is recommended: efficient and evenly balanced
        ConsumerStrategies.Subscribe[String, String](Array("spark_kafka"), kafkaParams) // consumer strategy; "spark_kafka" is a placeholder topic name
      )
    val wordOne: DStream[(String, Int)] = kafkaDatas.flatMap(a => a.value().split(" ")).map((_, 1))
    val wordCount: DStream[(String, Int)] = wordOne.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(10), Seconds(5))
    wordCount.print()
    // 5. Start the streaming job
    ssc.start()
    // 6. Wait for termination
    ssc.awaitTermination()
  }
}
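Because enable.auto.commit is set to false above, offsets are not committed to Kafka automatically; they are either recovered from a checkpoint or committed by the application itself. The following is a minimal sketch (not part of the original post) of the manual approach, using the HasOffsetRanges / CanCommitOffsets API from spark-streaming-kafka-0-10. It assumes the kafkaDatas stream from the code above and would be placed before ssc.start(); the println is only a stand-in for real business output.

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Sketch: commit offsets back to Kafka manually after each batch has been processed.
kafkaDatas.foreachRDD { rdd =>
  // Read the offset ranges of this batch before any shuffle or repartition changes the partitioning.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Stand-in for the real output action of the batch.
  println(s"records in this batch: ${rdd.count()}")
  // Asynchronously commit the offsets once the batch output has succeeded.
  kafkaDatas.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

Committing this way ties the offset commit to batch completion, so on restart the job resumes from the last successfully processed batch rather than depending on Kafka's auto-commit interval.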

Reposted from blog.csdn.net/qq_45765882/article/details/105563492