Recording Offsets When Consuming Kafka with Spark Streaming

The direct-stream job below saves each Kafka partition's offset to ZooKeeper at the end of every batch; on startup it reads those offsets back, so consumption resumes where the previous run stopped.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by shuilin on 2018/4/2.
 */
object KafkaDirectStream {
  def main(args: Array[String]) {
    val group = "g10001"
    val conf = new SparkConf().setAppName("KafkaDirectStream")
    // cap the ingest rate per Kafka partition (records per second)
    conf.set("spark.streaming.kafka.maxRatePerPartition", "10000")
    val ssc = new StreamingContext(conf, Seconds(2))

    val topic = "wordcount"
    val brokerList = "kafka-host:port"   // replace with your Kafka broker list
    val zkQuorum = "zk-host:port"        // replace with your ZooKeeper quorum
    val topics: Set[String] = Set(topic)

    // ZooKeeper directory holding this group's offsets for the topic,
    // e.g. /consumers/g10001/offsets/wordcount
    val topicDirs = new ZKGroupTopicDirs(group, topic)
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
    println("zkTopicPath is: " + zkTopicPath)

    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> group,
      // start from the earliest ("smallest") offset when no saved offset exists
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString)

    // use a string serializer so offsets are stored as plain strings;
    // ZkClient's default Java serializer cannot read string data written by other Kafka clients
    val zkClient = new ZkClient(zkQuorum, 30000, 30000, ZKStringSerializer)
    val pathExists = zkClient.exists(zkTopicPath)
    println("offset path exists: " + pathExists)
    // one child node per partition that already has a saved offset
    val children = zkClient.countChildren(zkTopicPath)
    println("children count is: " + children)

    var kafkaStream: InputDStream[(String, String)] = null
    var fromOffsets: Map[TopicAndPartition, Long] = Map()

    if (children > 0) {
      // offsets were saved by a previous run: read one per partition and resume from there
      for (i <- 0 until children) {
        val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/$i")
        val tp = TopicAndPartition(topic, i)
        fromOffsets += (tp -> partitionOffset.toLong)
      }
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      // first run: no saved offsets, so auto.offset.reset decides where to start
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics)
    }

    var offsetRanges = Array[OffsetRange]()
    kafkaStream.transform { rdd =>
      // capture each batch's offset ranges on the driver, before any other transformation
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map(_._2)
      .foreachRDD { rdd =>
        rdd.foreachPartition(partition => partition.foreach(println))
        // persist the end offset (untilOffset) of each partition so a restart resumes
        // after the last processed record; persisting fromOffset would replay the last batch
        for (o <- offsetRanges) {
          val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
          ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }
}
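For completeness, this code targets the old Kafka 0.8 direct-stream API (spark-streaming-kafka for Spark 1.x). A minimal sbt sketch of the dependencies it needs; the version numbers here are assumptions and must match your cluster:

libraryDependencies ++= Seq(
  // core streaming API; "provided" because the cluster supplies it at runtime
  "org.apache.spark" %% "spark-streaming" % "1.6.3" % "provided",
  // Kafka 0.8 direct-stream integration; pulls in the kafka client and zkclient transitively
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
)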
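As an aside: on Spark 2.x with the spark-streaming-kafka-0-10 integration, ZooKeeper is no longer needed for this pattern, because the stream can commit its processed offset ranges straight back to Kafka. A minimal sketch under that assumption, reusing the topic and group names from above (ssc is the same StreamingContext):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "kafka-host:port",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "g10001",
  "auto.offset.reset" -> "earliest",       // counterpart of "smallest" in the old API
  "enable.auto.commit" -> (false: java.lang.Boolean))  // commit manually, below

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("wordcount"), kafkaParams))

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.map(_.value()).foreach(println)
  // commit this batch's end offsets back to Kafka once processing is done
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}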
Reposted from blog.csdn.net/zhoushuilin/article/details/79802015