Spark Exercises: Spark SQL and Data Sources
Exercises covered:
- Load a JSON file with Spark SQL and display the data
- Load data from a Hive table with Spark SQL and display it
- Connect to MySQL through the Spark SQL JDBC data source, then load and save data
- Load and save Avro data with Spark SQL
This article draws on the official Spark SQL documentation and my own blog post "Spark SQL Data Sources".
Reading a JSON File
people.json file contents
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
Project directory structure
pom dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>2.4.4</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
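The JDBC examples later in this post also need the MySQL driver on the classpath, which the pom above does not include. A minimal sketch of the extra dependency, assuming Connector/J 8.x (which provides the com.mysql.cj.jdbc.Driver class used below):
<!-- assumed addition for the JDBC examples; not in the original pom -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.18</version>
</dependency>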
ReadJson.scala file contents
package org.feng.datasource
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
  * Created by Feng on 2020/2/10 15:30
  * CurrentProject's name is spark_demo
  */
class ReadJson(spark: SparkSession) {

  /**
    * Read a JSON file and query it through a temporary view.
    * @param path     path of the JSON file
    * @param tempView name of the temporary view
    */
  def readJson(path: String, tempView: String): Unit = {
    // The schema is inferred from the JSON records
    val dataFrame: DataFrame = spark.read.json(path)
    dataFrame.createOrReplaceTempView(tempView)
    spark.sql(s"select * from $tempView").show()
  }
}
Client.scala file contents
package org.feng.client
import org.apache.spark.sql.SparkSession
import org.feng.datasource.ReadJson
/**
  * Created by Feng on 2020/2/10 15:32
  * CurrentProject's name is spark_demo
  */
object Client extends App {
  val spark = SparkSession
    .builder()
    .appName("Client_".concat(System.currentTimeMillis().toString))
    .master("local")
    .getOrCreate()

  val jsonPath = "D:\\jee-2019-7-idea-maven-workspace\\spark_demo\\spark_datasource\\src\\main\\resources\\people.json"
  val readJson = new ReadJson(spark)
  readJson.readJson(jsonPath, "people")
}
Output
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
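Because the DataFrame is registered as a temporary view, any SQL can be run against it. As a small follow-up sketch (a hypothetical query, not part of the original exercise), a filtered select on the inferred age column:
// Rows where age is null fail the comparison and are filtered out,
// so this should print only Andy (age 30)
spark.sql("select name from people where age > 20").show()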
Reading a Hive Table
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object SparkHiveText {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkHiveText")
    val sc = new SparkContext(conf)
    // HiveContext is deprecated since Spark 2.0; see the SparkSession variant below
    val hc = new HiveContext(sc)
    hc.sql("select * from table_name").show() // replace table_name with a real Hive table
    sc.stop()
  }
}
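Since HiveContext is deprecated in Spark 2.x, the idiomatic Spark 2.4 route goes through SparkSession with Hive support enabled. A minimal sketch, assuming the spark-hive_2.11 dependency is on the classpath and a Hive metastore is reachable (table_name is again a placeholder):
import org.apache.spark.sql.SparkSession

object SparkHiveSession {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("SparkHiveSession")
      .enableHiveSupport() // reads the metastore location from hive-site.xml
      .getOrCreate()
    spark.sql("select * from table_name").show()
    spark.stop()
  }
}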
Reading and Writing MySQL
Reading from MySQL
ReadMysql.scala file contents
package org.feng.datasource
import java.util.Properties
import org.apache.spark.sql.SparkSession
/**
  * Reads data from MySQL.
  * Created by Feng on 2020/2/10 15:54
  * CurrentProject's name is spark_demo
  */
class ReadMysql(spark: SparkSession, database: String) {

  val user = "root"
  val password = "root"
  val url = "jdbc:mysql://localhost:3306/"
    .concat(database)
    .concat("?serverTimezone=UTC")

  // Read the given table and display it
  def readMysql(tableName: String): Unit = {
    // Register the MySQL Connector/J driver (Class.forName("com.mysql.cj.jdbc.Driver") also works)
    new com.mysql.cj.jdbc.Driver
    spark.read
      .jdbc(url, tableName, properties(user, password))
      .show()
  }

  def properties(user: String, password: String): Properties = {
    val pro = new Properties()
    pro.put("user", user)
    pro.put("password", password)
    pro
  }
}
Client.scala file contents
package org.feng.client
import org.apache.spark.sql.SparkSession
import org.feng.datasource.{ReadJson, ReadMysql}
/**
  * Created by Feng on 2020/2/10 15:32
  * CurrentProject's name is spark_demo
  */
object Client extends App {
  val spark = SparkSession
    .builder()
    .appName("Client_".concat(System.currentTimeMillis().toString))
    .master("local")
    .getOrCreate()

  // Run the MySQL read
  runReadMysql()

  // Read from MySQL
  def runReadMysql(): Unit = {
    // Specify the database
    val readMysql = new ReadMysql(spark, "demo2019")
    // Specify the table name
    readMysql.readMysql("user")
  }

  // Read the JSON file
  def runReadJson(): Unit = {
    val jsonPath = "D:\\jee-2019-7-idea-maven-workspace\\spark_demo\\spark_datasource\\src\\main\\resources\\people.json"
    val readJson = new ReadJson(spark)
    readJson.readJson(jsonPath, "people")
  }
}
Output
+---+------+--------+-------------+
| id| name|password| create_date|
+---+------+--------+-------------+
| 1| feng| 12345|1575853408359|
| 2|liqian| 123456|1575853408359|
+---+------+--------+-------------+
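The same read can also be expressed through the generic option-based JDBC API instead of a Properties object; a minimal sketch under the same assumed URL and credentials:
// Equivalent read through format("jdbc") with options
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/demo2019?serverTimezone=UTC")
  .option("dbtable", "user")
  .option("user", "root")
  .option("password", "root")
  .load()
jdbcDF.show()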
Writing to MySQL
Read a JSON file, convert it to a DataFrame, and append it to MySQL.
user.json file contents
{"id": 3, "name":"小冯", "password": "65535", "create_date":1575853408359}
WriteMysql.scala file contents
package org.feng.datasource
import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
  * Created by Feng on 2020/2/10 16:06
  * CurrentProject's name is spark_demo
  */
class WriteMysql(spark: SparkSession, database: String) {

  val user = "root"
  val password = "root"
  val url = "jdbc:mysql://localhost:3306/"
    .concat(database)
    .concat("?serverTimezone=UTC")

  def writeMysql(tableName: String): Unit = {
    // Register the MySQL Connector/J driver
    new com.mysql.cj.jdbc.Driver
    val dataFrame = spark.read
      .json("D:\\jee-2019-7-idea-maven-workspace\\spark_demo\\spark_datasource\\src\\main\\resources\\user.json")
    // Write the data, appending to the existing table
    dataFrame.write.mode(SaveMode.Append)
      .jdbc(url, tableName, properties(user, password))
  }

  def properties(user: String, password: String): Properties = {
    val pro = new Properties()
    pro.put("user", user)
    pro.put("password", password)
    pro
  }
}
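For larger writes, the JDBC writer accepts tuning options on the same call chain; a hedged sketch using the documented batchsize option (the value shown is illustrative and happens to be the default):
// Insert rows in JDBC batches of 1000 per round trip
dataFrame.write
  .mode(SaveMode.Append)
  .option("batchsize", "1000")
  .jdbc(url, tableName, properties(user, password))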
Client.scala file contents
package org.feng.client
import org.apache.spark.sql.SparkSession
import org.feng.datasource.{ReadJson, ReadMysql, WriteMysql}
/**
  * Created by Feng on 2020/2/10 15:32
  * CurrentProject's name is spark_demo
  */
object Client extends App {
  val spark = SparkSession
    .builder()
    .appName("Client_".concat(System.currentTimeMillis().toString))
    .master("local")
    .getOrCreate()

  // runReadJson()
  runWriteMysql()
  runReadMysql()

  def runWriteMysql(): Unit = {
    val writeMysql = new WriteMysql(spark, "demo2019")
    writeMysql.writeMysql("user")
  }

  def runReadMysql(): Unit = {
    val readMysql = new ReadMysql(spark, "demo2019")
    readMysql.readMysql("user")
  }

  def runReadJson(): Unit = {
    val jsonPath = "D:\\jee-2019-7-idea-maven-workspace\\spark_demo\\spark_datasource\\src\\main\\resources\\people.json"
    val readJson = new ReadJson(spark)
    readJson.readJson(jsonPath, "people")
  }
}
Output
After inserting the new row, read the table again:
+---+------+--------+-------------+
| id| name|password| create_date|
+---+------+--------+-------------+
| 1| feng| 12345|1575853408359|
| 2|liqian| 123456|1575853408359|
| 3| 小冯| 65535|1575853408359|
+---+------+--------+-------------+
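SaveMode.Append was used above because the user table already exists. For reference, the other standard SaveMode values behave as follows (stock Spark SQL semantics, sketched here with df standing in for any DataFrame):
df.write.mode(SaveMode.Append)        // add rows to the existing table
df.write.mode(SaveMode.Overwrite)     // replace the existing table contents
df.write.mode(SaveMode.Ignore)        // silently skip the write if the table exists
df.write.mode(SaveMode.ErrorIfExists) // default: throw if the table exists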
Reading and Writing Avro Data
Preparation
My Scala version is 2.11 and my Spark version is 2.4.4.
On Windows, unpack a Spark distribution and run the following command from its bin directory:
spark-shell.cmd --packages org.apache.spark:spark-avro_2.11:2.4.4 ...
F:\big-data\spark\spark-2.4.4-bin-hadoop2.7\spark-2.4.4-bin-hadoop2.7\bin>spark-shell.cmd --packages org.apache.spark:spark-avro_2.11:2.4.4 ...
Ivy Default Cache set to: C:\Users\Feng\.ivy2\cache
The jars for the packages stored in: C:\Users\Feng\.ivy2\jars
:: loading settings :: url = jar:file:/F:/big-data/spark/spark-2.4.4-bin-hadoop2.7/spark-2.4.4-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.x
org.apache.spark#spark-avro_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c6db07dd-d02d-43b6-847e-202d438b495c;1.0
confs: [default]
found org.apache.spark#spark-avro_2.11;2.4.4 in central
found org.spark-project.spark#unused;1.0.0 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.11/2.4.4/spark-avro_2.11-2.4.4.jar ...
[SUCCESSFUL ] org.apache.spark#spark-avro_2.11;2.4.4!spark-avro_2.11.jar (132934ms)
downloading https://repo1.maven.org/maven2/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar ...
[SUCCESSFUL ] org.spark-project.spark#unused;1.0.0!unused.jar (1625ms)
:: resolution report :: resolve 375353ms :: artifacts dl 134568ms
:: modules in use:
org.apache.spark#spark-avro_2.11;2.4.4 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 2 | 2 | 2 | 0 || 2 | 2 |
---------------------------------------------------------------------
:: problems summary ::
:::: ERRORS
unknown resolver null
unknown resolver null
unknown resolver null
unknown resolver null
:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
:: retrieving :: org.apache.spark#spark-submit-parent-c6db07dd-d02d-43b6-847e-202d438b495c
confs: [default]
2 artifacts copied, 0 already retrieved (185kB/18ms)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.26.100:4040
Spark context available as 'sc' (master = local[*], app id = local-1581331065457).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.4
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Reading
The users.avro file here is one of the resource files from the examples folder of the unpacked Spark distribution.
scala> spark.read.format("avro").load("D:\\jee-2019-7-idea-maven-workspace\\spark_demo\\spark_datasource\\src\\main\\resources\\users.avro").show()
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+
Writing
Read the contents of users.avro and append them to the 2020avro.avro output. (In spark-shell, SaveMode may not be in scope by default; run import org.apache.spark.sql.SaveMode first, or pass the string "append" to .mode.)
scala> spark.read.format("avro").load("D:\\jee-2019-7-idea-maven-workspace\\spark_demo\\spark_datasource\\src\\main\\resources\\users.avro").
| write.mode(SaveMode.Append).format("avro").save("D:\\2020avro.avro")
Read back the newly written file:
scala> spark.read.format("avro").load("D:\\2020avro.avro").show()
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+
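To use the Avro source from an application rather than through spark-shell --packages, the same artifact can be declared in the pom; a minimal sketch matching the Scala 2.11 / Spark 2.4.4 versions used above:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>2.4.4</version>
</dependency>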