所谓的手动管理偏移量就是用户自己定义消息何时被真正处理完,并在提交偏移量之前一般会用偏移量做一些其他的操作,好处是用户可以确保只有消息被真正处理完成后再提交偏移量。所以需要我们在代码逻辑中得到实时的偏移量,并且保证<<任务处理完成之后再提交偏移量>>这种时序性。
手动管理kafka偏移量有以下优点:
a)一般情况下,保证数据不丢失,不重复被消费
b)可以方便地查看offset信息
操作的api是OffsetRange这个类,它有untilOffset()方法,这个方法可以得到该批次数据中操作的分区对应的终止偏移量。另外还有fromOffset()方法,对应起始偏移量。
大致的操作思路如下
- 在Kafka DirectStream初始化时,取得当前所有partition的中数据的offset
- 读取offset数据,处理并存储结果。
- 提交offset,并将其持久化在可靠的外部存储中。
同时要注意的是在当前普遍用的是0.10左右的kafka,在Kafka 0.10开始的版本中,offset的默认存储由ZooKeeper移动到了kafka一个自带的topic中,名为__consumer_offsets,因此Spark Streaming也专门提供了commitAsync API用于提交offset
提交的核心代码如下
//stream是streaming数据集
stream.foreachRDD {
rdd =>
//把rdd转化
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//中间是所需代码业务,此处省略
// 在计算已经完成就提交偏移量
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
使用的全套代码如下
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{
SparkConf, TaskContext}
import org.apache.spark.streaming.kafka010.{
CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{
Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object StreamFromKafka {
def main(args: Array[String]): Unit = {
//前面的可以不用看,就是正常的流计算
val conf = new SparkConf().setAppName("StreamWordCount").setMaster("local[2]")
val sc = new StreamingContext(conf,Seconds(10))
sc.sparkContext.setLogLevel("ERROR")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "192.168.182.149:9092,192.168.182.147:9092,192.168.182.148:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group1",
"enable.auto.commit" -> (true: java.lang.Boolean) //将提交设置成手动提交false,默认true,自动提交到kafka
)
/**
* LocationStrategies.PreferBrokers() 仅仅在你 spark 的 executor 在相同的节点上,优先分配到存在 kafka broker 的机器上;
* LocationStrategies.PreferConsistent(); 大多数情况下使用,一致性的方式分配分区所有 executor 上。(主要是为了分布均匀)
* 新的Kafka使用者API将预先获取消息到缓冲区。因此,出于性能原因,Spark集成将缓存的消费者保留在执行程序上(而不是为每个批处理重新创建它们),并且更喜欢在具有适当使用者的主机位置上安排分区,这一点很重要。
*在大多数情况下,您应该使用LocationStrategies.PreferConsistent,如上所示。这将在可用执行程序之间均匀分配分区。如果您的执行程序与Kafka代理在同一主机上,请使用PreferBrokers,它更愿意为该分区安排Kafka领导者的分区。
*/
val topics = Array("test")
val stream = KafkaUtils.createDirectStream[String, String](
sc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val kafkaStream = stream.map(record => (record.key, record.value))
stream.foreachRDD(rdd =>{
//此处获取当前rdd中分区的offset值
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition(partitions =>{
//此处将的遍历分区,通过TaskContext类得到每个分区的标识从而获取fromOffset和untilOffset
val o = offsetRanges(TaskContext.get().partitionId())
//打印到控制台可以明了的查看offset值
println(o.fromOffset+"- - - - - - - - - - "+o.untilOffset)
//这里就是业务中怎么处理offset了,也可以处理数据
partitions.foreach(line =>{
//将分区中数据的key,value值打印到控制台
println("key"+line.key()+"...........value"+line.value())
})
})
//手动提交处理后的offset值到kafka
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
val words = kafkaStream.flatMap(_._2.split(" "))
val pairs = words.map {
x => (x,1) }
val wordCounts = pairs.reduceByKey(_+_)
wordCounts.print()
sc.start()
sc.awaitTermination()
}
}
而如果你用的Kafka0.10以前的,那么你就要提交到zookeeper里面,而不是kafka,操作代码如下
object KafkaDirectWordCount_zookeeper {
def main(args: Array[String]): Unit = {
//消费者id
val group = "g001"
//主题
val topic = "wordcount"
val topics = Array(topic)
//创建SparkConf
val conf = new SparkConf().setAppName("KafkaDirectWordCount_zookeeper").setMaster("local[2]")
val streamingContext = new StreamingContext(conf, Seconds(5));
//指定kafka的broker地址
val brokerList = "node01:9092,node02:9092,node03:9092"
//指定zk的地址,为了后面处理偏移量准备
val zkQuorum = "node01:2181,node02:2181,node03:2181"
//创建一个 ZKGroupTopicDirs 对象,这个对象对应的是一个zkClient中可以查看的一个路径
//该路径标识着对应消费者组的所有信息
val topicDirs = new ZKGroupTopicDirs(group, topic)
//获取这个消费者组在zookeeper中消费wordcount的偏移量存储路径
//按照上面的配置获取到的路径应该是/g001/offsets/wordcount/
val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
//配置kafka的参数
val kafkaParams = Map[String, Object](
//指定broker所在位置
"bootstrap.servers" -> brokerList,
//指定写入数据和读取数据的编码方式
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> group,
"auto.offset.reset" -> "earliest", // 从0偏移量开始读取数据,默认lastest从最后也就是最新数据,最终用那个都是看业务需求,这里是为了测试代码要多次运行才每次重头开始读取
//关闭自动提交偏移量
"enable.auto.commit" -> (false: java.lang.Boolean)
)
//创建一个zookeeper的客户端,维护偏移量
val zkClient = new ZkClient(zkQuorum)
//我们要判断一下这个路径下是否有子路径,如果是空的,就表示该组没有过
val children = zkClient.countChildren(zkTopicPath)
var kafkaStream: InputDStream[ConsumerRecord[String, String]] = null
//如果有子节点,那么就表示之前消费过且保存了offset,我们要对其做处理
if (children>0){
//如果 zookeeper 中有保存 offset,我们就用这个 offset 作为 kafkaStream 的起始位置
var offsets: collection.mutable.Map[TopicPartition, Long] = collection.mutable.Map[TopicPartition, Long]()
//手动维护过偏移量
//1.先将已有的对应每个topic分区的偏移量读取出来与kafka的分区对应
for (i <- 0 until children){
// 获取zookeeper中保存的分区的offset,如/g001/offsets/wordcount/0 代表0分区
val partitionOffset = zkClient.readData[Long](s"$zkTopicPath/${i}")
// 建立一个分区的标识,对应topic的分区
val tp =new TopicPartition(topic, i)
//将获取到的漂移量和topic里面的分区一一对应在Map中
offsets.put(tp,partitionOffset.toLong)
}
//通过KafkaUtils创建直连的DStream(fromOffsets参数的作用是:按照前面计算好了的偏移量继续消费数据)
kafkaStream = KafkaUtils.createDirectStream[String,String](streamingContext,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topics,kafkaParams,offsets))
}else{
//如果未保存,那就表示之前没有被消费offset
kafkaStream = KafkaUtils.createDirectStream[String,String](streamingContext,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topics,kafkaParams))
}
//读取完数据,我们先维护最新偏移量,和0.10之后的差不多的操作
kafkaStream.foreachRDD(rdd=>{
val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//业务处理
val lines: RDD[String] = rdd.map(_.value())
//对RDD进行操作,触发Action
lines.foreachPartition(partition =>
partition.foreach(x => {
println(x)
})
)
//更新偏移量
for(osr <- ranges) {
// /g001/offsets/wordcount/0
val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
//将该 partition 的 offset 保存到 zookeeper
//如果目录不存在先创建
//println(zkPath)
if (!zkClient.exists(zkPath)) {
zkClient.createPersistent(zkPath, true)
}
//写入数据
zkClient.writeData(zkPath, osr.untilOffset)
}
})
streamingContext.start()
streamingContext.awaitTermination()
}
}
像zookeeper这种维护方式,随着kafka自己维护偏移量,其实已经成了一个外部维护偏移量的开发分割,而不再是一个单单的zookeeper,比如mysql等都是可以用这种方式的,下面我给大家看一个用redis维护偏移量的写法
import java.{
lang, util}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{
Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{
Seconds, StreamingContext}
import redis.clients.jedis.Jedis
object WCKafkaRedisApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("xx")
//每秒钟每个分区kafka拉取消息的速率
.set("spark.streaming.kafka.maxRatePerPartition", "100")
// 序列化
.set("spark.serilizer", "org.apache.spark.serializer.KryoSerializer")
// 建议开启rdd的压缩
.set("spark.rdd.compress", "true")
val ssc = new StreamingContext(conf, Seconds(2))
//启动一参数设置
val groupId = "test002"
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "hdp01:9092,hdp02:9092,hdp03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: lang.Boolean)
)
val topics = Array("test")
//启动二参数设置 JedisOffset是自己分装了一个代码作用和原先的差不多,都是判断是不是有偏移量的,如果有分装Map
var formdbOffset: Map[TopicPartition, Long] = JedisOffset(groupId)
//拉取kafka数据
val stream: InputDStream[ConsumerRecord[String, String]] = if (formdbOffset.size == 0) {
KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
} else {
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String, String](formdbOffset.keys, kafkaParams, formdbOffset)
)
}
//数据偏移量处理。
stream.foreachRDD({
rdd =>
//获得偏移量对象数组
val offsetRange: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//逻辑处理
rdd.flatMap(_.value().split(" ")).map((_, 1)).reduceByKey(_ + _).foreachPartition({
it =>
val jedis = RedisUtils.getJedis
it.foreach({
v =>
jedis.hincrBy("wordcount", v._1, v._2.toLong)
})
jedis.close()
})
//偏移量存入redis
val jedis: Jedis = RedisUtils.getJedis
for (or <- offsetRange) {
jedis.hset(groupId, or.topic + "-" + or.partition, or.untilOffset.toString)
}
jedis.close()
})
ssc.start()
ssc.awaitTermination()
}
}
import java.util
import org.apache.kafka.common.TopicPartition
object JedisOffset {
def apply(groupId: String) = {
var formdbOffset = Map[TopicPartition, Long]()
val jedis1 = RedisUtils.getJedis
val topicPartitionOffset: util.Map[String, String] = jedis1.hgetAll(groupId)
import scala.collection.JavaConversions._
val topicPartitionOffsetlist: List[(String, String)] = topicPartitionOffset.toList
for (topicPL <- topicPartitionOffsetlist) {
val split: Array[String] = topicPL._1.split("[-]")
formdbOffset += (new TopicPartition(split(0), split(1).toInt) -> topicPL._2.toLong)
}
formdbOffset
}
}
对比一下不难发现,其实写法都是一样的,只是偏移量保存在哪里,怎么用而已
还有人想为什么不用checkpoint,其实啊,虽然Spark Streaming的checkpoint机制无疑是用起来最简单的,checkpoint数据存储在HDFS中,如果Streaming应用挂掉,可以快速恢复。
但是,如果Streaming程序的代码改变了,重新打包执行就会出现反序列化异常的问题。这是因为checkpoint首次持久化时会将整个jar包序列化,以便重启时恢复。重新打包之后,新旧代码逻辑不同,就会报错或者仍然执行旧版代码。
要解决这个问题,只能将HDFS上的checkpoint文件删掉,但这样也会同时删掉Kafka的offset信息,就毫无意义了。