Spark Streaming: saving consumer offsets to Zookeeper to achieve zero data loss

Copyright notice: this is an original post by the author; do not reproduce without permission. https://blog.csdn.net/yitengtongweishi/article/details/75116596

There are two ways to consume Kafka data in Spark Streaming:

1) The receiver-based createStream method

2) The direct approach (no receivers) using the createDirectStream method

In terms of performance, the second approach is far more efficient than the first. However, with the direct approach the Kafka offsets are kept only in the checkpoint: Spark Streaming does not commit the consumed offsets to Zookeeper, which breaks offset-based Kafka cluster monitoring tools such as Kafka Web Console and KafkaOffsetMonitor. Moreover, if the application is restarted, part of the data may be lost; see Spark & Kafka - Achieving zero data-loss.
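For orientation, here is a minimal sketch of the two entry points as they appear in the spark-streaming-kafka 0.8 connector used later in this post; the Zookeeper host, broker host, consumer group and topic names below are placeholders, not values from this article.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ApiComparison {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("ApiComparison"), Seconds(10))

    // 1) Receiver-based: the high-level consumer tracks offsets in Zookeeper for you,
    //    but all data flows through a receiver and a WAL is needed to avoid data loss.
    //    ("zk-host", "demo-group" and "demo-topic" are placeholders.)
    val receiverStream = KafkaUtils.createStream(
      ssc, "zk-host:2181", "demo-group", Map("demo-topic" -> 1))

    // 2) Direct (no receivers): each batch reads an exact offset range from the brokers;
    //    the offsets live only in the checkpoint unless you persist them yourself.
    //    ("broker-host" is a placeholder.)
    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker-host:9092"), Set("demo-topic"))

    directStream.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}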

The official documentation only touches briefly on how the consumer offsets in Zookeeper can be updated, with the snippet below (see http://spark.apache.org/docs/latest/streaming-kafka-integration.html):

 // Hold a reference to the current offset ranges, so it can be used downstream
 var offsetRanges = Array.empty[OffsetRange]

 directKafkaStream.transform { rdd =>
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }.map {
           ...
 }.foreachRDD { rdd =>
   for (o <- offsetRanges) {
     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
   }
   ...
 }

So updating the consumer offsets in Zookeeper is something you have to implement yourself, and the createDirectStream overloads provided by Spark do not cover this need well, so some further wrapping is required. The KafkaManager class below shows how:

import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

import scala.reflect.ClassTag

/**
  * Created by YZX on 2017/5/20 13:14 in Beijing.
  */

class KafkaManager(val kafkaParams: Map[String, String]) {

  private val kc = new KafkaCluster(kafkaParams)

  /**
    * Create a direct Kafka stream whose starting offsets are read from Zookeeper
    * for the given consumer group.
    */
  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = kafkaParams.get("group.id").get
    // Before reading offsets from Zookeeper, reconcile them with what is actually available in Kafka
    setOrUpdateOffsets(topics, groupId)

    // Read the starting offsets from Zookeeper and begin consuming from there
    val partitionsE = kc.getPartitions(topics)
    if (partitionsE.isLeft) { throw new SparkException(s"get kafka partitions failed: ${partitionsE.left.get}") }
    val partitions = partitionsE.right.get
    val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
    if (consumerOffsetsE.isLeft) { throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}") }
    val consumerOffsets = consumerOffsetsE.right.get
    KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    )
  }

  /**
    * Before creating the stream, reconcile the consumer offsets stored in Zookeeper
    * with the offsets actually available in Kafka.
    */
  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach{ topic =>
      var hasConsumed = true
      val partitionsE = kc.getPartitions(Set(topic))
      if (partitionsE.isLeft) { throw new SparkException(s"get kafka partitions failed: ${partitionsE.left.get}") }
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) { hasConsumed = false }
      if (hasConsumed) { // the group has consumed this topic before
        /**
          * The offsets saved in Zookeeper may be stale, i.e. Kafka's retention policy has
          * already deleted the log segments that contained them. To handle this, compare the
          * consumer offsets in Zookeeper with the earliest leader offsets: if a consumer offset
          * is smaller than the earliest leader offset, it is out of date and must be reset to
          * the earliest leader offset.
          */
        val earliestLeaderOffsets = kc.getEarliestLeaderOffsets(partitions).right.get
        val consumerOffsets = consumerOffsetsE.right.get

        // Only some partitions may be stale, so reset just those partitions to earliestLeaderOffsets
        var offsets: Map[TopicAndPartition, Long] = Map()
        consumerOffsets.foreach{ case(tp, n) =>
          val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
          if(n < earliestLeaderOffset) {
            println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition + " offsets已经过时,更新为" + earliestLeaderOffset)
            offsets += (tp -> earliestLeaderOffset)
          }
        }
        if (offsets.nonEmpty) { kc.setConsumerOffsets(groupId, offsets) }
      } else { // the group has never consumed this topic
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        val leaderOffsets: Map[TopicAndPartition, LeaderOffset] =
          if (reset == Some("smallest")) kc.getEarliestLeaderOffsets(partitions).right.get
          else kc.getLatestLeaderOffsets(partitions).right.get

        val offsets = leaderOffsets.map { case (tp, offset) => (tp, offset.offset) }
        kc.setConsumerOffsets(groupId, offsets)
      }
    }
  }

  /**
    * Commit the consumed offsets of a processed RDD back to Zookeeper.
    */
  def updateZKOffsets(rdd: RDD[(String, String)], offsetRanges: Array[OffsetRange]) : Unit = {
    val groupId = kafkaParams.get("group.id").get
    //val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    for (offsets <- offsetRanges) {
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
      if (o.isLeft) { println(s"Error updating the offset to Kafka cluster: ${o.left.get}") }
    }
  }
}

Note that org.apache.spark.streaming.kafka.KafkaCluster is not public, so its source has to be copied into your own project.
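As an alternative to copying the source, a class placed under an org.apache.spark package can see the class directly, since KafkaCluster is declared private[spark] in the Spark 1.x connector this post targets. A minimal sketch of that workaround; ZkOffsetHelper is just an illustrative name, not part of the code above:

// Assumption: KafkaCluster is private[spark] (Spark 1.x spark-streaming-kafka connector),
// so placing this file in the same package sidesteps the access modifier without copying source.
package org.apache.spark.streaming.kafka

class ZkOffsetHelper(val kafkaParams: Map[String, String]) extends Serializable {
  // Compiles because this file lives inside org.apache.spark.streaming.kafka
  private val kc = new KafkaCluster(kafkaParams)
}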
Finally, here is a complete example.

import java.sql.DriverManager
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by YZX on 2017/5/20 13:14 in Beijing.
  */

object DirectKafkaReportStreaming {
  def main(args: Array[String]) {
    // Suppress noisy log output on the console
    Logger.getLogger("org").setLevel(Level.WARN)

    val conf = new SparkConf().setAppName("DirectKafkaReportStreaming")      //.setMaster("local[*]")
    val sc = new SparkContext(conf)

    val ssc = new StreamingContext(sc, Seconds(args(0).toLong))

    // Create direct kafka stream with brokers and topics
    val topics = Set("report.pv_account", "report.base_reach_click", "report.base_second_jump", "report.base_conversion_click", "report.base_conversion_imp")
    val brokers = "192.168.145.216:9092, 192.168.145.217:9092, 192.168.145.218:9092, 192.168.145.221:9092, 192.168.145.222:9092, 192.168.145.223:9092, 192.168.145.224:9092, 192.168.145.225:9092, 192.168.145.226:9092, 192.168.145.227:9092"
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "consumer.timeout.ms" -> "30000", "group.id" -> "YZXDirectKafkaReportStreaming")

    // Hold a reference to the current offset ranges, so it can be used downstream
    var offsetRanges = Array[OffsetRange]()

    val km = new KafkaManager(kafkaParams)
    val directKafkaStream = km.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    // This is safe because we haven't shuffled or otherwise disrupted partitioning and the original input rdd partitions were 1:1 with kafka partitions
    directKafkaStream.transform{ rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD{ rdd =>
      if (!rdd.isEmpty()) {
        // Process the messages first
        processRDD(rdd.map(_._2))
        // Then commit the offsets to Zookeeper
        km.updateZKOffsets(rdd, offsetRanges)
      }
      // This is at-least-once, not exactly-once: the MySQL upserts and the Zookeeper offset
      // update are not a single transaction (see the sketch after this example)
    }

    ssc.start()
    ssc.awaitTermination()
    ssc.stop(true, true)  // stop gracefully, letting in-flight batches finish
  }

  def getDouble(input: String) : Double = try{ input.toDouble } catch { case e: Exception => 0.0 }

  def getLong(input: String) : Long =  try{ input.toLong } catch { case e: Exception => 0L }

  def processRDD(messages: RDD[String] ) = {
    val (dbDriver, url_144_237, user, password) = ("com.mysql.jdbc.Driver", "jdbc:mysql://192.168.144.237:3306/", "data", "PIN239!@#$%^&8")

    val keyValue = messages.map(_.split("\t", -1).map(_.trim)).filter(_.length == 97).map{ arr =>
      val kafkaTime = arr(0)   // business timestamp carried in the Kafka record
      val day = try { kafkaTime.substring(0, 8) } catch { case e: Exception => "-1" }    // day
      val hour = try { kafkaTime.substring(8, 10) } catch { case e: Exception => "-1" }    // hour
      val (partner_id, advertiser_company_id, advertiser_id, order_id) = (arr(1), arr(2), arr(3), arr(4))
      val (campaign_id, sub_campaign_id, exe_campaign_id, vertical_tag_id, conversion_pixel) = (arr(5), arr(6), arr(7), arr(8), arr(9))
      val (creative_size, creative_id, creative_type, inventory_type, ad_slot_type, platform) = (arr(10), arr(11), arr(12), arr(16), arr(17), arr(24))
      // dimensions
      val key = (day, hour, partner_id, advertiser_company_id, advertiser_id, order_id, campaign_id, sub_campaign_id, exe_campaign_id, vertical_tag_id, conversion_pixel, creative_size, creative_id, creative_type, inventory_type, ad_slot_type, platform)

      val raw_media_cost: Double = getDouble(arr(53))
      val media_cost: Double = getDouble(arr(54))
      val service_fee: Double = getDouble(arr(55))
      val media_tax: Double = getDouble(arr(56))
      val service_tax: Double =  getDouble(arr(57))
      val total_cost: Double = getDouble(arr(58))
      val system_loss: Double = getDouble(arr(59))

      val bid: Long = getLong(arr(61))
      val imp: Long = getLong(arr(62))
      val click: Long = getLong(arr(63))
      val reach: Long = getLong(arr(64))
      val two_jump: Long = getLong(arr(65))
      val click_conversion: Long = getLong(arr(66))
      val imp_conversion: Long = getLong(arr(67))
      // metrics
      val value = Array[Double](raw_media_cost, media_cost, service_fee, media_tax, service_tax, total_cost, system_loss, bid, imp, click, reach, two_jump, click_conversion, imp_conversion)

      (key, value)
    }

    // Aggregate by dimension key, summing the corresponding metrics
    val reduceRDD = keyValue.reduceByKey{ case (v1, v2) => v1.zip(v2).map(x => x._1 + x._2) }

    reduceRDD.foreachPartition{ iter =>
      Class.forName(dbDriver)
      val connection = DriverManager.getConnection(url_144_237, user, password)
      //connection.setAutoCommit(false)  // disable auto-commit to manage the transaction manually
      val statement = connection.createStatement()

      for(row <- iter) {
        val key = row._1    // dimensions

        val day = key._1
        val hour = key._2

        val partner_id = try{ key._3.toLong } catch { case e: Exception => -1L }
        val advertiser_company_id = try { key._4.toLong } catch { case e: Exception => -1L }
        val advertiser_id = try { key._5.toLong } catch { case e: Exception => -1L }
        val order_id = try { key._6.toLong } catch { case e: Exception => -1L }
        val campaign_id = try { key._7.toLong } catch { case e: Exception => -1L }
        val sub_campaign_id = try { key._8.toLong } catch { case e: Exception => -1L }
        val exe_campaign_id = try { key._9.toLong } catch { case e: Exception => -1L }
        val vertical_tag_id = try { key._10.toLong } catch { case e: Exception => -1L }
        val conversion_pixel = try { key._11.toLong } catch { case e: Exception => -1L }

        val creative_size = if(key._12 != null) key._12 else ""
        val creative_id = if(key._13 != null) key._13 else ""
        val creative_type = if(key._14 != null) key._14 else ""
        val inventory_type = if(key._15 != null) key._15 else ""
        val ad_slot_type = if(key._16 != null) key._16 else ""
        val platform = if(key._17 != null) key._17 else ""

        val value = row._2    // metrics

        val raw_media_cost = value(0)
        val media_cost = value(1)
        val service_fee = value(2)
        val media_tax = value(3)
        val service_tax = value(4)
        val total_cost = value(5)
        val system_loss = value(6)

        val bid = value(7).toLong
        val imp = value(8).toLong
        val click = value(9).toLong
        val reach = value(10).toLong
        val two_jump = value(11).toLong
        val click_conversion = value(12).toLong
        val imp_conversion = value(13).toLong

        // Insert if the row does not exist, otherwise accumulate; requires a unique index on the dimension columns
        val sql =
          s"""
             |INSERT INTO test.rpt_effect_newday
             |(day, hour, partner_id, advertiser_company_id, advertiser_id, order_id,
             |campaign_id, sub_campaign_id, exe_campaign_id, vertical_tag_id, conversion_pixel,
             |creative_size, creative_id, creative_type, inventory_type, ad_slot_type, platform,
             |raw_media_cost, media_cost, service_fee,media_tax, service_tax, total_cost, system_loss,
             |bid, imp, click, reach, two_jump, click_conversion, imp_conversion)
             |VALUES ($day, $hour, $partner_id, $advertiser_company_id, $advertiser_id, $order_id,
             |$campaign_id, $sub_campaign_id, $exe_campaign_id, $vertical_tag_id, $conversion_pixel,
             |'$creative_size', '$creative_id', '$creative_type', '$inventory_type', '$ad_slot_type', '$platform',
             |$raw_media_cost, $media_cost, $service_fee, $media_tax, $service_tax, $total_cost, $system_loss,
             |$bid, $imp, $click, $reach, $two_jump, $click_conversion, $imp_conversion)
             |ON DUPLICATE KEY UPDATE
             |raw_media_cost=raw_media_cost+$raw_media_cost, media_cost=media_cost+$media_cost, service_fee=service_fee+$service_fee, media_tax=media_tax+$media_tax, service_tax=service_tax+$service_tax, total_cost=total_cost+$total_cost, system_loss=system_loss+$system_loss,
             |bid=bid+$bid, imp=imp+$imp, click=click+$click, reach=reach+$reach, two_jump=two_jump+$two_jump, click_conversion=click_conversion+$click_conversion, imp_conversion=imp_conversion+$imp_conversion
           """.stripMargin.replace("\n", " ")

        statement.addBatch(sql)
      }

      statement.executeBatch()   // execute the batched upserts
      //connection.commit()     // commit the transaction once the batch has run
      connection.close()       // close the database connection
    }
  }
}
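As the comment inside foreachRDD notes, this pipeline is only at-least-once, because the MySQL upserts and the Zookeeper offset commit are separate steps. One commonly cited way to tighten this (see the "Exactly-once Spark Streaming from Apache Kafka" link below) is to write the offset ranges into MySQL inside the same transaction as the results. The following is only a sketch of that idea under assumptions: the test.stream_offsets table and the saveBatchTransactionally helper are hypothetical and do not appear in the code above.

import java.sql.{Connection, DriverManager}
import org.apache.spark.streaming.kafka.OffsetRange

object TransactionalOffsets {
  // Sketch: write the batch results and the offset ranges in one MySQL transaction, so that
  // data and offsets either both succeed or both roll back.
  // test.stream_offsets (topic, part, until_offset) is a hypothetical table, not from this post.
  def saveBatchTransactionally(offsets: Array[OffsetRange], batchSql: Seq[String],
                               url: String, user: String, password: String): Unit = {
    val conn: Connection = DriverManager.getConnection(url, user, password)
    try {
      conn.setAutoCommit(false)                   // one transaction for data + offsets
      val stmt = conn.createStatement()
      batchSql.foreach(sql => stmt.addBatch(sql)) // the same report upserts built in processRDD
      for (o <- offsets) {
        stmt.addBatch(
          s"""INSERT INTO test.stream_offsets (topic, part, until_offset)
             |VALUES ('${o.topic}', ${o.partition}, ${o.untilOffset})
             |ON DUPLICATE KEY UPDATE until_offset = ${o.untilOffset}""".stripMargin.replace("\n", " "))
      }
      stmt.executeBatch()
      conn.commit()                               // data and offsets succeed or fail together
    } finally {
      conn.close()
    }
  }
}

On restart, the job would read the stored until_offset values and feed them back as the fromOffsets of createDirectStream (instead of, or in addition to, the offsets kept in Zookeeper), so a batch that was written is never replayed into the non-idempotent ON DUPLICATE KEY UPDATE accumulation.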

Related links:

Spark + Kafka direct approach: committing offsets to Zookeeper

Storing and reusing Spark Streaming + Kafka direct offsets in Zookeeper

Notes on using the low-level createDirectStream API in Spark Streaming with Kafka 1.4.1

Exactly-once Spark Streaming from Apache Kafka

https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/kafka-direct-spark-streaming-checkpoints-code-changes/td-p/38697

https://github.com/koeninger/kafka-exactly-once/tree/spark-1.6.0
