版权声明:学习交流为主,未经博主同意禁止转载,禁止用于商用。 https://blog.csdn.net/u012965373/article/details/81982858
import java.sql.{PreparedStatement, ResultSet}
import com.alibaba.fastjson.JSON
import kafka.serializer.StringDecoder
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import we.com.util.{KafkaManager, MysqlPoolUtils}
/**
* @author yangxin
* 解析数据库binLog日志到Mysql数据库
* 基本原理:
* 1.读取日志解析对应的数据表的Log操作,先正则每条日志,找到匹配到的原始日志
* 2.根据判断表中的主键或者关键字段选择是否解析或者跳过当前的日志
* 3.将匹配到包含关键字段的日志数据,获取对数据操作的方式insert、update
* 4.然后再去获取每个字段的含有,再将要执行的SQL语句加到批处理
* 5.执行批处理,同步数据到数据库
*/
object BinLogToMysql {
private val appName = "BinLogToMysqlService"
private val LOG = Logger.getLogger(appName)
// SparkStreaming 运行方式
private val master = "yarn"
// 数据表的正则表达式
private val accountRegex = """(.*"tableName":"account".*)""".r
// 处理RDD操作
def processRdd(rdd: RDD[(String, String)]): Unit = {
rdd.foreachPartition {
partition => {
if (partition.nonEmpty) {
val conn = MysqlPoolUtils.getConnection.get
val rs: ResultSet = null
var insertSQL = "INSERT INTO rt_account(fundAcc, is_lender, idcard_info_id) VALUES (?,?,?)"
var updateSQL = "update ...."
val stmt1 = conn.prepareStatement(insertSQL)
val stmt2 = conn.prepareStatement(updateSQL)
stmt1.addBatch()
try {
partition.foreach { line =>
val lineVal = line._2
lineVal match {
case accountRegex(accountJson) => {
val accountObj = JSON.parse(accountJson)
accountObj match {
// 匹配主要关键的字段、主键是否存在
case Some(mapStr: Map[String, Any]@unchecked) if mapStr.contains("id") && mapStr.contains("version") => {
val pointEventType = mapStr.getOrElse("dataEventType", "").toString
if (null != pointEventType && "insert".equals(pointEventType)) {
// addAccountData(stmt1, mapStr)
// 解析mapStr JSON字符串,减将对应的key的value放到对应SQL中的位置
stmt1.addBatch()
} else if (null != pointEventType && "update".equals(pointEventType)) {
// addAccountData(stmt2, mapStr)
// 解析mapStr JSON字符串,减将对应的key的value放到对应SQL中的位置
stmt2.addBatch()
}
}
case other => {
LOG.error("Data Struct Error")
}
case None => {
LOG.error("DataParser failded")
}
}
}
}
}
}
finally {
stmt1.executeBatch()
stmt2.executeBatch()
}
}
}
}
}
// 关闭数据库连接操作
def close(resultSet: ResultSet, stmts: PreparedStatement*): Unit = {
for (stmt <- stmts) {
if (resultSet != null)
resultSet.close()
if (stmt != null)
stmt.close()
}
}
// 程序的run方法
def run(ssc: StreamingContext, brokers: String, topics: String): Unit = {
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers,
"group.id" -> "BinLogToMysqlService",
"auto.offset.reset" -> "offsetStrategies"
)
val topicSet = topics.split(",").toSet
val kafkaManager = new KafkaManager(kafkaParams)
val messagesStream = kafkaManager.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicSet
)
messagesStream.asInstanceOf[InputDStream[(String, String)]].foreachRDD(
rdd => {
processRdd(rdd)
kafkaManager.updateZKOffsets(rdd)
}
)
}
// 程序的主入口Main方法
def main(args: Array[String]): Unit = {
if (args.length < 2) {
LOG.error(
s"""Usage: KafkaBinLogToMysql <brokers> <topics>
| <brokers> is a list of one or more kafka brokers
| <topics> is a list of one or more kafka topics split by "," to consume
""".stripMargin)
System.exit(1)
}
// 开启Spark自动使用系统负载选择最优消费速率
val sc = new SparkConf().set("spark.streaming.backpressure.enabled", "true").setAppName(appName).setMaster(master)
// 设置每批处理数据时间间隔
val ssc = new StreamingContext(sc, Seconds(10))
// 接受的brokers,topics参数
val Array(brokers, topics) = args
// 运行
run(ssc, brokers, topics)
// 开始SparkStreaming实时程序
ssc.start()
// 设置自动的停止,保证数据接受完全
val checkInterValMillisc = 20000
var isStopped:Boolean = false
while (!isStopped) {
isStopped = ssc.awaitTerminationOrTimeout(checkInterValMillisc)
if (!isStopped) {
ssc.stop(stopSparkContext=true, stopGracefully=true)
}
}
}
}