package kafka
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* spark Streaming 与kafka 0.10x版本的集成
*/
object SSCDriectKafka010 {
def main(args: Array[String]): Unit = {
val config = ConfigFactory.load()
val conf = new SparkConf().setAppName("kafka-streaming").setMaster("local[*]")
//批处理时间3秒
val ssc = new StreamingContext(conf,Seconds(3))
//设置消费者组id
val groupId = "day_02"
/**
* 构建kafka连接参数
* latest,earliest,none
*
* earliest
* //当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,从头开始消费
* latest
* 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
* none
* topic各分区都存在已提交的offset时,从offset后开始消费,只要有一个分区不存在已提交的offset,则抛出异常
*/
val kafkaParams: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "earliest",
//"auto.commit.interval.ms"-> "1000",设置为1秒提交一次offset,默认是5秒
"enable.auto.commit" -> (false: java.lang.Boolean) //是否自动递交偏移量
)
//指定主题
val topics = Array("user")
/**
*指定kafka数据源
* locationStrategy位置策略
* 包含了两个参数PreferBrokers,PreferConsistent
* 如果说kafka的broker节点跟spark的executor节点不在同一台机器的话,name就使用PreferConsistent
*
* 那么在企业中多数情况下,kafka的broker和executor是不会在一台服务器的,但是对于多数
* 中小企业来说会部署到一台
* 设定位置策略的原因是,会以最优化的策略进行读取数据
* 如果两者在同一台服务器的话,读写数据性能会非常高,不需要走网络传输
* PreferConsistent,将来kafka拉取的数据会尽量的将数据平均分散到所有的executor节点上
*/
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](topics, kafkaParams)
//传入两个参数,一个是主题,一个是配置的参数集
)
//遍历RDD
stream.foreachRDD(rdd=>{
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Array[OffsetRange]里面有几个offsetRange,有几个分区就有几个OffsetRange
for (o <- offsetRanges){
println(s"topic=${o.topic},partittion=${o.partition},fromoffset=${o.fromOffset},endoffset=${o.untilOffset}")
}
//主动发起递交偏移量
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
ssc.start()
ssc.awaitTermination()
}
}
SparkStreaming整合kafka入门
猜你喜欢
转载自blog.csdn.net/LJ2415/article/details/85255176
今日推荐
周排行