版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
直接kill -9?不好吧,万一我这个程序还在处理数据呢?还没处理完呢?在处理的数据丢失了呢?但是我又想让它先停一下呢?
好了,直接上代码吧(语言组织不好)~
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.StreamingContext
trait StreamingStopper {
// check every 10s for shutdown hdfs file
val checkIntervalMillis = 10000 // 每个10S去检查hdfs中的一个文件,如果文件不存咋了,那么就停止
var isStopped = false
val shutdownFilePath = Option(System.getProperty("web.streaming.shutdown.filepath"))
.getOrElse(sys.error("web.streaming.shutdown.filepath can not be null"))
def stopContext(ssc: StreamingContext) = {
while (!isStopped) {
val isStoppedTemp = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
if (!isStoppedTemp && isShutdownRequested) {
val stopSparkContext = true
val stopGracefully = true
isStopped = true
ssc.stop(stopSparkContext, stopGracefully) // 如果两个都是true,那么就会等一个周期处理好后退出
}
}
}
def isShutdownRequested(): Boolean = {
val fs = FileSystem.get(new Configuration())
fs.exists(new Path(shutdownFilePath))
}
}
使用:
extends StreamingStopper
stopContext(ssc)