SparkSQL实现wordCount与资源转换

Spark SQL完成WrodCount

//导包
import org.apache.spark.sql.SparkSession
//编写代码
//1.定义SparkSession
    val spark = SparkSession.builder().master("local[*]").appName("wordCount").getOrCreate()
//2.根据SparkSession获取SparkContext 上下文对象
    val sc = spark.sparkContext
//3.根据sparkContext读取文件并按照空格 返回RDD
    val wordRDD = sc.textFile("./data/words.txt").flatMap(_.split(" "))
//4.导入隐式转换
    import spark.implicits._
    val dataFrame = wordRDD.toDF()
//5.wordRDD 进行计算WordContext
    dataFrame.createOrReplaceTempView("word")
    //=============SQL============
//6.编写SQL
    var sql="select value,count(value) as count from word group by  value order by count desc"
//7.提交SQL语句并查询
    spark.sql(sql).show()
    //============SDL============
    dataFrame.groupBy($"value").count().orderBy($"count" desc).show()

Spark SQL多数据源交互

Spark SQL 可以与多种数据源交互,如普通文本、json、Parquet、csv等

写入数据
//导包
import org.apache.spark.sql.{SaveMode, SparkSession}
// 定义样例类用来快速保存数据
  case class Person(id: Int, name: String, age: Int)
  def main(args: Array[String]): Unit = {
    //1.实例SparkSession
    val spark = SparkSession.builder().master("local[*]").appName("sql").getOrCreate()
    //2.根据SparkSession 获取SparkContext 获取上下文对象
    val sc = spark.sparkContext
    //3.读取数据并按照空格切分保存到person中返回RDD
    val personRDD = sc.textFile("./data/person.txt").map(_.split(" ")).map(x => Person(x(0).toInt, x(1), x(2).toInt))
    //4.导入隐式类
    import spark.implicits._
    //5.RDD 转换为DataFrame
    val personDF = personRDD.toDF()
    //6.将数据转换成json输出
    personDF.write.json("./data/json")
    //7.将数据转换成cav
    personDF.write.csv("./data/csv")
    //8.将数据转成 parquet
    personDF.write.parquet("./data/parquet")
    //9.将数据写入到mysql中
    val prop = new Properties()
    // 添加用户名
    prop.setProperty("user","root")
    // 添加密码
    prop.setProperty("password","root")
    // 写入数据并连接
   personDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/bigdata0407?characterEncoding=UTF-8","person",prop)
读取数据
//导包
import org.apache.spark.sql.SparkSession
 //1.实例SparkSession
    val spark = SparkSession.builder().master("local[*]").appName("sql").getOrCreate()
    //2.根据SparkSession获取SparkContext 获取上下文对象
    val sc = spark.sparkContext
    //3.读取json数据
    spark.read.json("./data/json").show()
    //4.读取csv文件 添加元数据信息
    spark.read.csv("./data/csv").toDF("id", "name", "age").show()
    //5.读取parquet数据
    spark.read.parquet("./data/parquet").show()
    //6.读取mysql数据
    val prop = new Properties()
    // 添加用户名
    prop.setProperty("user", "root")
    // 添加密码
    prop.setProperty("password", "root")
    // 添加连接配置并获取数据
    spark.read.jdbc("jdbc:mysql://localhost:3306/bigdata0407?characterEncoding=UTF-8","person",prop).show()
发布了88 篇原创文章 · 获赞 99 · 访问量 21万+

猜你喜欢

转载自blog.csdn.net/qq_43791724/article/details/105468157