package report
import config.ConfigHelper
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import utils.MakeATPKpi
// Report analysis implemented with Spark SQL.
/**
 * Spark SQL report job: aggregates the ATP error counters of the parquet logs per train and
 * then sums them per train time.
 *
 * Expected arguments:
 *   - `args(0)`: path of the parquet log data, exposed to SQL as the `logs` view.
 *   - `args(1)`: path of a `|`-delimited text file; the first two fields of each line are read
 *     as train id and train time and exposed to SQL as the `trainTime` view.
 *
 * The result is printed with `show()`; nothing is written back.
 */
object TrainTimeSparkSQLAnalysis {
  /**
   * Job entry point.
   *
   * @param args input paths, see the object documentation
   * @throws IllegalArgumentException if fewer than two arguments are supplied
   */
  def main(args: Array[String]): Unit = {
    // Fail fast with a readable message, before a session is started, instead of hitting an
    // ArrayIndexOutOfBoundsException half-way through the job.
    require(
      args.length >= 2,
      "Usage: TrainTimeSparkSQLAnalysis <logs parquet path> <train time file path>"
    )

    val session = SparkSession
      .builder()
      // Local mode; "*" leaves the number of worker threads to Spark.
      .master("local[*]")
      .appName(this.getClass.getName)
      .config("spark.serializer", ConfigHelper.serializer)
      .getOrCreate()

    // try/finally so the session is stopped even when reading or querying fails.
    try {
      // Needed for the toDF conversion below.
      import session.implicits._

      // Raw logs.
      val logs = session.read.parquet(args(0))

      // Train time lookup: split on "|" (limit -1 keeps trailing empty fields) and drop
      // lines that do not have at least the two fields used below.
      val trainTimeSource: RDD[String] = session.sparkContext.textFile(args(1))
      val trainTimeDF: DataFrame = trainTimeSource
        .map(_.split("\\|", -1))
        .filter(_.length >= 2)
        .map(fields => (fields(0), fields(1)))
        .toDF("trainId", "trainTime")

      // getOrCreate() may return an already running session that still holds views with
      // these names; createTempView would throw in that case, so replace them instead.
      logs.createOrReplaceTempView("logs")
      trainTimeDF.createOrReplaceTempView("trainTime")

      // UDF referenced by name from the SQL fragment in MakeATPKpi.sqlStr.
      session.udf.register("myif", (boolean: Boolean) => if (boolean) 1 else 0)

      // Inner query: per-train counters. Outer query: join to the train time and sum the
      // per-train counters for every train time.
      session.sql(
        "select a.trainTime,sum(c.cnt),sum(c.atpError),sum(c.main),sum(c.wifi),sum(c.balise),sum(c.TCR),sum(c.speed),sum(c.DMI),sum(c.TIU),sum(c.JRU) from trainTime a join (select MPacketHead_TrainID,count(*) as cnt," + MakeATPKpi.sqlStr +
          " from logs group by MPacketHead_TrainID) c on a.trainId=c.MPacketHead_TrainID group by a.trainTime"
      ).show()
    } finally {
      // Release Spark resources.
      session.stop()
    }
  }
}
工具类
MakeATPKpi.scala:
package utils
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.Row
/**
 * Helpers that turn the `MATPBaseInfo_AtpError` log column into KPI counters, either row by
 * row ([[makeKpi]]) or as a Spark SQL aggregate fragment ([[sqlStr]]).
 */
object MakeATPKpi {
  // Log column holding the ATP error value.
  private val AtpErrorColumn = "MATPBaseInfo_AtpError"

  // Recognised ATP error values, each paired with the SQL alias of its counter.
  // The order of this list fixes both the indicator positions returned by makeKpi and the
  // column order produced by sqlStr, so the two can no longer drift apart.
  // Declared before sqlStr because the vals of an object are initialised top to bottom.
  private val ErrorUnits: List[(String, String)] = List(
    "车载主机" -> "main",
    "无线传输单元" -> "wifi",
    "应答器信息接收单元" -> "balise",
    "轨道电路信息读取器" -> "TCR",
    "测速测距单元" -> "speed",
    "人机交互接口单元" -> "DMI",
    "列车接口单元" -> "TIU",
    "司法记录单元" -> "JRU"
  )

