We can write the results of the wordcount jobs we wrote earlier out in a variety of formats.
CSV format:
The code is as follows:
package com.test.SparkSQL

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

case class Person(id: Long, name: String, age: Int, fv: Int)

object OldSparkSQL {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("OldSparkSQL").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val lines: RDD[String] = sc.textFile("hdfs://marshal:9000/person.txt")
    //val PersonRDD = lines.map(_.split(",")).map(arr => Person(arr(0).toLong, arr(1), arr(2).toInt, arr(3).toInt))
    val rowRDD = lines.map(line => {
      val fields = line.split(",")
      val id = fields(0).toLong
      val name = fields(1)
      val age = fields(2).toInt
      val fv = fields(3).toInt
      // Person(id, name, age, fv)
      Row(id, name, age, fv)
    })
    val sqlContext = new SQLContext(sc)
    //import the implicit conversions so RDDs can be turned into DataFrames
    import sqlContext.implicits._
    val schema = StructType(
      List(
        StructField("id", LongType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true),
        StructField("fv", IntegerType, true)
      )
    )
    val pdf: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
    pdf.registerTempTable("t_person")
    val result: DataFrame = sqlContext.sql("select name,age,fv from t_person order by fv desc, age asc")
    result.write.csv("C:/Users/11489/Desktop/result1")
    // result.show()
    sc.stop()
  }
}
The result is as follows:
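If we want to check the CSV output programmatically instead of opening the part files by hand, we can read the directory back. A minimal sketch, assuming a Spark 2.x SparkSession and the result1 directory written above; the object name ReadCsvResult is only for illustration. The CSV files were written without a header, so we supply the schema ourselves:

package com.test.SparkSQL

import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadCsvResult {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("ReadCsvResult")
      .master("local[*]")
      .getOrCreate()
    // the columns match the select in the query above: name, age, fv
    val schema = StructType(List(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("fv", IntegerType, true)
    ))
    val back: DataFrame = spark.read.schema(schema).csv("C:/Users/11489/Desktop/result1")
    back.show()
    spark.stop()
  }
}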
JSON format:
The code is as follows:
package com.test.SparkSQL

import org.apache.spark.sql._

object DataSetWordCount {
  def main(args: Array[String]): Unit = {
    //create a SparkSession
    val session: SparkSession = SparkSession.builder()
      .appName("DataSetWordCount")
      .master("local[*]")
      .getOrCreate()
    import session.implicits._
    val lines: Dataset[String] = session.read.textFile("D:/a/word.txt")
    val words: Dataset[String] = lines.flatMap(_.split(" "))
    // DSL alternative:
    // import org.apache.spark.sql.functions._
    // val result: Dataset[Row] = words.groupBy($"value" as "word")
    //   .agg(count("*") as "counts")
    //   .sort($"counts" desc)
    val grouped: RelationalGroupedDataset = words.groupBy($"value" as "word")
    val counted: DataFrame = grouped.count()
    val result: Dataset[Row] = counted.sort($"count" desc)
    result.write.json("C:/Users/11489/Desktop/result2")
    result.show()
  }
}
The result is as follows:
Parquet format:
Parquet is a columnar storage format, so what do we gain by saving the result this way?
With columnar storage we can later read only the columns we actually need, and the data is compressed column by column, so it takes up less space (see the column-only read sketch at the end of this post).
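Unlike CSV, each line of the JSON output carries its own field names, so reading the result back needs no schema. A minimal sketch, assuming a SparkSession named spark and the result2 directory written above:

val back: DataFrame = spark.read.json("C:/Users/11489/Desktop/result2")
back.printSchema()  // word: string, count: long, inferred from the JSON keys
back.show()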
package com.test.SparkSQL

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object sqlWordCount {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder()
      .appName("sqlWordCount")
      .master("local[*]")
      .getOrCreate()
    //specify where to read the data from
    //val lines: Dataset[String] = session.read.textFile("D:/a/word.txt")
    val lines: DataFrame = session.read.format("text").load("D:/a/word.txt")
    //import the implicit conversions from the SparkSession
    import session.implicits._
    val words: Dataset[String] = lines.flatMap(_.getAs[String]("value").split(" "))
    // val df: DataFrame = words.withColumnRenamed("value", "word")
    // //create a view first, then run the SQL
    // df.createTempView("v_wc")
    // val result: DataFrame = session.sql("select word,count(*) counts from v_wc group by word order by counts desc")
    // result.show()
    // //DSL style
    // import org.apache.spark.sql.functions._
    // val result: Dataset[Row] = words.groupBy($"value").agg(count("*") as "counts").sort($"counts" desc)
    // result.show()
    //convert the Dataset to a DataFrame so that we can run SQL against it
    val df = words.toDF()
    df.createTempView("v_wc")
    val result: DataFrame = session.sql("select value word,count(*) counts from v_wc group by word order by counts desc")
    result.write.parquet("C:/Users/11489/Desktop/result3")
    result.show()
  }
}
The result is as follows:
We can also take a look at the generated file names:
Loading data from MySQL:
package com.test.SparkSQL

import org.apache.spark.sql.{DataFrame, SparkSession}

object JdbcDataSource {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("JdbcDataSource")
      .master("local[*]")
      .getOrCreate()
    val logs: DataFrame = spark.read.format("jdbc")
      .options(Map(
        "url" -> "jdbc:mysql://localhost:3306/lfr",
        "driver" -> "com.mysql.jdbc.Driver",
        "dbtable" -> "users",
        "user" -> "root",
        "password" -> "928918zbgch521"))
      .load()
    logs.show()
  }
}
The result is as follows:
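The same read can also be expressed with the jdbc() shortcut on DataFrameReader instead of format("jdbc").options(...). A minimal sketch, assuming the same spark session and connection details as in JdbcDataSource above; logs2 is just an illustrative name:

import java.util.Properties

val prop = new Properties()
prop.put("user", "root")
prop.put("password", "928918zbgch521")
prop.put("driver", "com.mysql.jdbc.Driver")
// equivalent to the format("jdbc") read above
val logs2: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/lfr", "users", prop)
logs2.show()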
Writing the result to MySQL:
The code is as follows:
import java.util.Properties

val prop = new Properties()
prop.put("user", "root")
prop.put("password", "928918zbgch521")
logs.where(logs.col("id") <= 3)
  .write.mode("append")
  .jdbc("jdbc:mysql://localhost:3306/lfr", "users", prop)
The result is as follows:
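The mode("append") call above decides what happens when the target table already exists; the other SaveMode values are overwrite, ignore, and errorifexists (the default). A minimal sketch of the same write with overwrite, assuming the logs DataFrame and prop from above; note that for the JDBC sink, overwrite drops and recreates the users table before writing:

logs.where(logs.col("id") <= 3)
  .write.mode("overwrite")  // drops and recreates the users table, then writes the rows
  .jdbc("jdbc:mysql://localhost:3306/lfr", "users", prop)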
Reading data from a Parquet file:
The code is as follows:
package com.test.SparkSQL

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadParquet {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("ReadParquet")
      .master("local[*]")
      .getOrCreate()
    val result: DataFrame = spark.read.format("parquet").load("C:/Users/11489/Desktop/result3")
    result.show()
  }
}
The result is as follows:
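Because Parquet stores the data column by column, as noted earlier, we only pay for the columns we actually touch. A minimal sketch, assuming the same result3 directory and the word/counts columns written by sqlWordCount above; the object name ReadParquetColumn is only for illustration:

package com.test.SparkSQL

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadParquetColumn {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("ReadParquetColumn")
      .master("local[*]")
      .getOrCreate()
    // only the "word" column is read from the Parquet files; "counts" is skipped entirely
    val words: DataFrame = spark.read.parquet("C:/Users/11489/Desktop/result3").select("word")
    words.show()
    spark.stop()
  }
}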