版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
/**
* Author Vincer
* Date 2019/09/26 10:10
* Language Scala
*/
object StreamingDS {
def main(args: Array[String]): Unit= {
// 创建sparkSession
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("StreamingDS")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 创建Schema
val peopleSchema: StructType = new StructType()
.add("name", StringType)
.add("age", LongType)
.add("sex", StringType)
// 采集数据
val peopleDF: DataFrame = spark.readStream
.schema(peopleSchema)
.json("E:\\tmp")
// 创建临时表
peopleDF.createOrReplaceTempView("user")
// 查询数据
spark.sql("select * from user where age > 20")
// 数据输出
peopleDF.writeStream
.outputMode("append") //append模式输出
.format("console") //打印到控制台
.start()
.awaitTermination()
}
}