  /**
   * Builds the KPI indicators of a single log row.
   *
   * @param row a row that has a string column `MATPBaseInfo_AtpError`
   * @return a list of 10 values:
   *         index 0 is always 1 (one row, used as the record count when rows are summed),
   *         index 1 is 1 when the error value is neither null nor empty,
   *         indexes 2 to 9 are one indicator per recognised error value, in the order
   *         main, wifi, balise, TCR, speed, DMI, TIU, JRU. An unrecognised non-empty value
   *         sets index 1 only.
   */
  def makeKpi(row: Row): List[Int] = {
    val atpError = row.getAs[String](AtpErrorColumn)
    val hasError = if (StringUtils.isNotEmpty(atpError)) 1 else 0
    // `==` is null-safe in Scala; a null or empty value simply matches no label, which gives
    // the all-zero indicators required for rows without an error.
    val perUnit = ErrorUnits.map { case (label, _) => if (label == atpError) 1 else 0 }
    1 :: hasError :: perUnit
  }

  /**
   * Comma-separated aggregate expressions (no leading or trailing comma) to be placed in the
   * select list of a grouped query over the logs. Produces the columns
   * `atpError, main, wifi, balise, TCR, speed, DMI, TIU, JRU`.
   *
   * Only built-in SQL expressions are used, so the fragment does not depend on any UDF being
   * registered by the caller, and every per-unit counter treats a NULL error value as 0 in the
   * same way (a comparison with NULL is not true, so the `else 0` branch is taken).
   */
  val sqlStr: String = {
    val anyError = s"sum(if($AtpErrorColumn != '',1,0)) as atpError"
    val perUnit = ErrorUnits.map { case (label, alias) =>
      s"sum(case when $AtpErrorColumn = '$label' then 1 else 0 end) as $alias"
    }
    (anyError :: perUnit).mkString(",")
  }
}
配置文件
application.conf:
# Application configuration
# Compression format
parquet.code="snappy"
# Spark serializer class
spark.serializer="org.apache.spark.serializer.KryoSerializer"
# JDBC connection settings
# NOTE(review): user and password are stored here in plain text — presumably local
# development values; confirm before using this file outside a dev machine.
jdbc.url="jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8"
jdbc.driver="com.mysql.jdbc.Driver"
jdbc.user="root"
jdbc.password="000000"
# ScalikeJDBC connection settings (same database as the jdbc.* entries above)
db.default.url="jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8"
db.default.driver="com.mysql.jdbc.Driver"
db.default.user="root"
db.default.password="000000"
ConfigHelper.scala:
package config
import com.typesafe.config.{Config, ConfigFactory}
/**
 * Exposes the settings obtained from `ConfigFactory.load()` as plain string values.
 * All values are read when the object is first initialised.
 */
object ConfigHelper {
  // Configuration returned by ConfigFactory.load().
  private val settings: Config = ConfigFactory.load()

  // Reads the string stored under the given configuration path.
  private def string(path: String): String = settings.getString(path)

  /** Compression format (`parquet.code`). */
  val parquetCode: String = string("parquet.code")

  /** Serializer class name (`spark.serializer`). */
  val serializer: String = string("spark.serializer")

  // JDBC settings (jdbc.*)
  val url: String = string("jdbc.url")
  val driver: String = string("jdbc.driver")
  val user: String = string("jdbc.user")
  val password: String = string("jdbc.password")

  // ScalikeJDBC settings (db.default.*)
  val url2: String = string("db.default.url")
  val driver2: String = string("db.default.driver")
  val user2: String = string("db.default.user")
  val password2: String = string("db.default.password")
}