Spark发展至今,对于流处理有着自己的优势,本篇博文给大家介绍一下,Spark流处理的优雅停止,作用在让计算自己停止,而非人为的暴力干预
给大家写了一个流处理做字频统计,结果写入mysql,做优雅停止的实例,希望可以帮到大家,大家看代码的时候要注意一个事项,我担心大家看不懂,写代码的时候把代码写的比较宽泛,大家看懂之后,自己写的时候可以精简一下
object StreamGranceStop2 {
def main(args: Array[String]): Unit = {
//下面的计算主体和正常的没差别
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("WARN")
val dataDS: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.182.147",9999)
val wordDS: DStream[String] = dataDS.flatMap(_.split(" "))
val tupleDS: DStream[(String, Int)] = wordDS.map((_,1))
tupleDS.reduceByKey(_+_).foreachRDD(rdd=>{
rdd.foreach(word=>{
Class.forName("com.mysql.jdbc.Driver")
//获取mysql连接
val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "")
//把数据写入mysql
try {
var totalcount = word._2
var sql = "";
var querysql="select count from wordcount where word='"+word._1+"'"
val queryresult: ResultSet = conn.prepareStatement(querysql).executeQuery()
if(queryresult.next()){
totalcount = queryresult.getString("count").toInt+word._2
sql = "update wordcount set count='"+totalcount+"' where word='"+word._1+"'"
}else{
sql = "insert into wordcount (word,count) values ('" + word._1 + "','" + totalcount + "')"
}
conn.prepareStatement(sql).executeUpdate()
println("保存结束--------------------------------------------------------------")
} finally {
conn.close()
}
})
})
//开始优雅停止
val checkIntervalMillis = 10000 //等待的毫秒数
var stopFlag:Boolean = false //准备一个布尔对象,后面做是否真正停止用
var isStopped = false //流对象没有对数据流执行任务的标识,默认是false,标识任然在计算
ssc.start() //和正常流计算一样要调用start方法
//但是注意start之后不在是直接调用之前的awaitTermination
//优雅停止是Driver判断的事所以这里写的while不会影响到Spark子节点的运算
//整个while的结束条件就是之前准备的isStopped
while (! isStopped) {
//优雅停止的方法,需要传入一个毫秒数,用来判断在传入的毫秒数内,executor中
//是否还有数据曾被计算,返回是(true)则表示没有了,注意这里思路一定要清晰
//再次强调这个方法是有数据曾被计算则返回false,否则为true
isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
//这里做一个当前状态的提示输出
if (isStopped)
println("ssc 10秒内流对象没有进行过计算任务,预计可以退出程序!")
else
println("ssc正在运行. 不可退出...")
//调用自定义的方法--用实际行动去判断是否真的可以停了
checkShutdownMarker
//对最终结果做处理,注意我任然用!isStopped是因为我没对isStopped的值做修改
println("!isStopped && stopFlag"+(!isStopped && stopFlag))
if ( !isStopped && stopFlag ) {
println("以确认可以停止,现在开始停止ssc:")
ssc.stop(true, true)//立马停止
println("ssc 停止!!!!!!!")
}
}
//因为这个例子是词频统计写入mysql,所以我们对mysql里面的数据做查询如果到达了我们的预期就可以停止
def checkShutdownMarker:Unit={
println("开始检查是否满足停止条件。。。。。")
Class.forName("com.mysql.jdbc.Driver")
//获取mysql连接
val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "")
var querysql="select sum(count) sumcnt from wordcount"
val queryresult: ResultSet = conn.prepareStatement(querysql).executeQuery()
//满足条件就正式停止
if(queryresult.next()&&queryresult.getInt("sumcnt")>=10){
stopFlag=true
}
}
}
}