package com.spark_sql

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFromMysql {
  def main(args: Array[String]): Unit = {
    // todo 1: create the SparkSession
    val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()
    // todo 2: create a Properties object holding the MySQL username and password
    val properties: Properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123")
    // todo 3: read the data from MySQL
    val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student", properties)
    // todo 4: show the rows of the MySQL table
    mysqlDF.show()
    spark.stop()
  }
}
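For larger tables, the same reader can parallelize the scan by range-partitioning on a numeric column. A minimal sketch reusing the spark session and properties from the listing above; the id column and its bounds are assumptions about the student table, not something the original code establishes:

    // Partitioned JDBC read: Spark issues one query per partition,
    // each covering a slice of the assumed id range.
    val partitionedDF: DataFrame = spark.read.jdbc(
      "jdbc:mysql://localhost:3306/spark?serverTimezone=UTC",
      "student",
      columnName = "id",     // assumed numeric column to split on
      lowerBound = 1L,       // assumed smallest id (illustrative)
      upperBound = 1000L,    // assumed largest id (illustrative)
      numPartitions = 4,     // four parallel queries
      connectionProperties = properties
    )
    partitionedDF.show()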
package com.spark_sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object InferringSchema {
  def main(args: Array[String]): Unit = {
    // todo 1: build the SparkSession, specifying the appName and master address
    val spark: SparkSession = SparkSession.builder().appName("InferringSchema").master("local[2]").getOrCreate()
    // todo 2: get the SparkContext from the SparkSession
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN") // set the log output level
    // todo 3: load the data
    val dataRDD: RDD[String] = sc.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
    // todo 4: split each line into fields
    val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
    // todo 5: map the RDD onto the Person case class
    val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    // todo 6: create the DataFrame; the implicit conversions must be imported
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF()

    // todo ------------------- DSL-style operations: start --------------
    // 1. Show the DataFrame's data (20 rows by default)
    personDF.show()
    // 2. Print the DataFrame's schema
    personDF.printSchema()
    // 3. Print the number of rows in the DataFrame
    println(personDF.count())
    // 4. Print all of the DataFrame's column names
    personDF.columns.foreach(println)
    // 5. Take the first row of the DataFrame
    println(personDF.head())
    // 6. Show all values of the name column
    personDF.select("name").show()
    // 7. Filter for rows where age is greater than 30
    personDF.filter($"age" > 30).show()
    // 8. Count the rows where age is greater than 30
    println(personDF.filter($"age" > 30).count())
    // 9. Group by age and count the rows in each group
    personDF.groupBy("age").count().show()
    // todo ------------------- DSL-style operations: end -------------

    // todo -------------------- SQL-style operations: start -----------
    // todo: register the DataFrame as a table
    personDF.createOrReplaceTempView("t_person")
    // todo: run SQL statements against it
    spark.sql("select * from t_person").show()

    spark.sql("select * from t_person where name='zhangsan'").show()

    spark.sql("select * from t_person order by age desc").show()
    // todo -------------------- SQL-style operations: end -------------

    sc.stop()
  }
}

case class Person(id: Int, name: String, age: Int)
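Because Person is a case class, the same spark.implicits._ import also provides a typed Dataset, where each row keeps the Person type and filters are checked at compile time instead of by column name. A minimal sketch reusing personRDD and the implicits from the listing above:

    import org.apache.spark.sql.Dataset

    // toDS() preserves the Person type, so fields are plain Scala members.
    val personDS: Dataset[Person] = personRDD.toDS()
    personDS.filter(_.age > 30).show() // same query as the $"age" > 30 filter, but typed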
package com.spark_sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object SparkSqlSchema {
  def main(args: Array[String]): Unit = {
    // todo 1: create the SparkSession, specifying the appName and master
    val spark: SparkSession = SparkSession.builder().appName("SparkSqlSchema").master("local[2]").getOrCreate()
    // todo 2: get the SparkContext
    val sc: SparkContext = spark.sparkContext
    // todo 3: load the data
    val dataRDD: RDD[String] = sc.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
    // todo 4: split each line into fields
    val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
    // todo 5: load the data into Row objects
    val personRDD: RDD[Row] = dataArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
    // todo 6: create the schema
    val schema: StructType = StructType(Seq(
      StructField("id", IntegerType, false),
      StructField("name", StringType, false),
      StructField("age", IntegerType, false)
    ))

    // todo 7: create the DataFrame from personRDD and the schema
    val personDF: DataFrame = spark.createDataFrame(personRDD, schema)

    // todo 8: show the DataFrame's contents with the DSL
    personDF.show()

    // todo 9: register the DataFrame as a table
    personDF.createOrReplaceTempView("t_person")

    // todo 10: query it with SQL
    spark.sql("select * from t_person").show()

    spark.sql("select count(*) from t_person").show()

    sc.stop()
  }
}
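Row fields are positional and unchecked, so a malformed line in Person.txt would fail the whole job at x(0).toInt. A defensive variant, shown here as a sketch reusing dataRDD from the listing above, drops lines that do not parse instead of failing:

    import scala.util.Try

    // flatMap over Option keeps only lines with exactly three fields
    // whose numeric columns parse cleanly.
    val safeRowRDD: RDD[Row] = dataRDD
      .map(_.split(" "))
      .flatMap {
        case Array(id, name, age) => Try(Row(id.toInt, name, age.toInt)).toOption
        case _                    => None
      }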
package com.spark_sql

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

object SparkSqlToMysql {
  def main(args: Array[String]): Unit = {
    // todo 1: create the SparkSession
    val spark: SparkSession = SparkSession.builder().appName("SparkSqlToMysql").master("local[2]").getOrCreate()
    // todo 2: read the data
    val data: RDD[String] = spark.sparkContext.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
    // todo 3: split each line into fields
    val arrRDD: RDD[Array[String]] = data.map(_.split(" "))
    // todo 4: map the RDD onto the Student01 case class
    val studentRDD: RDD[Student01] = arrRDD.map(x => Student01(x(0).toInt, x(1), x(2).toInt))
    // todo: import the implicit conversions
    import spark.implicits._
    // todo 5: convert the RDD into a DataFrame
    val studentDF: DataFrame = studentRDD.toDF()
    // todo 6: register the DataFrame as a table
    studentDF.createOrReplaceTempView("student")
    // todo 7: query the student table, sorting by age in descending order
    val resultDF: DataFrame = spark.sql("select * from student order by age desc")

    // todo 8: save the result into a MySQL table
    // todo: create a Properties object holding the MySQL username and password
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "123")

    resultDF.write.jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student01", prop)

    // todo: when writing to MySQL a save mode can be set: Overwrite replaces the table,
    //       Append adds rows, Ignore skips the write if the table exists, and
    //       ErrorIfExists (the default) fails when the table already exists
    //resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.200.150:3306/spark","student",prop)
    spark.stop()
  }
}

// todo: the Student01 case class
case class Student01(id: Int, name: String, age: Int)
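Because the default save mode is ErrorIfExists, the plain write above fails on a second run once student01 exists. One way to make the job re-runnable, sketched with the resultDF and prop from the listing above (the batchsize value is only an illustrative tuning knob):

    resultDF.write
      .mode(SaveMode.Overwrite)    // drop and recreate the table on every run
      .option("batchsize", "1000") // rows per JDBC batch insert (illustrative)
      .jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student01", prop)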
package com.SparkStreaming_Flume_Poll

import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkStreaming_Flume_Poll {

  // newValues holds all the 1s for one word from the (word, 1) pairs of the current batch
  // runningCount is the historical running total for that key
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = runningCount.getOrElse(0) + newValues.sum
    Some(newCount)
  }

  def main(args: Array[String]): Unit = {
    // configure the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")
    // build the SparkContext
    val sc: SparkContext = new SparkContext(sparkConf)
    // set the log level
    sc.setLogLevel("WARN")
    // build the StreamingContext with the batch interval
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // set the checkpoint directory (required by updateStateByKey)
    ssc.checkpoint("./")
    // set the Flume addresses; multiple hosts may be listed
    val address = Seq(new InetSocketAddress("192.168.107.144", 8888))
    // pull data from Flume
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK)

    // the payload lives in the Flume event body; convert it to a String
    val lineStream: DStream[String] = flumeStream.map(x => new String(x.event.getBody.array()))
    // running word count across batches
    val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction)

    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
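updateStateByKey rescans the entire accumulated state on every batch, so its cost grows with the number of distinct words ever seen. mapWithState (Spark 1.6+) is a common alternative that only visits keys present in the current batch; a minimal sketch reusing lineStream and the checkpointed context from the listing above, not part of the original job:

    import org.apache.spark.streaming.{State, StateSpec}

    // The state function receives each (word, 1) of the batch
    // together with that word's stored running total.
    val spec = StateSpec.function((word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (word, sum) // emitted once per input record
    })
    val stateful: DStream[(String, Int)] =
      lineStream.flatMap(_.split(" ")).map((_, 1)).mapWithState(spec).stateSnapshots()
    stateful.print()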