1. 环境准备
- pom依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.5</version>
</dependency>
- Spark整合Kafka官网案例
官网链接: http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
官网Scala语言整合Kafka案例:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
2. SparkStreaming整合Kafka的wordcount案例
- 这里设置了自动提交offset到默认主题,能够满足大多数场景
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{
DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{
ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{
Seconds, StreamingContext}
import org.apache.spark.{
SparkConf, SparkContext}
/**
* @author liu a fu
* @date 2021/1/24 0024
* @version 1.0
* @DESC Kafka整合
*/
object _01SourceStreamingKafka {
def main(args: Array[String]): Unit = {
//1-环境准备
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[8]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5)) // 每5秒实时消 费数据
//这里不设置也可以,但是程序会找一个默认的路径提交,作为不知道提交到哪里,这里可以显示指定 checkpoint看不懂的校验文件
ssc.checkpoint("data/checkpoint/check03")
val kafkaParams = Map[String, Object](
//broker地址 连接Kafka集群
"bootstrap.servers" -> "node1:9092",
//Kafka的message的格式中关键就是value,因为一般情况下key为null,对key和value序列化和反序列化
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
//消费者组名称,这里自己定义即可
"group.id" -> "SparkOnKafka",
//earliest:表示如果有offset记录从offset记录开始消费,如果没有从最早的消息开始消费
//latest:表示如果有offset记录从offset记录开始消费,如果没有从最后/最新的消息开始消费
//none:表示如果有offset记录从offset记录开始消费,如果没有就报错
//这里的auto.offset.reset代表的是自动重置offset为latest就表示的是最新的偏移量,如果没有偏移从最新的位置开始
"auto.offset.reset" -> "latest",
//是否为自动提交-----false
//如果为true,这里kafka的offset的偏移量是根据kafka的默认的topic中保存,一般情况下true的方式能够满足绝大多数场景
//如果为false,这里的kafka的offset默认保存在checkpoint中,还可以指定mysql或redis中
"auto.commit.interval.ms"->"1000",//自动提交的时间间隔
"enable.auto.commit" -> (true: java.lang.Boolean)
)
//3-通过KafkaUtils.createDriectStream()创建对接的方法
val valueDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array("test"), kafkaParams) //设置topic的name
)
//4-通过map算子获取消息Message里面value的值
val rddDS1: DStream[String] = valueDS.map(_.value())
//5-flatMap map reduceByKey实现累加
val resultDS: DStream[(String, Int)] = rddDS1.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
//6-print输出
resultDS.print()
//7-开启StreamingContext()
ssc.start()
//8-StreamingContext() 停止条件
ssc.awaitTermination()
//9-关闭资源
ssc.stop()
}
}
关于KafkaUtils.createDirectStream工具类源码解释:
3. SparkStreaming整合Kafka获取偏移量
import java.lang
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{
DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{
CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.{
SparkConf, SparkContext}
import org.apache.spark.streaming.{
Seconds, StreamingContext}
/**
* @author liu a fu
* @date 2021/1/24 0024
* @version 1.0
* @DESC:
* 1-准备StremingContext环境
* 3-通过KafkaUtils.createDriectStream()创建对接的方法
* 4-使用flatMap
* 5-使用map
* 6-使用reduceByKey算子实现累加
* 7-print
* 8-ssc.start
* 9-ssc.awaitTermination
* 10-ssc.stop
*/
object _03OffsetStreamingKafka {
def main(args: Array[String]): Unit = {
//1-准备StreamingContext的环境
val ssc: StreamingContext ={
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[5]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
//这里不设置也可以,但是程序会找一个默认的路径提交,作为不知道提交到哪里,这里可以显示指定
ssc.checkpoint("data/checkpoint/check003")
ssc //返回值
}
/**
* 调用计算方法
*/
compute(ssc)
ssc.start()
ssc.awaitTermination()
ssc.stop(true,true) //优雅的停止
compute(ssc)
}
/**
* 优化代码抽取的Kafka计算代码方法
* @param ssc
*/
def compute(ssc: StreamingContext): Unit = {
val kafkaParams = Map[String, Object](
//broker地址 连接Kafka集群
"bootstrap.servers" -> "node1:9092",
//Kafka的message的格式中关键就是value,因为一般情况下key为null,对key和value序列化和反序列化
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
//消费者组名称,这里自己定义即可
"group.id" -> "SparkOnKafka",
//earliest:表示如果有offset记录从offset记录开始消费,如果没有从最早的消息开始消费
//latest:表示如果有offset记录从offset记录开始消费,如果没有从最后/最新的消息开始消费
//none:表示如果有offset记录从offset记录开始消费,如果没有就报错
//这里的auto.offset.reset代表的是自动重置offset为latest就表示的是最新的偏移量,如果没有偏移从最新的位置开始
"auto.offset.reset" -> "latest",
//是否为自动提交-----false
//如果为true,这里kafka的offset的偏移量是根据kafka的默认的topic中保存,一般情况下true的方式能够满足绝大多数场景
//如果为false,这里的kafka的offset默认保存在checkpoint中,还可以指定mysql或redis中
"enable.auto.commit" -> (false: lang.Boolean)
)
//3-通过KafkaUtils.createDriectStream()创建对接的方法
val valueDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array("test"), kafkaParams) //设置topic的name
)
//获取OffsetRange并提交
//f就是来自于kafka的每一个message
valueDS.foreachRDD(f => {
if (f.count() > 0) {
//大于0说明有消息
println("有数据从Kafka消费")
//如何获取偏移量
val offsetRanges: Array[OffsetRange] = f.asInstanceOf[HasOffsetRanges].offsetRanges
//现在就可以执行提交offset到默认的checkpoint中
valueDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
} // end if
}) //end foreach
val rddDS1: DStream[String] = valueDS.map(_.value())
val resultDS: DStream[(String, Int)] = rddDS1.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
resultDS.print()
}
}
4. 关于Kafka的补充:
关于Kafka的offset的解释:
- 在Kafka中消费者offset是一个非常关键的机制,他可以让消费者消费过程中挂了重新分配Partation,使得下次重新恢复消费时迅速知道从哪里开始消费,(类似于书签)
- Kafka对于offset的处理有两种提交方式:
- 自动提交(默认的提交方式 可以满足大多数场景)
2)手动提交(可以灵活地控制offset)
Kafka的Message的格式:
"auto.offset.reset" -> “latest”: Kafka消费位置:
per-partition assignment:对每个分区都指定一个offset,然后从offset位置开始消费