Copyright notice: This is the blogger's original article; reproduction without the blogger's permission is prohibited. https://blog.csdn.net/qq_37050372/article/details/83178103
First, let's clear out the data that was stored in Redis during the last run:
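A minimal cleanup sketch using the JPools helper from the earlier posts, assuming the previous run accumulated its word counts in the "wordcount" hash (the key names simply mirror the code below; adjust them if yours differ):

val jedis: Jedis = JPools.getJedis
jedis.del("wordcount")     // drop the word counts accumulated last time
jedis.del("day14_002")     // also drop any offsets saved by earlier runs of this code
jedis.close()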
Then we write the program. Building on the previous one, it adds code that records the consumer offsets:
package com.test.sparkStreaming
import com.test.utils.JPools
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
/**
 * Manage the offsets of data consumed from Kafka in Redis
 */
object SSCDirectKafka010_Redis_Offset {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val sparkConf: SparkConf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
    // Throttle pulls from Kafka: each batch reads at most
    // 5 * (number of partitions) * (batch interval in seconds) records,
    // e.g. with 3 partitions and a 2s interval, at most 5 * 3 * 2 = 30 records per batch
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "5")
    // Shut down gracefully (note: the key is spark.streaming.stopGracefullyOnShutdown, without "kafka")
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    // Batch interval: collect data every two seconds
    val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(2))
    // Consumer group id
    val groupId = "day14_002"
    // Topic to consume
    val topic = "wordcount2"
    /**
     * Kafka parameters
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      // Manage offsets ourselves (in Redis) rather than letting Kafka auto-commit
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // Connect to the Kafka source
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParams)
    )
    stream.foreachRDD(rdd => {
      // Capture this batch's offset ranges before transforming the RDD
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val reduced = rdd.map(_.value()).map((_, 1)).reduceByKey(_ + _)
      reduced.foreachPartition(it => {
        val jedis: Jedis = JPools.getJedis
        it.foreach(y => {
          // Accumulate each word's count into the "wordcount" hash
          jedis.hincrBy("wordcount", y._1, y._2.toLong)
        })
        jedis.close()
      })
      // Store the offsets in Redis: one hash per group id, one field per "<topic>-<partition>"
      val jedis: Jedis = JPools.getJedis
      for (o <- offsetRanges) {
        jedis.hset(groupId, o.topic + "-" + o.partition, o.untilOffset.toString)
      }
      jedis.close()
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
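Note that the program relies on the JPools connection-pool helper carried over from the earlier posts, which is not shown here. For completeness, a minimal sketch of what such a wrapper might look like (hypothetical; it assumes Redis runs on localhost:6379, so adjust host and port to your environment):

package com.test.utils

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

// Minimal Jedis connection-pool wrapper; the original JPools may differ
object JPools {
  private val config = new JedisPoolConfig()
  config.setMaxTotal(20)                                // cap on concurrent connections
  private val pool = new JedisPool(config, "localhost", 6379)

  def getJedis: Jedis = pool.getResource                // caller must close() to return it to the pool
}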
The execution result is as follows:
So far all we have done is store the offsets, and that is only the first step.
If we run the code now, it will still start consuming from 0 all over again, because we never check for saved offsets when reading.
Next, let's round the program out.
Before connecting to Kafka, we need to query Redis to see whether offsets were stored on a previous run.
We wrap this check in a method of its own:
package com.test.sparkStreaming

import java.util

import com.test.utils.JPools
import org.apache.kafka.common.TopicPartition
import redis.clients.jedis.Jedis

import scala.collection.JavaConverters._

object JedisOffset {
  // Returns the offsets saved in Redis for this group id,
  // or an empty map if none have been stored yet
  def apply(groupid: String): Map[TopicPartition, Long] = {
    var fromDBOffset = Map[TopicPartition, Long]()
    val jedis: Jedis = JPools.getJedis
    val topicPartitionOffset: util.Map[String, String] = jedis.hgetAll(groupid)
    jedis.close()
    val list: List[(String, String)] = topicPartitionOffset.asScala.toList
    for (topicPL <- list) {
      // Field names are "<topic>-<partition>"; split at the last '-' so that
      // topic names which themselves contain '-' are parsed correctly
      val idx = topicPL._1.lastIndexOf("-")
      val topicPartition = new TopicPartition(topicPL._1.substring(0, idx), topicPL._1.substring(idx + 1).toInt)
      fromDBOffset += (topicPartition -> topicPL._2.toLong)
    }
    fromDBOffset
  }
}
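A quick sanity check of what this returns (a hypothetical call: the map is empty on a first run and populated once the main job has saved offsets):

val saved: Map[TopicPartition, Long] = JedisOffset("day14_002")
// Prints one line per topic-partition, e.g. "wordcount2-0 -> 42"
saved.foreach { case (tp, offset) => println(s"${tp.topic()}-${tp.partition()} -> $offset") }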
Then, in the original program, replace the stream-creation section with the following (org.apache.kafka.common.TopicPartition must also be imported at the top of the file):
// Read any previously saved offsets for this group from Redis
val fromOffset: Map[TopicPartition, Long] = JedisOffset(groupId)
val stream: InputDStream[ConsumerRecord[String, String]] =
  if (fromOffset.isEmpty) {
    // First run: no saved offsets, subscribe normally
    // (auto.offset.reset = "earliest" decides where to start)
    KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParams)
    )
  } else {
    // Restart: resume each partition from the offset saved in Redis
    KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String, String](fromOffset.keys, kafkaParams, fromOffset)
    )
  }
With this in place, restarting the job no longer re-consumes data that has already been processed; it resumes from the offsets saved in Redis instead of starting over from 0.
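To double-check, stop the job, restart it, and peek at both Redis hashes directly (a hypothetical verification; the key names come from the code above):

val jedis: Jedis = JPools.getJedis
println(jedis.hgetAll("wordcount"))      // word counts, still accumulating across restarts
println(jedis.hgetAll("day14_002"))      // saved offsets, one field per "<topic>-<partition>"
jedis.close()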