使用IDEA编写StructuredStreaming_json

读取本地文件:

在people.json文件输入如下数据:
{"name":"json","age":23,"hobby":"running"}
{"name":"charles","age":32,"hobby":"basketball"}
{"name":"tom","age":28,"hobby":"football"}
{"name":"lili","age":24,"hobby":"running"}
{"name":"bob","age":20,"hobby":"swimming"}

需求:
使用Structured Streaming统计年龄小于25岁的人群的爱好排行榜
代码演示 :

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
 * Structured Streaming example: hobby ranking for people younger than 25.
 *
 * Reads JSON records with the fields `name`, `age` and `hobby` as a stream
 * from a local directory, keeps the rows whose `age` is below 25, counts the
 * rows per `hobby`, and prints the counts in descending order to the console.
 */
object WordCount2 {

  /**
   * Builds the streaming query and blocks until it terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Create the SparkSession, running locally on all available cores.
    val spark: SparkSession =
      SparkSession.builder().master("local[*]").appName("WordCount2").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    // Reduce log noise so the console sink output stays readable.
    sc.setLogLevel("WARN")

    // Describe the structure of the incoming JSON records.
    val schema: StructType = new StructType()
      .add("name", "string")
      .add("age", "integer")
      .add("hobby", "string")

    // Brings the $"column" string interpolator used below into scope.
    import spark.implicits._

    // Receive the data: treat JSON files in this directory as a stream.
    // Backslashes are escaped so the literal is a valid Scala string;
    // the resulting runtime path is the Windows path F:\...\day04.
    val dataDF: DataFrame =
      spark.readStream
        .schema(schema)
        .json("F:\\第四学期的大数据资料\\day02四月份资料\\第二周\\day04")

    // Business logic: keep people under 25, count them per hobby,
    // and order the hobbies from most to least frequent.
    val result: Dataset[Row] =
      dataDF
        .filter($"age" < 25)
        .groupBy("hobby")
        .count()
        .sort($"count".desc)

    // Output: print the full, re-sorted result table to the console on every
    // trigger ("complete" mode), then block until the query stops.
    result.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}

发布了238 篇原创文章 · 获赞 429 · 访问量 25万+

猜你喜欢

转载自blog.csdn.net/qq_45765882/article/details/105564092