SparkSQL Study Notes (for quick reference)

Copyright notice: this is an original post by the author and may not be reproduced without permission. https://blog.csdn.net/shujuelin/article/details/83865948

1、SQLContextApp

package sparkSQLmook

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}


/**
  * Spark 1.6-style SQL usage with SQLContext, local mode
  */
object SQLContextApp {

 /* def main(args: Array[String]): Unit = {

      //val path = args(0)
      val conf = new SparkConf()
                   .setAppName("SQLContextApp")
                   .set("spark.sql.warehouse.dir","file:///")
                   .setMaster("local[2]")

      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)


      val df = sqlContext.read.format("json").load("C://Users//shujuelin//Desktop//spark//people.json")

      df.show()
      sc.stop()
  }*/

  // Production mode: appName and master are supplied via spark-submit; the input path comes from args(0)
  def main(args: Array[String]): Unit = {

     val path = args(0)
      val conf = new SparkConf()

      val sc = new SparkContext(conf)

      val sqlContext = new SQLContext(sc)

      val df = sqlContext.read.format("json").load(path)


      df.show()
      sc.stop()
  }
}

2、SparkSessionApp

package sparkSQLmook

import org.apache.spark.sql.SparkSession

object SparkSessionApp {

  def main(args: Array[String]): Unit = {

    // SparkSession is the unified entry point for Spark SQL in Spark 2.x
    val spark = SparkSession.builder()
      .master("local")
      .appName("SparkSessionApp")
      .config("spark.sql.warehouse.dir","file:///")
      .getOrCreate()

    val df = spark.read.json("C:/Users/shujuelin/Desktop/spark/people.json")

    df.show()

    spark.stop()
  }

}

3、ParquetDemo

package sparkSQLmook

import org.apache.spark.sql.SparkSession

object ParquetDemo {

  def main(args: Array[String]): Unit = {

     val spark = SparkSession
                  .builder()
                   .appName("ParquetDemo")
                   .master("local[2]")
                   .config("spark.sql.warehouse.dir","file:///")
                  .getOrCreate()

    /**
      * Standard (explicit) form: spark.read.format("parquet").load(...)
      */
     //val rddDF = spark.read.format("parquet").load("C://Users//shujuelin//Desktop//spark//users.parquet")
     //rddDF.show()

    // Parquet is Spark SQL's default data source format
    // rddDF.select("name","favorite_color").write.format("json").save("C://Users//shujuelin//Desktop//spark//userss.json")

    /**
      * Generic form: load() without an explicit format falls back to the default source (parquet)
      */
    val df = spark.read.load("C://Users//shujuelin//Desktop//spark//users.parquet")
    df.show(false)
    spark.stop()
  }

}
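
The write path that is only hinted at in the comments above works symmetrically through DataFrameWriter. A minimal sketch, assuming the same users.parquet input; the object name ParquetWriteDemo and the output directory users_json are made up:

package sparkSQLmook

import org.apache.spark.sql.{SaveMode, SparkSession}

object ParquetWriteDemo {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ParquetWriteDemo")
      .master("local[2]")
      .getOrCreate()

    val df = spark.read.load("C://Users//shujuelin//Desktop//spark//users.parquet")

    // write selected columns out as JSON; overwrite the target directory if it already exists
    df.select("name", "favorite_color")
      .write
      .mode(SaveMode.Overwrite)
      .format("json")
      .save("C://Users//shujuelin//Desktop//spark//users_json") // hypothetical output dir

    spark.stop()
  }
}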

4、JdbcBeelineSQL

package sparkSQLmook

import java.sql.DriverManager

// Access Spark SQL over JDBC; requires the Spark Thrift Server to be running (started with sbin/start-thriftserver.sh, listening on port 10000 by default)
object JdbcBeelineSQL {

  def main(args: Array[String]) {

    Class.forName("org.apache.hive.jdbc.HiveDriver")

    val conn = DriverManager.getConnection("jdbc:hive2://spark1:10000","root","")
    val pstmt = conn.prepareStatement("select name,age,score from students")
    val rs = pstmt.executeQuery()
    while (rs.next()) {
      println("name:" + rs.getString("name") +
        " , age:" + rs.getInt("age") +
        " , score:" + rs.getDouble("score"))

    }

    rs.close()
    pstmt.close()
    conn.close()


  }
}

5、HiveMySQLApp

package sparkSQLmook

import org.apache.spark.sql.SparkSession

/**
  * Use the external data source API to query Hive and MySQL tables together
  * (an important end-to-end case: join a table in Hive with a table in MySQL).
  *
  * MySQL setup:
  * create database spark;
  * use spark;
  * -- create the table
  * CREATE TABLE DEPT(
  *   DEPTNO int(2) PRIMARY KEY,
  *   DNAME VARCHAR(14),
  *   LOC VARCHAR(13));
  *
  * -- insert data
  * INSERT INTO DEPT VALUES(10,'ACCOUNTING','NEW YORK');
  * INSERT INTO DEPT VALUES(20,'RESEARCH','DALLAS');
  * INSERT INTO DEPT VALUES(30,'SALES','CHICAGO');
  * INSERT INTO DEPT VALUES(40,'OPERATIONS','BOSTON');
  */
object HiveMySQLApp {

  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("HiveMySQLApp")
      .master("local[2]")
      .enableHiveSupport() // needed so spark.table("emp") can resolve the Hive table
      .getOrCreate()

    // Load the Hive table
    val hiveDF = spark.table("emp")

    // Load the MySQL table through the JDBC data source
    val mysqlDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306")
      .option("dbtable", "spark.DEPT")
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()

    // JOIN
    val resultDF = hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
    resultDF.show


    resultDF.select(hiveDF.col("empno"),hiveDF.col("ename"),
      mysqlDF.col("deptno"), mysqlDF.col("dname")).show

    spark.stop()
  }

}
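
Writing a result back to MySQL uses the same JDBC options on the write side. A minimal standalone sketch, assuming the same connection settings as above; the object name MySQLWriteApp and the target table spark.DEPT_COPY are hypothetical:

package sparkSQLmook

import org.apache.spark.sql.{SaveMode, SparkSession}

object MySQLWriteApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MySQLWriteApp")
      .master("local[2]").getOrCreate()

    // read DEPT back from MySQL (same options as in HiveMySQLApp)
    val deptDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306")
      .option("dbtable", "spark.DEPT")
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()

    // write it to a new table; the target table name DEPT_COPY is made up
    deptDF.write.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306")
      .option("dbtable", "spark.DEPT_COPY")
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .mode(SaveMode.Overwrite)
      .save()

    spark.stop()
  }
}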

6、HiveAPP

package sparkSQLmook

import org.apache.spark.sql.SparkSession

/**
  * Spark SQL on Hive tables:
  * 1. read:  spark.table(tableName)
  * 2. write: df.write.saveAsTable(tableName)
  */
object HiveAPP {
// spark.sql approach ---> the same statements can also be run interactively in spark-shell
def main(args: Array[String]): Unit = {

  val spark = SparkSession.builder()
    .appName("HiveAPP")
    .master("local[2]")
    .config("spark.sql.warehouse.dir","file:///")
    .getOrCreate()

  import spark.implicits._


   //val HiveDf = spark.table("t_movies")
   spark.sql("show databases").show()

   spark.stop()
   }
}
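
The write half mentioned in the header comment (df.write.saveAsTable) is not exercised above. A minimal sketch, assuming a Hive-enabled session and reusing the people.json file from the earlier sections; the object name HiveWriteApp and the table name people_backup are made up:

package sparkSQLmook

import org.apache.spark.sql.{SaveMode, SparkSession}

object HiveWriteApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HiveWriteApp")
      .master("local[2]")
      .enableHiveSupport() // saveAsTable persists to the Hive metastore when Hive support is on
      .getOrCreate()

    val df = spark.read.json("C:/Users/shujuelin/Desktop/spark/people.json")

    // write the DataFrame as a managed table (table name is hypothetical)
    df.write.mode(SaveMode.Overwrite).saveAsTable("people_backup")

    // read it back with spark.table
    spark.table("people_backup").show()

    spark.stop()
  }
}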

7、DatasetApp

package sparkSQLmook

import org.apache.spark.sql.SparkSession

/**
 * Dataset operations -> reading a CSV file
 */
object DatasetApp {

  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("DatasetApp")
      .master("local[2]").getOrCreate()

    // Note: the implicit conversions must be imported (for .as[Sales] and the typed map below)
    import spark.implicits._

    val path = "file:///f:/text/sales.csv"

    // How does Spark parse a CSV file? header: the first row holds the column names; inferSchema: infer column types automatically
    val df = spark.read.option("header","true").option("inferSchema","true").csv(path)
    df.show

    val ds = df.as[Sales]  // convert the DataFrame to a typed Dataset
    // typed operation: map over the rows, keeping only itemId (field access is checked at compile time)
    ds.map(line => line.itemId).show

    // untyped (column-name based) equivalent on the DataFrame
    df.select("itemId").show

    // SQL style: register a temporary view, then query it
    df.createOrReplaceTempView("sales")
    spark.sql("select itemId from sales").show

    spark.stop()
  }

  case class Sales(transactionId:Int,customerId:Int,itemId:Int,amountPaid:Double)
}
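
Typed operations go beyond map; filter and groupByKey also take lambdas against the case class. A minimal sketch against the same sales.csv; the object name DatasetAggApp and the 100.0 threshold are arbitrary:

package sparkSQLmook

import org.apache.spark.sql.SparkSession

object DatasetAggApp {

  case class Sales(transactionId: Int, customerId: Int, itemId: Int, amountPaid: Double)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DatasetAggApp")
      .master("local[2]").getOrCreate()

    import spark.implicits._

    val ds = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("file:///f:/text/sales.csv")
      .as[Sales]

    // typed filter (100.0 is an arbitrary threshold) followed by a typed grouping and count
    ds.filter(s => s.amountPaid > 100.0)
      .groupByKey(s => s.customerId)
      .count()
      .show()

    spark.stop()
  }
}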

8、DataFrameRdd

package sparkSQLmook

import org.apache.spark.sql.SparkSession

/**
  * Convert an RDD to a DataFrame, approach #1: reflection (via a case class)
  */

object DataFrameRdd {

  def main(args: Array[String]): Unit = {

      val spark = SparkSession.builder()
                   .appName("DataFrameRdd")
                   .master("local[2]")
                    .config("spark.sql.warehouse.dir","file:///")
                     .getOrCreate()

     //RDD ==> DataFrame

     val rdd = spark.sparkContext.textFile("C:/Users/shujuelin/Desktop/spark/infos.txt")

     import spark.implicits._

    // convert the RDD to a DataFrame through the case class (column names come from its fields)
     val lineDF = rdd.map(_.split(",")).map(line => info(line(0).toInt,line(1),line(2).toInt)).toDF()
    /* val lineRDD = rdd.map(line => line.split(","))
     val lineDF = lineRDD.map(lines => info(lines(0).toInt,lines(1),lines(2).toInt)).toDF()*/
     //lineDF.show()

     //1. DataFrame API style
     //lineDF.filter($"age">20).show()
    //2. SQL style: register a temp view, then query it
     lineDF.createOrReplaceTempView("info")
     spark.sql("select name,age from info where age >20").show()

      spark.stop()
  }

  case class info(id : Int, name : String, age : Int)

}
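
The same reflection trick also yields a typed Dataset by calling toDS() instead of toDF(). A minimal sketch against the same infos.txt; the object name DatasetRdd and the Info case class name are made up:

package sparkSQLmook

import org.apache.spark.sql.SparkSession

object DatasetRdd {

  case class Info(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DatasetRdd")
      .master("local[2]")
      .getOrCreate()

    import spark.implicits._

    val rdd = spark.sparkContext.textFile("C:/Users/shujuelin/Desktop/spark/infos.txt")

    // reflection also gives a typed Dataset: toDS() instead of toDF()
    val infoDS = rdd.map(_.split(","))
      .map(line => Info(line(0).toInt, line(1), line(2).toInt))
      .toDS()

    // typed filter with compile-time field access
    infoDS.filter(_.age > 20).show()

    spark.stop()
  }
}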

9、DataFrameRdd2

package sparkSQLmook

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
  * Convert an RDD to a DataFrame, approach #2: programmatically, using Row and an explicit schema.
  * Use this when the structure of the data is not known until runtime.
  */
object DataFrameRdd2 {

  def main(args: Array[String]): Unit = {

      val spark = SparkSession
                   .builder()
                  .appName("DataFrameRdd2")
                   .master("local[2]")
                   .config("spark.sql.warehouse.dir","file:///")
                  .getOrCreate()

      val rdd = spark.sparkContext.textFile("C:/Users/shujuelin/Desktop/spark/infos.txt")

      // 1. map the RDD to Rows
      val rddRow = rdd.map(_.split(",")).map(line => Row(line(0).toInt,line(1),line(2).toInt))
      // 2. build the schema (StructType) describing those Rows
      val structType = StructType(Array(
        StructField("id",IntegerType,true),
        StructField("name",StringType,true),
        StructField("age",IntegerType,true)))
      // 3. bind the Rows to the schema
      val df = spark.createDataFrame(rddRow,structType)
      //df.show()

    // SQL style
      df.createOrReplaceTempView("info") // register a temporary view

      spark.sql("select * from info where age > 20").show()

    spark.stop()
  }

}
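
When even the column list is only known at runtime, the schema can be assembled from a plain string, following the pattern in the Spark SQL programming guide. A minimal sketch against the same infos.txt, treating every column as a string; the object name DataFrameRdd3 is made up:

package sparkSQLmook

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object DataFrameRdd3 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DataFrameRdd3")
      .master("local[2]")
      .getOrCreate()

    val rdd = spark.sparkContext.textFile("C:/Users/shujuelin/Desktop/spark/infos.txt")

    // column names arrive as a plain string (e.g. from a config file); all types default to String here
    val schemaString = "id name age"
    val schema = StructType(schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true)))

    val rowRDD = rdd.map(_.split(",")).map(attrs => Row(attrs(0), attrs(1), attrs(2)))

    val df = spark.createDataFrame(rowRDD, schema)
    df.printSchema()
    df.show()

    spark.stop()
  }
}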

10、DataFrameOperation

package sparkSQLmook

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/**
  * DataFrame API operations
  */
object DataFrameOperation {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local")
      .appName("DataFrameOperation")
      .config("spark.sql.warehouse.dir","file:///")
      .getOrCreate()

    // implicit conversions (needed for the $"col" syntax below)
    import spark.implicits._
    //val df = spark.read.json("C:/Users/shujuelin/Desktop/spark/people.json")
     val df = spark.read.format("json").load("C:/Users/shujuelin/Desktop/spark/people.json")

    //df.show() // show() prints 20 rows by default
    //df.printSchema()
    //df.select("name").show() // select: a typical untyped operation
    //df.select($"name", $"age" + 1).show()  // expression syntax: prefix column names with $; adds 1 to age
    //df.filter($"age">19).show()
    // alternative form
    //df.filter(df.col("age")>19).show()
    df.select(df.col("name"),(df.col("age")+3).as("age2")).show() // alias the derived column
    //df.groupBy("age").count().show() // group first, then aggregate
    spark.stop()
  }

}
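
The org.apache.spark.sql.functions._ import above is what provides built-in aggregate functions such as count, avg and max, although the code never uses them. A minimal sketch against the same people.json; the object name DataFrameAgg and the alias names are made up:

package sparkSQLmook

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object DataFrameAgg {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("DataFrameAgg")
      .getOrCreate()

    val df = spark.read.json("C:/Users/shujuelin/Desktop/spark/people.json")

    // built-in aggregate functions from org.apache.spark.sql.functions._
    df.agg(count("name").as("people"), avg("age").as("avg_age"), max("age").as("max_age")).show()

    // the same functions can follow a groupBy
    df.groupBy("name").agg(avg("age")).show()

    spark.stop()
  }
}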

11、DataFrameCase

package sparkSQLmook

import org.apache.spark.sql.SparkSession

/**
  * DataFrame case study (other API operations)
  */
object DataFrameCase {

  def main(args: Array[String]): Unit = {

      val spark = SparkSession
                  .builder()
                   .appName("DataFrameCase")
                   .master("local[2]")
                   .config("spark.sql.warehouse.dir","file:///")
                   .getOrCreate()

      val rdd = spark.sparkContext.textFile("C://Users//shujuelin//Desktop//spark//student.data")

      import spark.implicits._
     // the delimiter | is a regex metacharacter, so it must be escaped as \\|
     // reflection-based conversion to a DataFrame
      val infoDF = rdd.map(_.split("\\|")).map(lines => info(lines(0).toInt,lines(1),lines(2),lines(3))).toDF()

      infoDF.show(false) // show displays 20 rows by default; for more, use infoDF.show(30,false), where false means don't truncate long values
      //infoDF.take(10).foreach(println)
      //infoDF.first() // first row
      //infoDF.head(3) // first three rows
      //infoDF.select("name","phone").show()
      //infoDF.show(20,false)

    // show the rows whose name is empty or the literal string 'NULL'
    //infoDF.filter("name = '' or name = 'NULL'").show()

    // names starting with 's'
    //infoDF.filter("substr(name,0,1) = 's'").show(20,false)

    // sorting
    // sort by name (ascending is the default; .desc sorts descending)
     //infoDF.sort($"name".desc).show() // or: infoDF.sort(infoDF.col("name").desc).show()

     //infoDF.sort(infoDF.col("name").asc,infoDF.col("id").desc).show(20,false)

     // rename a column
      //infoDF.select(infoDF.col("name").as("student_info")).show(20,false)



     // join
    /* val infoDF2 = rdd.map(_.split("\\|")).map(lines => info(lines(0).toInt,lines(1),lines(2),lines(3))).toDF()

     infoDF.join(infoDF2, infoDF.col("id") === infoDF2.col("id")).show(20,false) // inner join by default
*/
      spark.stop()

  }

  case class info(id : Int, name : String, phone : String, email : String)

}
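
The commented-out join above relies on the default inner join; the three-argument overload takes the join type explicitly ("inner", "left_outer", "right_outer", "full_outer", ...). A minimal sketch against the same student.data; the object name DataFrameJoinApp and the id <= 10 subset are arbitrary:

package sparkSQLmook

import org.apache.spark.sql.SparkSession

object DataFrameJoinApp {

  case class info(id: Int, name: String, phone: String, email: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DataFrameJoinApp")
      .master("local[2]")
      .getOrCreate()

    import spark.implicits._

    val rdd = spark.sparkContext.textFile("C://Users//shujuelin//Desktop//spark//student.data")
    val infoDF  = rdd.map(_.split("\\|")).map(l => info(l(0).toInt, l(1), l(2), l(3))).toDF()
    val infoDF2 = infoDF.filter("id <= 10") // arbitrary subset, so the outer join has unmatched rows

    // the third argument names the join type
    infoDF.join(infoDF2, infoDF.col("id") === infoDF2.col("id"), "left_outer").show(30, false)

    spark.stop()
  }
}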
