版权声明: https://blog.csdn.net/xianpanjia4616/article/details/81842900
最近也是有很多同学问我,StructuredStreaming结合kafka的使用,我简单的写了一个demo,后续会有更加具体和详细的介绍,今天先来一个简单的demo吧.代码测试过了,可以运行.
package spark
import org.apache.spark.sql.{SparkSession}
/**
* structredstreaming
*/
object StructuredStreaming {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers","")
.option("subscribe","jason_0606")
.option("startingOffsets", "latest")
.load()
import spark.implicits._
val query = df.select($"key", $"value")
.as[(String, String)].map(x => x._1 + " " + x._2).as[String]
.writeStream
.outputMode("append")
.format("console")
.start()
println(query +"哈哈哈")
query.awaitTermination()
}
}
今天就先写到这里,有时间继续更新.
如果有写的不对的地方,欢迎大家指正,如果有什么疑问,可以加QQ群:340297350,谢谢