package config

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object conf {
  private val master = "local[*]"
  val confs: SparkConf = new SparkConf().setMaster(master).setAppName("jobs")

  // Build the session first, then reuse its SparkContext so that only one
  // context exists per JVM.
  val spark_session: SparkSession = SparkSession.builder()
    .appName("jobs")
    .config(confs)
    .getOrCreate()
  val sc: SparkContext = spark_session.sparkContext
  sc.setLogLevel("ERROR")

  // Enable Cartesian products (cross joins); Spark 2.x disables them by default.
  spark_session.conf.set("spark.sql.crossJoin.enabled", true)
}
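// A minimal usage sketch (the object name JobSmokeTest is hypothetical, not
// part of the original code), showing how a job consumes the shared context
// and session defined above:
object JobSmokeTest {
  def main(args: Array[String]): Unit = {
    // RDD API via the shared SparkContext
    println(conf.sc.parallelize(1 to 10).sum())
    // DataFrame API via the shared session
    conf.spark_session.range(10).show()
  }
}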
package operationMysql

import config.conf.spark_session
import config.conf.spark_session.implicits._
import org.apache.spark.sql.DataFrame
object readingMysqlOperation {
  def main(args: Array[String]): Unit = {
    /*
    // Alternative: the options-based JDBC read.
    val df: DataFrame = spark_session.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/junsheng?useUnicode=true&characterEncoding=utf-8")
      .option("dbtable", "订单")
      .option("user", "root")
      .option("password", "123456")
      .load()
    df.show()
    */
    // Connect to MySQL via JDBC
    val url = "jdbc:mysql://localhost:3306/junsheng?useUnicode=true&characterEncoding=utf-8" //&useSSL=true
    // Set the user name and password
    val prop = new java.util.Properties
    prop.setProperty("user", "root")
    prop.setProperty("password", "123456")
    // Read the 订单明细 (order detail) and 订单 (order) tables, partitioning each
    // read on 订单ID into 4 partitions over the key range [0, 5000000).
    val df1: DataFrame = spark_session.read.jdbc(url, "订单明细", "订单ID", 0, 5000000, 4, prop)
    val df2: DataFrame = spark_session.read.jdbc(url, "订单", "订单ID", 0, 5000000, 4, prop)
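    // A sketch of the equivalent options-based partitioned read (same table,
    // credentials, and bounds as above; partitionColumn, lowerBound,
    // upperBound, and numPartitions are the standard Spark JDBC options):
    // val df1Alt: DataFrame = spark_session.read.format("jdbc")
    //   .option("url", url)
    //   .option("dbtable", "订单明细")
    //   .option("partitionColumn", "订单ID")
    //   .option("lowerBound", "0")
    //   .option("upperBound", "5000000")
    //   .option("numPartitions", "4")
    //   .option("user", "root")
    //   .option("password", "123456")
    //   .load()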
    // Register both DataFrames as temporary views so they can be queried with SQL
    df1.createOrReplaceTempView("data1")
    df2.createOrReplaceTempView("data2")
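    // Optional sanity check (a sketch using the standard Catalog API): list
    // the temp views registered above.
    // spark_session.catalog.listTables().show()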
    // Run the join with spark_session.sql(...)
    val ywSQL: String =
      "SELECT dt1.`订单ID`, dt2.`客户ID`, dt1.`产品ID`, dt1.`单价`, dt1.`数量` " +
        "FROM data1 AS dt1 LEFT JOIN data2 AS dt2 ON dt1.`订单ID` = dt2.`订单ID`"
    val df: DataFrame = spark_session.sql(ywSQL)
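    // For reference, the same left join expressed with the DataFrame API
    // instead of SQL (a sketch; $ comes from spark_session.implicits._):
    // val joined: DataFrame = df1.as("dt1")
    //   .join(df2.as("dt2"), $"dt1.订单ID" === $"dt2.订单ID", "left")
    //   .select($"dt1.订单ID", $"dt2.客户ID", $"dt1.产品ID", $"dt1.单价", $"dt1.数量")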
    // Project to (订单ID, 产品单价, 订购数量); indices follow the SELECT list:
    // 0 = 订单ID, 3 = 单价, 4 = 数量.
    df.rdd
      .map(lines => (lines(0).toString, lines(3).toString.toDouble, lines(4).toString.toInt))
      .toDF("订单ID", "产品单价", "订购数量")
      .show()
  }
}