import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import org.slf4j.{Logger, LoggerFactory}
import utils.Logs
/**
* @Auther: sss
* @Date: 2018/11/9 11:39
* @Description:
*/
object Demo03 {
private val logger = LoggerFactory.getLogger(Demo03.getClass)
def main(args: Array[String]): Unit = {
val session = SparkSession
.builder()
.master("local[*]")
.appName(this.getClass.getSimpleName)
.getOrCreate()
import session.implicits._
val frame = session.read.json("e:\\json1.json")
frame.createOrReplaceTempView("tables01")
val sql_a =
"""
|select * from tables01
""".stripMargin
val df1 = session.sql(sql_a)
// df1.foreach(t=> println(t))
println("-------------111111111---------------------")
val rdd = df1.rdd.flatMap {
row =>
val dataId = row.getAs[Long]("dataId")
val dataType = row.getAs[String]("dataType")
println(dataType.toString + "-------------2222222222222222---------------------")
row.getAs[Seq[Row]]("resultData").map {
rows =>
val binlog = rows.getAs[String]("binlog")
var tag = ""
rows.getAs[Seq[Row]]("column").map {
row02 =>
val columnname = row02.getAs[String]("columnname")
val columntype = row02.getAs[String]("columntype")
val index = row02.getAs[Long]("index")
val modified = row02.getAs[Boolean]("modified")
val pk = row02.getAs[Boolean]("pk")
val sqlType = row02.getAs[Long]("sqlType")
val value = row02.getAs[String]("value")
tag = String.format(s"[columnname:$columnname,columntype:$columntype,index:$index,modified:$modified,pk:$pk,sqlType:$sqlType,value:$value]")
}
val db = rows.getAs[String]("db")
val eventType = rows.getAs[String]("eventType")
val pkValue = rows.getAs[String]("pkValue")
val sql = rows.getAs[String]("sql")
val table = rows.getAs[String]("table")
val time = rows.getAs[Long]("time")
val tags = String.format(s"[dataId:$dataId,dataType:$dataType,binlog:$binlog,tag:$tag,db:$db,eventType:$eventType,pkValue:$pkValue,sql:$sql,table:$table,time:$time]")
//dataId, dataType, binlog, tag, db, eventType, pkValue, sql, table, time,
( tags)
}
}
rdd.toDF().write.saveAsTable("tmp_table")
//,_2,_3,_4,_5,_6,_7,_8,_9,_10
val result =
"""
|select * from tmp_table
""".stripMargin
session.sql(result).write.json("C:\\Users\\sss\\Desktop\\outExample01")
}
}
数据如下:
{"dataId":123,"dataType":"mysql","resultData":[{"binlog":"mysql_binlog.000","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"7"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"7"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"7"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"7"}],"db":"demo","eventType":"insert","pkValue":"7","sql":"woshisql","table":"student","time":80146942099474},{"binlog":"mysql_binlog.001","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"9"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"9"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"9"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"9"}],"db":"demo","eventType":"insert","pkValue":"9","sql":"woshisql","table":"student","time":80146943574276},{"binlog":"mysql_binlog.002","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"2"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"2"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"2"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"2"}],"db":"demo","eventType":"update","pkValue":"2","sql":"woshisql","table":"student","time":80146943586222},{"binlog":"mysql_binlog.003","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"4"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"4"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"4"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"4"}],"db":"demo","eventType":"update","pkValue":"4","sql":"woshisql","table":"student","time":80146943592561},{"binlog":"mysql_binlog.004","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"6"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"6"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"6"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"6"}],"db":"demo","eventType":"update","pkValue":"6","sql":"woshisql","table":"student","time":80146943599876},{"binlog":"mysql_binlog.005","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"4"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"4"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"4"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"4"}],"db":"demo","eventType":"update","pkValue":"4","sql":"woshisql","table":"student","time":80146943605971},{"binlog":"mysql_binlog.006","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"6"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"6"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"6"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"6"}],"db":"demo","eventType":"update","pkValue":"6","sql":"woshisql","table":"student","time":80146943611335},{"binlog":"mysql_binlog.007","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"9"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"9"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"9"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"9"}],"db":"demo","eventType":"update","pkValue":"9","sql":"woshisql","table":"student","time":80146943617917},{"binlog":"mysql_binlog.008","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"10"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"10"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"10"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"10"}],"db":"demo","eventType":"update","pkValue":"10","sql":"woshisql","table":"student","time":80146943623525},{"binlog":"mysql_binlog.009","column":[{"columnname":"single_cloum0","columntype":"varchar(10)","index":0,"modified":false,"pk":false,"sqlType":0,"value":"1"},{"columnname":"single_cloum1","columntype":"varchar(10)","index":1,"modified":false,"pk":false,"sqlType":0,"value":"1"},{"columnname":"single_cloum2","columntype":"int(5)","index":2,"modified":false,"pk":false,"sqlType":0,"value":"1"},{"columnname":"single_cloum3","columntype":"int(5)","index":3,"modified":false,"pk":false,"sqlType":0,"value":"1"}],"db":"demo","eventType":"update","pkValue":"1","sql":"woshisql","table":"student","time":80146943629864}]}