1.功能实现
提供了一种sparkstreaming接收kafka消息的元数据恢复的功能,方便解决一些应用宕机后的重复消费问题,即宕机后重启,sparkstreaming可以从之前消费位置开始消费,而不是从头开始消费。
2.代码
package _0809kafka
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
*
* 通过spark的checkpoint的机制来记录消费到哪里的数据
*
*/
object DirectKafkaStreamingHA08_1020 {
def main(args: Array[String]) {
//构造上下文
val conf = new SparkConf()
.setAppName("DirectKafkaStreamingHA")
.setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
// val sc = SparkUtil.createSparkContext(true,"DirectKafkaStreamingHA")
/**
* def getActiveOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext = {
ACTIVATION_LOCK.synchronized {
getActive().getOrElse { getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError) }
}
}
*/
// val checkpointPath = s"file:///E:\\workspace\\SparkPro\\checkpoint\\streaming_03"
//这个路径一定不能存在
val checkpointPath = s"file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\streaming_05"
//
def creatingFunc():StreamingContext ={
val ssc = new StreamingContext(sc,Seconds(10))
//设置ssc的checkpoint存放的路径
ssc.checkpoint(checkpointPath)
val kafkaParams: Map[String, String] = Map[String,String](
"metadata.broker.list" -> "bigdata.ibeifeng.com:9092,bigdata.ibeifeng.com:9093",
"auto.offset.reset" -> "smallest"
)
//fromOffsets 有几个分区就要往map中传入几个对象
//这个参数的意思是,从哪个偏移量开始消费哪一个topic的哪个分区的数据
//方式二:
//【自己新建了beifeng2,5个分区】
val fromOffsets = Map[TopicAndPartition,Long](
TopicAndPartition("beifeng2",0) -> 0l,
TopicAndPartition("beifeng2",1) -> 0l,
TopicAndPartition("beifeng2",2) -> 0l,
TopicAndPartition("beifeng2",3) -> 0l,
TopicAndPartition("beifeng2",4) -> 0l
)
val messageHandler: (MessageAndMetadata[String, String]) => (Long,String) = (mmd: MessageAndMetadata[String, String]) =>
(mmd.offset,mmd.message)
//=====方式二结束==========
val kafkaDirectDStream: InputDStream[(Long,String)] = KafkaUtils.createDirectStream[String,String,
StringDecoder,StringDecoder, (Long,String)](ssc,kafkaParams,fromOffsets,messageHandler)
//=====================================================================
//使用mapPartitions这样的api,在mapPartitions这个api中来定义数据的转换
//来避免DAG图的改变
val resultDStream: DStream[(String, Int)] = kafkaDirectDStream
.mapPartitions(iter =>{
iter.flatMap(t2 =>{
t2._2.split(" ")
}).map(t =>{
(t,1)
})
.map(t =>{
t.swap
}).map(t =>{
t.swap
})
}).reduceByKey(_ + _)
resultDStream.print()
ssc
}
//当要使用HA的机制的时候,需要启用checkpoint,让spark自动在某个文件系统上
//记录程序的相关运行情况,然后每次重启任务的时候,都从相同的位置去读取checkpoint的信息
//来开启ssc
val ssc = StreamingContext.getActiveOrCreate(checkpointPath,creatingFunc)
/**
* 当第一次运行代码,处理逻辑的时候,会生成DAG图,
* DAG图也被保存在checkpoint当中,所以当你第一次
* 运行的时候发现代码逻辑有问题,手动关闭程序,修改
* 代码重新运行,往往会报错(因为DAG图被修改)
* 解决方案:
* 1/自己管理偏移量
* 2/在写api的时候,用一些外层的api来封装内部的可能会变化的操作
*
*/
ssc.start()
ssc.awaitTermination()
}
}
3.方法缺陷
描述:在HA的streaming应用恢复过程中,不能更改RDD执行的DAG图
原因:DAG图改变了,你可以改变部分操作,例如map(_,2),但是不能有架构的改变
解决方法:
(1)map(_,2)做这种之类的改变,没有改变DAG图的操作
(2)停掉spark程序,删除hdfs上的checkpoint的文件夹,然后让他重新生成
(3)在一开始写代码的时候使用mappartitions来封装你的逻辑,这样DAG图只会记录你的mappartitions,不会记录你具体做了什么操作,即修改代码逻辑:
这样的逻辑代码:
val resultDStream = dstream.flatMap(_.split(" ")
.map((_,1))).reduceByKey(_+_)
可以替换成(当发现有bug的时候):
val resultDStream = dstream.mapPartitions(iter =>{
iter.map(key => {
(key,1)})
}).reduceByKey(_+_)