Copyright notice: this is an original article by the author and may not be reproduced without the author's permission. https://blog.csdn.net/shujuelin/article/details/83865948
1、SQLContextApp
package sparkSQLmook
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
/**
* Spark 1.6-style SQL usage via SQLContext. The commented-out main below runs in local mode;
* the active main is the production version, which takes the input path as its first argument.
*/
object SQLContextApp {
/* def main(args: Array[String]): Unit = {
//val path = args(0)
val conf = new SparkConf()
.setAppName("SQLContextApp")
.set("spark.sql.warehouse.dir","file:///")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("json").load("C://Users//shujuelin//Desktop//spark//people.json")
df.show()
sc.stop()
}*/
//Production mode: the input path is passed in as args(0)
def main(args: Array[String]): Unit = {
val path = args(0)
val conf = new SparkConf()
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("json").load(path)
df.show()
sc.stop()
}
}
2、SparkSessionApp
package sparkSQLmook
import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
object SparkSessionApp {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("SparkSessionApp")
.config("spark.sql.warehouse.dir","file:///")
.getOrCreate()
val df = spark.read.json("C:/Users/shujuelin/Desktop/spark/people.json")
df.show()
spark.stop()
}
}
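SparkSession is the unified entry point in Spark 2.x; the SQLContext used in example 1 (SQLContextApp) is still reachable through it, which helps when porting Spark 1.6 code. A minimal sketch, assuming the same people.json file (not part of the original post):
// the legacy contexts are exposed on the session itself
val sc = spark.sparkContext // underlying SparkContext
val sqlContext = spark.sqlContext // legacy SQLContext wrapper
val df2 = sqlContext.read.json("C:/Users/shujuelin/Desktop/spark/people.json")
df2.show()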
3、ParquetDemo
package sparkSQLmook
import org.apache.spark.sql.SparkSession
object ParquetDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("ParquetDemo")
.master("local[2]")
.config("spark.sql.warehouse.dir","file:///")
.getOrCreate()
/**
* Standard explicit form: spark.read.format("parquet").load(...)
*/
//val rddDF = spark.read.format("parquet").load("C://Users//shujuelin//Desktop//spark//users.parquet")
//rddDF.show()
//Parquet is Spark SQL's default format, so format() can be omitted
// rddDF.select("name","favorite_color").write.format("json").save("C://Users//shujuelin//Desktop//spark//userss.json")
/**
* Generic form: load() without format() falls back to Parquet
*/
val DF = spark.read.load("C://Users//shujuelin//Desktop//spark//users.parquet")
DF.show(false)
spark.stop()
}
}
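A hedged follow-up sketch (not in the original post; the output path is a placeholder) showing the write side: saving a projection of the DataFrame back out as Parquet and reading it again. It would go inside main, before spark.stop().
import org.apache.spark.sql.SaveMode
// write two columns back out; SaveMode.Overwrite replaces any existing output directory
DF.select("name", "favorite_color")
.write
.mode(SaveMode.Overwrite)
.parquet("C://Users//shujuelin//Desktop//spark//users_out.parquet")
// read the freshly written Parquet directory back in
val roundTrip = spark.read.parquet("C://Users//shujuelin//Desktop//spark//users_out.parquet")
roundTrip.show(false)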
4、JdbcBeelineSQL
package sparkSQLmook
import java.sql.DriverManager
//Access Spark SQL over JDBC (the HiveServer2 protocol)
object JdbcBeelineSQL {
def main(args: Array[String]) {
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://spark1:10000","root","")
val pstmt = conn.prepareStatement("select name,age,score from students")
val rs = pstmt.executeQuery()
while (rs.next()) {
println("name:" + rs.getString("name") +
" , age:" + rs.getInt("age") +
" , score:" + rs.getDouble("score"))
}
rs.close()
pstmt.close()
conn.close()
}
}
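The client above assumes the Spark Thrift JDBC/ODBC server is already running (it is started with sbin/start-thriftserver.sh and is the same endpoint beeline connects to). A hedged variant of the query with a bind parameter and try/finally cleanup, so the connection is closed even if the query fails (the age > 18 predicate is only an illustrative value):
import java.sql.DriverManager
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://spark1:10000", "root", "")
try {
val pstmt = conn.prepareStatement("select name, age, score from students where age > ?")
pstmt.setInt(1, 18) // bind the first ? placeholder
val rs = pstmt.executeQuery()
while (rs.next()) {
println("name:" + rs.getString("name") + " , age:" + rs.getInt("age") + " , score:" + rs.getDouble("score"))
}
rs.close()
pstmt.close()
} finally {
conn.close() // always release the connection
}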
5、HiveMySQLApp
package sparkSQLmook
import org.apache.spark.sql.SparkSession
/**
* Query Hive and MySQL table data together through the external data source API.
* A combined external data source example (important): join a table stored in Hive with a table stored in MySQL.
create database spark;
use spark;
// create the table
CREATE TABLE DEPT(
DEPTNO int(2) PRIMARY KEY,
DNAME VARCHAR(14) ,
LOC VARCHAR(13) ) ;
// insert data
INSERT INTO DEPT VALUES(10,'ACCOUNTING','NEW YORK');
INSERT INTO DEPT VALUES(20,'RESEARCH','DALLAS');
INSERT INTO DEPT VALUES(30,'SALES','CHICAGO');
INSERT INTO DEPT VALUES(40,'OPERATIONS','BOSTON');
*/
object HiveMySQLApp {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("HiveMySQLApp")
.master("local[2]")
.enableHiveSupport() // needed so spark.table can see tables in the Hive metastore
.getOrCreate()
// load the Hive table data
val hiveDF = spark.table("emp")
// load the MySQL table data over JDBC
val mysqlDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306").option("dbtable", "spark.DEPT").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").load()
// JOIN
val resultDF = hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
resultDF.show
resultDF.select(hiveDF.col("empno"),hiveDF.col("ename"),
mysqlDF.col("deptno"), mysqlDF.col("dname")).show
spark.stop()
}
}
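A hedged extension (not in the original post): the joined result can also be persisted back to MySQL through the same JDBC data source. The target table name spark.emp_dept is a placeholder; the snippet would go inside main, before spark.stop().
resultDF.select(hiveDF.col("empno"), hiveDF.col("ename"),
mysqlDF.col("deptno"), mysqlDF.col("dname"))
.write
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306")
.option("dbtable", "spark.emp_dept")
.option("user", "root")
.option("password", "root")
.option("driver", "com.mysql.jdbc.Driver")
.mode("overwrite") // drop and recreate the target table if it already exists
.save()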
6、HiveAPP
package sparkSQLmook
import org.apache.spark.sql.SparkSession
/**
* Spark SQL on Hive table data. 1. read: spark.table(tablename)
* 2. write: df.write.saveAsTable(tablename)
*/
object HiveAPP {
//uses the spark.sql API; the same statements can also be run directly in spark-shell
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("DataFrameRdd")
.master("local[2]")
.config("spark.sql.warehouse.dir","file:///")
.getOrCreate()
import spark.implicits._
//val HiveDf = spark.table("t_movies")
spark.sql("show databases").show()
}
}
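The header comment names a read API and a write API that the body never exercises. A hedged sketch of both (assuming a Hive table t_movies exists, as in the commented-out line, and that the session was built with .enableHiveSupport() so the Hive metastore is visible; t_movies_copy is a placeholder name):
val hiveDF = spark.table("t_movies") // 1. read a Hive table as a DataFrame
hiveDF.write.mode("overwrite").saveAsTable("t_movies_copy") // 2. write a DataFrame back as a Hive table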
7、DatasetApp
package sparkSQLmook
import org.apache.spark.sql.SparkSession
/**
* Dataset operations -> reading a CSV file
*/
object DatasetApp {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("DatasetApp")
.master("local[2]").getOrCreate()
//note: the implicit conversions must be imported
import spark.implicits._
val path = "file:///f:/text/sales.csv"
//How does Spark parse a CSV file? header: the first line holds the column names; inferSchema: infer the column types automatically
val df = spark.read.option("header","true").option("inferSchema","true").csv(path)
df.show
val ds = df.as[Sales] //convert the DataFrame to a Dataset[Sales]
//map over every row, keeping only the itemId column
ds.map(line => line.itemId).show
spark.sql("seletc name from person").show
//df.seletc("name")
df.select("name")
ds.map(line => line.itemId)
spark.stop()
}
case class Sales(transactionId:Int,customerId:Int,itemId:Int,amountPaid:Double)
}
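A hedged sketch of typed operations on the same ds (to be placed inside main, before spark.stop(); the 100.0 threshold is only an illustrative value, not from the original post):
val bigSales = ds.filter(s => s.amountPaid > 100.0) // typed filter on the Sales case class
bigSales.show()
ds.groupByKey(s => s.customerId).count().show() // number of transactions per customer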
8、DataFrameRdd
package sparkSQLmook
import org.apache.spark.sql.SparkSession
/**
* Convert an RDD to a DataFrame. First approach: reflection on a case class
*/
object DataFrameRdd {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("DataFrameRdd")
.master("local[2]")
.config("spark.sql.warehouse.dir","file:///")
.getOrCreate()
//RDD ==> DataFrame
val rdd = spark.sparkContext.textFile("C:/Users/shujuelin/Desktop/spark/infos.txt")
import spark.implicits._
//convert the RDD to a DataFrame via the case class
val lineDF = rdd.map(_.split(",")).map(line => info(line(0).toInt,line(1),line(2).toInt)).toDF()
/* val lineRDD = rdd.map(line => line.split(","))
val lineDF = lineRDD.map(lines => info(lines(0).toInt,lines(1),lines(2).toInt)).toDF()*/
//lineDF.show()
//1. DataFrame API style
//lineDF.filter($"age">20).show()
//2. SQL API style
lineDF.createOrReplaceTempView("info")
spark.sql("select name,age from info where age >20").show()
spark.stop()
}
case class info(id : Int, name : String, age : Int)
}
9、DataFrameRdd2
package sparkSQLmook
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
* Second approach to converting an RDD to a DataFrame: build Rows programmatically. Use this when the schema is not known in advance.
*/
object DataFrameRdd2 {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("DataFrameRdd2")
.master("local[2]")
.config("spark.sql.warehouse.dir","file:///")
.getOrCreate()
val rdd = spark.sparkContext.textFile("C:/Users/shujuelin/Desktop/spark/infos.txt")
//1. map the RDD to Rows
val rddRow = rdd.map(_.split(",")).map(line => Row(line(0).toInt,line(1),line(2).toInt))
//2. build the schema (StructType) describing the Rows
val structType = StructType(Array(
StructField("id",IntegerType,true),
StructField("name",StringType,true),
StructField("age",IntegerType,true)))
//bind the Rows to the schema
val df = spark.createDataFrame(rddRow,structType)
//df.show()
//SQL-style query
df.createOrReplaceTempView("info")//register a temporary view
spark.sql("select * from info where age > 20").show()
spark.stop()
}
}
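A hedged alternative (requires Spark 2.3 or later, not in the original post): the same schema can be declared from a DDL string instead of assembling StructFields by hand.
// equivalent schema expressed as a DDL string
val ddlSchema = StructType.fromDDL("id INT, name STRING, age INT")
val df2 = spark.createDataFrame(rddRow, ddlSchema)
df2.printSchema()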
10、DataFrameOperation
package sparkSQLmook
import org.apache.spark.sql
import org.apache.spark.sql.functions._
/**
* DataFrame API operations
*/
object DataFrameOperation {
def main(args: Array[String]): Unit = {
val spark = sql.SparkSession
.builder()
.master("local")
.appName("SparkSessionApp")
.config("spark.sql.warehouse.dir","file:///")
.getOrCreate()
//implicit conversions (needed for the $ column syntax)
import spark.implicits._
//val df = spark.read.json("C:/Users/shujuelin/Desktop/spark/people.json")
val df = spark.read.format("json").load("C:/Users/shujuelin/Desktop/spark/people.json")
//df.show()//show() displays 20 rows by default
//df.printSchema()
//df.select("name").show()//select操作,典型的弱类型,untyped操作
// df.select($"name", $"age" + 1).show() // 使用表达式,scala的语法,要用$符号作为前缀。对年龄加1
//df.filter($"age">19).show()
//另一种写法
//df.filter(df.col("age")>19).show()
df.select(df.col("name"),(df.col("age")+3).as("age2")).show() //别名
//df.groupBy("age").count().show()//先分组在进行聚合
spark.stop()
}
}
11、DataFrameCase
package sparkSQLmook
import org.apache.spark.sql.SparkSession
/**
* DataFrame case study (other API operations)
*/
object DataFrameCase {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("DataFrameCase")
.master("local[2]")
.config("spark.sql.warehouse.dir","file:///")
.getOrCreate()
val rdd = spark.sparkContext.textFile("C://Users//shujuelin//Desktop//spark//student.data")
import spark.implicits._
// the delimiter | must be escaped as \\| because split() takes a regular expression
//convert to a DataFrame via reflection on the case class
val infoDF = rdd.map(_.split("\\|")).map(lines => info(lines(0).toInt,lines(1),lines(2),lines(3))).toDF()
infoDF.show(false)//show displays 20 rows by default; for more use infoDF.show(30,false), where false means do not truncate column values
//infoDF.take(10).foreach(println)
//infoDF.first()//take the first row
//infoDF.head(3) // take the first three rows
//infoDF.select("name","phone").show()
//infoDF.show(20,false)
//filter rows whose name is empty or the literal string 'NULL'
//infoDF.filter("name = '' or name = 'NULL'").show()
//filter names starting with 's'
//infoDF.filter("substr(name,0,1) = 's'").show(20,false)
//sorting
//sort by name (ascending is the default; .desc sorts descending)
//infoDF.sort($"name".desc).show()//or infoDF.sort(infoDF.col("name").desc).show()
//infoDF.sort(infoDF.col("name").asc,infoDF.col("id").desc).show(20,false)
//rename a column
//infoDF.select(infoDF.col("name").as("student_info")).show(20,false)
//join operation
/* val infoDF2 = rdd.map(_.split("\\|")).map(lines => info(lines(0).toInt,lines(1),lines(2),lines(3))).toDF()
infoDF.join(infoDF2, infoDF.col("id") === infoDF2.col("id")).show(20,false) //inner join by default
*/
spark.stop()
}
case class info(id : Int, name : String, phone : String, email : String)
}