Manually Maintaining Kafka Offsets in Flink
Introduction
By way of contrast with Spark, this post shows how Flink can store Kafka offsets in Redis the way Spark does, so that each record is consumed exactly once, with no data lost and none duplicated. Anyone who has used Spark knows that after reading from Kafka, the DStream (more precisely an InputDStream[ConsumerRecord[String, String]]) carries the topic, partition, offset, key, value, timestamp, and so on. Maintaining offsets then takes just one foreach over the DStream; you choose where to store the offsets based on your scenario, and on restart you simply read them back from Redis.
First-time Flink users will notice that env.addSource returns a DataStream[String] whose elements are just the record value. So how do we get at the rest?
Steps
- Customize FlinkKafkaConsumer010 with a KeyedDeserializationSchema that builds a KafkaDStream
- Store the offsets in Redis
- Read them back on startup
Code
import java.nio.charset.StandardCharsets
import java.util._
import com.oneniceapp.bin.KafkaDStream
import my.nexus.util.StringUtils // private Nexus repo
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaConsumerBase}
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
import org.slf4j.LoggerFactory
import redis.clients.jedis.Jedis
import scala.collection.JavaConversions._
1. Create the KafkaDStream case class

case class KafkaDStream(topic: String, partition: Int, offset: Long, keyMessage: String, message: String)
2. Assemble the Kafka record into a KafkaDStream

/**
 * Build a Kafka source whose records keep topic/partition/offset alongside key and value.
 * @param topic topics to subscribe to
 * @param groupid consumer group id
 * @return the configured consumer
 */
def createKafkaSource(topic: java.util.List[String], groupid: String): FlinkKafkaConsumer010[KafkaDStream] = {
  // Kafka consumer with a KeyedDeserializationSchema that exposes the record metadata
  val consumer = new FlinkKafkaConsumer010[KafkaDStream](topic, new KeyedDeserializationSchema[KafkaDStream]() {
    override def getProducedType: TypeInformation[KafkaDStream] =
      TypeInformation.of(new TypeHint[KafkaDStream]() {})

    override def deserialize(messageKey: Array[Byte], message: Array[Byte],
                             topic: String, partition: Int, offset: Long): KafkaDStream = {
      // a Kafka record may have no key, so guard against null before decoding
      val key = if (messageKey == null) "" else new String(messageKey, StandardCharsets.UTF_8)
      KafkaDStream(topic, partition, offset, key, new String(message, StandardCharsets.UTF_8))
    }

    override def isEndOfStream(s: KafkaDStream): Boolean = false
  }, getKafkaProperties(groupid))

  // also commit offsets back to Kafka on checkpoints (useful for monitoring;
  // the Redis copy is what we actually restart from)
  consumer.setCommitOffsetsOnCheckpoints(true)
  consumer
}
/**
 * Kafka consumer properties
 * @param groupId consumer group id
 * @return the consumer configuration
 */
private def getKafkaProperties(groupId: String): Properties = {
  val kafkaProps: Properties = new Properties()
  kafkaProps.setProperty("bootstrap.servers", "kafka.brokersxxxxxxx")
  kafkaProps.setProperty("group.id", groupId)
  kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  kafkaProps
}
/**
 * Read the stored Kafka offsets for each topic back from Redis.
 * @param topics topics to look up
 * @return a map of partition -> offset to resume from
 */
def getSpecificOffsets(topics: java.util.ArrayList[String]): java.util.Map[KafkaTopicPartition, java.lang.Long] = {
  val specificStartOffsets: java.util.Map[KafkaTopicPartition, java.lang.Long] =
    new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
  val jedis = new Jedis(GetPropKey.redis_host, GetPropKey.redis_port)
  for (topic <- topics) {
    // one Redis hash per topic: field = partition, value = offset
    val key = s"my_flink_$topic"
    val partitions = jedis.hgetAll(key).toList
    for (partition <- partitions) {
      if (!StringUtils.isEmpty(topic) && !StringUtils.isEmpty(partition._1) && !StringUtils.isEmpty(partition._2)) {
        Logger.warn("topic:{}, partition:{}, offset:{}", topic.trim, partition._1.trim, partition._2.trim)
        specificStartOffsets.put(new KafkaTopicPartition(topic.trim, partition._1.trim.toInt), partition._2.trim.toLong)
      }
    }
  }
  jedis.close()
  specificStartOffsets
}
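Stripped of the Jedis and Flink types, the restore logic above is just: read a per-topic hash of partition-to-offset strings, skip blank entries, trim and parse the rest. A minimal pure-Scala sketch of that parsing step, with a plain Map standing in for the Redis hash and a (topic, partition) tuple standing in for KafkaTopicPartition (the names here are illustrative, not from the original code):

```scala
// Hypothetical stand-in for the hash returned by jedis.hgetAll:
// field = partition number as a string, value = offset as a string.
val stored = Map("0" -> "42", "1" -> "17", "2" -> "")

// Mirror of the filtering in getSpecificOffsets: blank entries are skipped,
// everything else is trimmed and parsed into typed values.
def parseOffsets(topic: String, hash: Map[String, String]): Map[(String, Int), Long] =
  hash.collect {
    case (p, o) if p.trim.nonEmpty && o.trim.nonEmpty =>
      (topic.trim, p.trim.toInt) -> o.trim.toLong
  }

val offsets = parseOffsets("my_topic", stored)
// partition 2 had a blank offset, so it is dropped rather than crashing toLong
```

This is also why the blank-string checks matter: a half-written or deleted hash field should mean "use the default start position for that partition", not a NumberFormatException at startup.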
3. Main program: read the Kafka data

val topics = new java.util.ArrayList[String]
topics.add(myTopic)
val consumer = createKafkaSource(topics, groupId)
consumer.setStartFromSpecificOffsets(getSpecificOffsets(topics))
val dataStream = env.addSource(consumer)
4. Save the offset. This is usually done inside a custom sink's invoke(), so that the offset is stored only after the record has been fully processed.

def setOffset(topic: String, partition: Int, offset: Long): Unit = {
  val jedis = new Jedis(GetPropKey.redis_host, GetPropKey.redis_port)
  val gtKey = s"my_flink_$topic"
  jedis.hset(gtKey, partition.toString, offset.toString)
  jedis.close()
}
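Because setOffset writes to a Redis hash, re-writing the same partition's field simply overwrites it, so calling it once per processed record keeps exactly one "latest processed offset" per partition. A pure-Scala sketch of that semantics, using a mutable Map in place of the Jedis connection (the in-memory `redis` variable is a stand-in for illustration, same key scheme and string encoding as above):

```scala
import scala.collection.mutable

// In-memory stand-in for Redis: one hash per topic, keyed "my_flink_<topic>".
val redis = mutable.Map.empty[String, mutable.Map[String, String]]

// Same semantics as jedis.hset(key, field, value): overwrite the field.
def setOffset(topic: String, partition: Int, offset: Long): Unit = {
  val hash = redis.getOrElseUpdate(s"my_flink_$topic", mutable.Map.empty)
  hash(partition.toString) = offset.toString
}

// A sink's invoke() would call this once per record, after processing it:
setOffset("my_topic", 0, 100L)
setOffset("my_topic", 0, 101L) // overwrites the previous value, does not append
setOffset("my_topic", 1, 7L)
```

On restart, getSpecificOffsets reads these fields back; note that Flink resumes *from* the given offset, so depending on whether you store the last processed offset or the next expected one, you may re-read one record per partition after a crash.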
Notes

While using Flink I noticed something interesting: just as with Spark, if you don't explicitly repartition the data and keep the original parallelism, the mapping to Kafka partitions stays fixed, so there is no need to worry about out-of-order records within the same partition.