Big Data Learning Path 110: Managing Kafka Offsets Yourself with Redis

Copyright notice: This is an original article by the blogger and may not be reproduced without permission. https://blog.csdn.net/qq_37050372/article/details/83178103

First, clear out the data we stored in Redis during the previous run:
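The original post shows this step as a screenshot. A minimal cleanup sketch, assuming the `JPools` connection-pool helper used in the program below and the key names it writes (`wordcount` for the counts hash, the group id `day14_002` for the offsets hash):

```scala
import com.test.utils.JPools
import redis.clients.jedis.Jedis

object ClearPreviousData {
  def main(args: Array[String]): Unit = {
    val jedis: Jedis = JPools.getJedis
    // DEL removes the whole hash in one call; it is a no-op if the key does not exist
    jedis.del("wordcount")   // word-count results from the last run
    jedis.del("day14_002")   // saved offsets for this consumer group
    jedis.close()
  }
}
```

The same effect from the command line: `redis-cli del wordcount day14_002`.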

Then write the program. Building on the previous version, we add code that records the offsets:

package com.test.sparkStreaming

import com.test.utils.JPools
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

/**
  * Manage Kafka consumer offsets in Redis
  */
object SSCDirectKafka010_Redis_Offset {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val sparkConf: SparkConf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
    // Rate-limit reads from Kafka: max records per batch = 5 * number of partitions * batch interval (seconds)
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition","5")
    // Stop gracefully on shutdown so in-flight batches can finish
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown","true")
    // Batch interval: collect data every two seconds
    val ssc: StreamingContext = new StreamingContext(sparkConf,Seconds(2))
    // Consumer group id
    val groupId = "day14_002"
    // Topic to consume from
    val topic = "wordcount2"

    /**
      * Kafka parameter list
      */
    val kafkaParams = Map[String,Object](
      "bootstrap.servers" -> "marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false:java.lang.Boolean)

    )
    // Connect to the Kafka source
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParams)
    )
    stream.foreachRDD(rdd => {
      val offsetRanges =  rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val reduced = rdd.map(_.value()).map((_,1)).reduceByKey(_+_)
      reduced.foreachPartition(
        it =>{
          val jedis: Jedis = JPools.getJedis
          it.foreach(
            y => {
              jedis.hincrBy("wordcount",y._1.toString,y._2.toLong)
            }
          )
          jedis.close()
        })
      // Save the offsets to Redis: one hash per group id, field = "topic-partition"
      val jedis: Jedis = JPools.getJedis
      for (o <- offsetRanges) {
        jedis.hset(groupId, o.topic + "-" + o.partition, o.untilOffset.toString)
      }
      jedis.close()
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

The execution result is as follows:
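The result screenshot is not preserved here, but you can inspect what the job wrote with a small helper (a sketch, again assuming `JPools` and the group id above; the printed offsets depend on how much data was consumed):

```scala
import scala.collection.JavaConverters._
import com.test.utils.JPools

object ShowSavedOffsets {
  def main(args: Array[String]): Unit = {
    val jedis = JPools.getJedis
    // The streaming job writes one hash per group id: field "topic-partition" -> untilOffset
    for ((field, offset) <- jedis.hgetAll("day14_002").asScala) {
      println(s"$field -> $offset")   // e.g. wordcount2-0 -> <some offset>
    }
    jedis.close()
  }
}
```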

So far all we have done is store the offsets, and that is only the first step.

If we run the program again now, it will still re-consume everything from offset 0, because we never check for a saved offset when reading.

Next, let's complete the program.

Before connecting to Kafka, we need to query Redis to see whether any offsets were stored previously.

We wrap this lookup in a method:

package com.test.sparkStreaming

import java.util

import com.test.utils.JPools
import org.apache.kafka.common.TopicPartition
import redis.clients.jedis.Jedis

object JedisOffset {
  def apply(groupId: String): Map[TopicPartition, Long] = {
    var fromDBOffset = Map[TopicPartition, Long]()
    val jedis: Jedis = JPools.getJedis
    // One Redis hash per consumer group: field = "topic-partition", value = untilOffset
    val topicPartitionOffset: util.Map[String, String] = jedis.hgetAll(groupId)
    import scala.collection.JavaConverters._
    for ((field, offset) <- topicPartitionOffset.asScala) {
      // Split on the LAST '-' so that topic names containing '-' still parse correctly
      val idx = field.lastIndexOf("-")
      fromDBOffset += (new TopicPartition(field.substring(0, idx), field.substring(idx + 1).toInt) -> offset.toLong)
    }
    jedis.close()
    fromDBOffset
  }
}
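The field format `topic-partition` is easy to get wrong, because `-` can also appear inside a topic name. A self-contained check of the encode/parse round trip (plain Scala, no Kafka or Redis needed):

```scala
object OffsetKeyRoundTrip {
  // Encode the Redis hash field the same way the job does: "topic-partition"
  def encode(topic: String, partition: Int): String = s"$topic-$partition"

  // Parse by splitting on the LAST '-', so hyphenated topic names survive
  def decode(field: String): (String, Int) = {
    val idx = field.lastIndexOf("-")
    (field.substring(0, idx), field.substring(idx + 1).toInt)
  }

  def main(args: Array[String]): Unit = {
    assert(decode(encode("wordcount2", 0)) == ("wordcount2" -> 0))
    // A plain split("-") would break this one; lastIndexOf handles it
    assert(decode(encode("word-count", 3)) == ("word-count" -> 3))
    println("round trip ok")
  }
}
```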

Then, in the original code, replace this section:

    // Look up previously saved offsets first (empty map on the first run)
    val fromOffset = JedisOffset(groupId)
    val stream = if (fromOffset.isEmpty) {
      // No saved offsets yet: fall back to "auto.offset.reset"
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParams)
      )
    } else {
      // Resume exactly where the last run left off
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Assign[String, String](fromOffset.keys, kafkaParams, fromOffset)
      )
    }

With this in place, a restart no longer re-consumes data that was already processed. Note one caveat: the counts are written before the offsets, so a crash between the two writes can still apply a batch twice. This scheme gives at-least-once, not exactly-once, semantics.
