版权声明:Designed By JiaMingcan https://blog.csdn.net/qq_41571900/article/details/84205750
这次的博客向大家介绍一下将偏移量存储在Zookeeper中。
我在注明书写逻辑的地方,可以在那里对RDD进行算子操作。
package kafka1
import kafka.common.TopicAndPartition
import kafka.message. MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}
object KafkaDirectZookeeper {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("direct").setMaster("local[2]")
val ssc = new StreamingContext(conf,Duration(5000));
//指定组名
val groupId="gp01"
//指定消费的topic名字
val topic ="test002"
//指定kafka的Broker地址(SparkStreaming的Task直连到Kafka分区上,用的是底层API消费)
val brokerList="192.168.85.200:9092"
//我们要自己维护Offset偏移量,将Offset保存到zk中
val zkQuorum = "192.168.85.200:2181"
//创建Stream时,使用topic名字集合,SparkStreaming可以同时消费多个Topic
val topics = Set(topic)
//创建一个ZKGroupTopicDrirs对象,其实是指定往ZK中写入数据的目录
//用于保存偏移量
val topicDir=new ZKGroupTopicDirs(groupId,topic)
//获取zookeeper中的路径"/consumers/gp01/offsets/test002/"
val zkTopicPath =s"${topicDir.consumerOffsetDir}"
//准备kafka参数
val kafkas=Map(
"metadata.broker.list"->brokerList,
"group.id"->groupId,
//从头读取数据
"auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString
)
//zookeeper的host和ip,创建一个Client,用于更新偏移量
//它是zookeeper客户端,可以从zk中读取偏移量数,并更新偏移量
val zkClient = new ZkClient(zkQuorum)
val clientOffset = zkClient.countChildren(zkTopicPath)
//创建kafkaStream
var kafkaStream:InputDStream[(String,String)]=null
//如果zookeeper中有保存offset 我们会利用这个offset作为kafkaStream的其实
//TopicAndPartition
var fromOffsets:Map[TopicAndPartition,Long]=Map()
//如果保存过offset
if(clientOffset>0){
//clientOffset的数量其实就是 /gp01/offset/test的分区数目
for(i<-0 until clientOffset){
val partitionOffset=zkClient.readData[String](s"$zkTopicPath/$i")
val tp = TopicAndPartition(topic,i)
//将不同partition对应的offset增加到fromOffsets中
fromOffsets += (tp->partitionOffset.toLong)
}
//key 是kafka的key value是kafka数据
//这个会将kafka的消息进行transform 最终kafka的数据都会变成(kafka的key,message)的tuple
val messageHandler=(mmd:MessageAndMetadata[String,String])=>
(mmd.key(),mmd.message())
//通过kafkaUtils创建直连的DStream
kafkaStream=KafkaUtils.createDirectStream
[String,String,StringDecoder,
StringDecoder,(String,String)](ssc,kafkas,fromOffsets,messageHandler)
}else{
//如果未保存,根据kafkas的配置使用最新的或者最旧的的offset
kafkaStream =KafkaUtils.createDirectStream
[String,String,StringDecoder,StringDecoder](ssc,kafkas,topics)
}
//偏移量范围
var offsetRanges = Array[OffsetRange]()
kafkaStream.foreachRDD{
//对RDD进行操作,触发Action
kafkardd=>
offsetRanges =kafkardd.asInstanceOf[HasOffsetRanges].offsetRanges
//下面可以对maps做rdd操作
val maps = kafkardd.map(_._2)
if(!maps.isEmpty){
//用于书写逻辑
}
for(o <- offsetRanges){
val zkpath=s"${topicDir.consumerOffsetDir}/${o.partition}"
//将该Partition的offset保存到zookeeper中
// /gp01/offset/test
ZkUtils.updatePersistentPath(zkClient,zkpath,o.untilOffset.toString)
}
}
ssc.start()
ssc.awaitTermination()
}
}
在这里,我画了一下图给大家介绍下存储流程:
我们通过从producer端输入数据,然后它会分区存储在Kafka集群中,他会根据你设置的分区数进行存储,但是它存储的并不是平均的,在这里我设置的partition数量为3。当Driver端需要读取数据时,它会去Zookeeper中根据自身访问的topic和groupID查询是否在zookeeper中保存有数据(如果以前使用同一消费者组读取同一Topic会保存数据),如果保存有就分别读取每个分区的偏移量,如果没有就从最新读或者最旧读,当读取完数据后,会通过读取的范围对偏移量进行更新,这样就完成了Zookeeper保存偏移量。
summed up by JiaMingcan
转载请署名:JiaMingcan