Introduction: Apache Kafka is one of the most popular distributed, real-time streaming message systems; it gives downstream consumers parallel processing and reliable fault tolerance, and it has become the de facto standard for streaming-data scenarios at large companies. Structured Streaming integrates well with Kafka: it pulls messages from Kafka and treats the stream as a DataFrame, an unbounded, ever-growing table that you query directly. Structured Streaming guarantees end-to-end exactly-once semantics, so users only need to care about their business logic, not how the guarantees are implemented underneath.
1. Kafka-specific configuration
Official documentation: http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
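The linked page documents every Kafka-specific option; the ones used most often (as listed in the official docs) are:
- kafka.bootstrap.servers (required): comma-separated list of Kafka broker host:port pairs.
- subscribe / subscribePattern / assign: exactly one must be set; a comma-separated topic list, a topic regex, or a JSON string of specific partitions, respectively.
- startingOffsets: "earliest", "latest" (streaming default), or a JSON string of per-partition offsets; in the JSON form, -2 means earliest and -1 means latest.
- endingOffsets (batch queries only): "latest" or a JSON string of per-partition offsets.
- failOnDataLoss: whether to fail the query when data may have been lost (deleted topics, out-of-range offsets); defaults to true.
- maxOffsetsPerTrigger: caps the number of offsets processed per trigger interval.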
2. KafkaSource
The official docs provide three ways to consume data from Kafka topics; they differ mainly in how the topics to consume are specified. Note that the snippets below use the batch API (spark.read); for a streaming query, use spark.readStream instead and drop endingOffsets, which applies to batch queries only:
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*") // topics matched by a regex pattern
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
After reading from Kafka, each row carries the fixed schema below, containing both the message payload (key, value) and Kafka metadata:
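root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)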
3. KafkaSink
Writing to Kafka mirrors reading: call writeStream on the DataFrame with format("kafka") and provide a value column; the key column is optional and defaults to null when absent.
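A minimal sketch of the sink, following the pattern in the official docs; the broker list, topic name, and checkpoint path are placeholders, and streamingDF is assumed to be a streaming DataFrame whose key/value columns can be cast to string:

val kafkaSink = streamingDF
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")                           // all rows go to this topic
  .option("checkpointLocation", "/path/to/checkpoint") // needed for failure recovery
  .start()

Instead of a fixed topic option, the DataFrame can also carry a topic column so that each row is routed to its own topic.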
Complete code:
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
/**
 * @author liu a fu
 * @date 2021/1/26
 * @version 1.0
 * @DESC Kafka integration: streaming word count
 */
object _01WordCountKafka {
  def main(args: Array[String]): Unit = {
    // 1. Prepare the environment
    val spark: SparkSession = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      // SparkSQL defaults to 200 shuffle partitions; use fewer for this small local job
      .config("spark.sql.shuffle.partitions", 2)
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    // 2. Read the source data from Kafka (produced upstream)
    val kafkaDF: DataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "test")
      .load()
    //kafkaDF.printSchema()
    //root
    // |-- key: binary (nullable = true)
    // |-- value: binary (nullable = true)
    // |-- topic: string (nullable = true)
    // |-- partition: integer (nullable = true)
    // |-- offset: long (nullable = true)
    // |-- timestamp: timestamp (nullable = true)
    // |-- timestampType: integer (nullable = true)

    // 3. Process the data (consume and aggregate)
    // The docs recommend casting the binary columns to string; we only need value here
    val resultDS: Dataset[Row] = kafkaDF.selectExpr("CAST(value AS STRING)")
      .as[String]
      // drop null or blank lines
      .filter(line => line != null && line.trim.length > 0)
      .flatMap(_.split("\\s+"))
      .groupBy($"value") // flatMap over a Dataset[String] yields a column named "value"
      .count()

    // 4. Configure the streaming output and start the query
    val query: StreamingQuery = resultDS.writeStream
      .format("console")                 // write to the console sink
      .outputMode(OutputMode.Complete()) // re-emit the full result table each trigger
      // trigger interval: 0 means run micro-batches as fast as possible (the default)
      .trigger(Trigger.ProcessingTime(0))
      .option("numRows", 10)
      .option("truncate", false)
      .start()
    query.awaitTermination() // block until the streaming query terminates
    query.stop()             // then stop the query and release its resources
  }
}
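To try the example end to end (assuming a broker at node1:9092 and an existing topic named test, as hard-coded above), start a console producer on the test topic and type a few whitespace-separated words; the query prints the updated counts to the console after each trigger. Complete mode re-emits the entire result table on every trigger, which is fine for a demo but only practical for small aggregation results; a production query would also set option("checkpointLocation", ...) so state can be recovered after a restart.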