// Continuation of the SparkSQL series -- Requirement 01 (see the original article for context)
import java.text.SimpleDateFormat
import java.util
import java.util.{Calendar, Date, Properties}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.slf4j.LoggerFactory
/**
 * Save-mode semantics used in this job:
 *  - SaveMode.Append: if the table exists, append to it; otherwise create the table first, then insert.
 *  - SaveMode.Overwrite: drop the existing table and its data, recreate the table, then insert the new data.
 *  - SaveMode.Ignore: create the table and insert when it does not exist; if it already exists, skip silently.
 *
 * Computes the total user count per hospital per day.
 * Template example.
 **/
class AccountService {

  val logger = LoggerFactory.getLogger(classOf[AccountService])

  /**
   * Loads one table from the source MySQL database over JDBC.
   *
   * @param spark     active SparkSession
   * @param tableName table to load (a concrete table name is required by the `dbtable` option)
   * @return the table contents as a DataFrame
   */
  def formMysql(spark: SparkSession, tableName: String): DataFrame = {
    spark.read.format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://ip:3306/icu_test?useUnicode=true&characterEncoding=utf-8&user=test&password=test")
      .option("dbtable", tableName) // must be a table name
      .load()
  }

  /** Midnight of the previous day, formatted as "yyyy-MM-dd 00:00:00". */
  def getYesterday(): String = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val cal = Calendar.getInstance()
    cal.add(Calendar.DATE, -1)
    dateFormat.format(cal.getTimeInMillis) + " 00:00:00"
  }

  /** Midnight of the current day, formatted as "yyyy-MM-dd 00:00:00". */
  def getNowDate(): String = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    dateFormat.format(new Date()) + " 00:00:00"
  }

  /**
   * Current day as an Int in yyyyMMdd form.
   * An int column filters faster than a datetime column on the MySQL side.
   */
  def getNowDateInt(): Int = {
    val dateFormat = new SimpleDateFormat("yyyyMMdd")
    Integer.valueOf(dateFormat.format(new Date()))
  }

  /** Stages the hospital columns used below as the temporary table "model_hospital". */
  def modelHospital(df: DataFrame): Unit = {
    df.select("id", "name", "level", "province", "city", "area", "status", "ctime")
      .write.mode("overwrite").saveAsTable("model_hospital")
  }

  /** Stages the account columns used below as the temporary table "model_account". */
  def modelAccount(df: DataFrame): Unit = {
    df.select("id", "hospital", "ctime").write.mode("overwrite").saveAsTable("model_account")
  }

  /** Stages the area id/name mapping as the temporary table "model_area". */
  def modelArea(df: DataFrame): Unit = {
    df.select("id", "name").write.mode("overwrite").saveAsTable("model_area")
  }

  /**
   * Per hospital: computes the user total up to the start of today and the
   * increase since yesterday, enriches each row with hospital/province/city/area
   * names, and hands the resulting Row list to addMysql for persistence.
   */
  def queryMysql2(spark: SparkSession): Unit = {
    // Hoisted out of the loop body: importing implicits once is enough
    // (the original re-imported them on every iteration).
    import spark.implicits._
    val accountDF = formMysql(spark, "account")   // user table
    val hospitalDF = formMysql(spark, "hospital") // hospital table
    val areaDF = formMysql(spark, "area")         // area table
    val currDate = getNowDate()       // start of today
    val currDateInt = getNowDateInt() // today as yyyyMMdd int
    val beforeDate = getYesterday()   // start of yesterday
    // Stage the hospital, account and area tables for Spark SQL.
    modelHospital(hospitalDF)
    modelAccount(accountDF)
    modelArea(areaDF)
    // Per hospital: total count and latest creation time before today.
    // NOTE(review): the LIMIT 1 means only a single hospital group is
    // processed per run -- confirm this is intended and not a debug leftover.
    val accountSql = spark.sql("SELECT q.hospital,count(1) AS num,MAX(q.ctime) AS ctime FROM model_account q LEFT JOIN model_hospital h ON q.hospital=h.id WHERE 1=1 AND to_unix_timestamp(q.ctime,'yyyy-MM-dd HH:mm:ss')< to_unix_timestamp('" + currDate + "','yyyy-MM-dd HH:mm:ss') AND h.`level`<>13 GROUP BY q.hospital LIMIT 1")
    val accountList = accountSql.collect()
    var provinceName = "" // province name
    var cityName = ""     // city name
    var areaName = ""     // district/county name
    var hospital = 0      // hospital id
    var num = 0           // current total
    var increase = 0      // delta vs. the previous day's total
    val accountRanks = new util.ArrayList[Row]()
    if (accountList.nonEmpty) {
      for (account <- accountList) {
        hospital = Integer.parseInt(account(0).toString)
        num = Integer.parseInt(account(1).toString)
        // Total as of the start of yesterday.
        val increaseSql = spark.sql("SELECT * FROM model_account q WHERE 1=1 AND q.hospital=" + hospital + " AND to_unix_timestamp(q.ctime,'yyyy-MM-dd HH:mm:ss')< to_unix_timestamp('" + beforeDate + "','yyyy-MM-dd HH:mm:ss') ")
        // Hospital attributes (location ids, creation time, name).
        val areaSql = spark.sql("SELECT id,province,city,area,ctime,name FROM model_hospital WHERE id=" + hospital)
        val beforeNum = increaseSql.count().toInt
        increase = num - beforeNum
        val areaList = areaSql.collect()
        if (areaList.nonEmpty) {
          for (areas <- areaList) {
            val hospitalId = areas.getInt(0)
            val province = areas.getInt(1)
            val city = areas.getInt(2)
            val area = areas.getInt(3)
            val hospitalCtime = areas.getTimestamp(4)
            val hospitalName = areas.getString(5)
            // Resolve the province name from its id.
            if (province > 0) {
              provinceName = spark.sql("SELECT name FROM model_area WHERE id=" + province)
                .map(x => x.getAs[String]("name")).collect().mkString
            }
            // Resolve the city name from its id.
            if (city > 0) {
              cityName = spark.sql("SELECT name FROM model_area WHERE id=" + city)
                .map(x => x.getAs[String]("name")).collect().mkString
            }
            // Resolve the district/county name from its id.
            if (area > 0) {
              areaName = spark.sql("SELECT name FROM model_area WHERE id=" + area)
                .map(x => x.getAs[String]("name")).collect().mkString
            }
            // Accumulate the output Row (order must match the schema in addMysql).
            accountRanks.add(Row(hospitalId.toLong, province.toLong, city.toLong,
              area.toLong, num, increase, currDate, hospitalName, provinceName, cityName,
              areaName, currDateInt, hospitalCtime.toString))
          }
        }
      }
      addMysql(spark, accountRanks)
    }
  }

  /**
   * Appends the collected rank rows to "test_account_hospital_rank" in the
   * result database. No-op when the list is empty.
   */
  def addMysql(spark: SparkSession, accountRanks: util.ArrayList[Row]): Unit = {
    if (!accountRanks.isEmpty) {
      val structFields = new util.ArrayList[StructField]
      // Column mapping for the target table; order must match the Rows built in queryMysql2.
      structFields.add(DataTypes.createStructField("hospital_id", DataTypes.LongType, false))
      structFields.add(DataTypes.createStructField("province", DataTypes.LongType, false))
      structFields.add(DataTypes.createStructField("city", DataTypes.LongType, false))
      structFields.add(DataTypes.createStructField("area", DataTypes.LongType, false))
      structFields.add(DataTypes.createStructField("current_num", DataTypes.IntegerType, false))
      structFields.add(DataTypes.createStructField("increase_num", DataTypes.IntegerType, false))
      structFields.add(DataTypes.createStructField("create_time", DataTypes.StringType, false))
      //structFields.add(DataTypes.createStructField("update_time", DataTypes.StringType, false))
      structFields.add(DataTypes.createStructField("hospital_name", DataTypes.StringType, false))
      structFields.add(DataTypes.createStructField("province_name", DataTypes.StringType, false))
      structFields.add(DataTypes.createStructField("city_name", DataTypes.StringType, false))
      structFields.add(DataTypes.createStructField("area_name", DataTypes.StringType, false))
      structFields.add(DataTypes.createStructField("new_time", DataTypes.IntegerType, false))
      structFields.add(DataTypes.createStructField("hospital_ctime", DataTypes.StringType, false))
      // Build the schema from the field list.
      val structType = DataTypes.createStructType(structFields)
      // JDBC connection properties for the target database.
      // FIX: the original fused `new Properties()` and the first `put` onto one
      // line with no separator, which does not compile.
      val prop = new Properties()
      prop.put("user", "test")
      prop.put("password", "test")
      val accounDataFrame = spark.createDataFrame(accountRanks, structType)
      accounDataFrame.write.mode("append").jdbc(
        "jdbc:mysql://ip:3306/icu_test?useUnicode=true&characterEncoding=utf-8",
        "test_account_hospital_rank", prop)
    }
  }
}
/**
 * One row of the per-hospital daily user ranking.
 * Mirrors the columns written to `test_account_hospital_rank`.
 */
case class AccountHospitalRank(
    hospitalId: Long,     // hospital id
    province: Long,       // province id
    city: Long,           // city id
    area: Long,           // district/county id
    currentNum: Int,      // total users as of the report date
    increaseNum: Int,     // delta vs. the previous day
    createTime: String,   // report date ("yyyy-MM-dd 00:00:00")
    updateTime: String,   // last-update timestamp
    hospitalName: String, // hospital display name
    provinceName: String, // province display name
    cityName: String,     // city display name
    areaName: String,     // district/county display name
    newTime: Int,         // report date as yyyyMMdd int
    hospitalCtime: String // hospital creation timestamp
)
object AccountService {
  /**
   * Entry point: builds a local SparkSession and runs the daily
   * per-hospital account aggregation.
   */
  def main(args: Array[String]): Unit = {
    // The warehouse-dir override ("spark.sql.warehouse.dir") is intentionally
    // left unset here, as in the original.
    val conf = new SparkConf()
      .setAppName("dw_account")
      .setMaster("local[2]")
    val session = SparkSession.builder().config(conf).getOrCreate()
    new AccountService().queryMysql2(session)
  }
}