当Spark工作在Client模式下时,由于Driver在作业提交机器上运行,Driver进程是可以看到的,可以用kill(不是kill -9)杀死Driver进程,此时,如果设置了优雅停止,就会调用钩子函数进行优雅地停止。
当Spark工作在Cluster模式下时,Driver运行在集群的那一台机器上我们是无法确定的(YARN模式下由ResourceManager决定),因此无法用kill取杀死Driver进程。当工作在YARN模式下时,可以使用yarn application kill applicationID杀死指定程序。用这种方式停止程序,ResourceManager会给定一定的时间(如1s)让Driver停止,但是如果在给定的时间内作业没有完成,那么ResourceManager会将其强制杀死,但是这不是我们希望看到的,我们希望Driver优雅地退出。
那么如何在YARN的Cluster模式下优雅退出?
我们采用的方法是启动一个监控进程,每20s查看一次stopPath,如果stopPath存在,则停止streaming,停止后StreamingContextState就变成了STOPPED,监控进程检测到StreamingContextState变化为STOPPED后,就会停止。
在提交作业时,不论stopPath是否存在,都要尝试删除此路径。
由于此路径存在于HDFS,因此为了验证此路径是否存在,需要连接HDFS。
当采用YARN Cluster模式时,SparkSubmit进程提交作业,然后就只负责接受任务是否正常运行的消息,不再负责任何其他功能,提交作业后,会在某一台机器上执行Driver进程,此时如果想停止SparkStreamin,kill SparkSubmit进程已经没有意义了,必须让Driver进程自己停止,因此,我们通过线程监控目录的方式停止Driver。
需要关闭的时候,手动在hdfs上面创建一个文件夹就好了;
package com.atguigu
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}
class MonitorStop(ssc: StreamingContext) extends Runnable {
override def run(): Unit = {
val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop102:9000"), new Configuration(), "atguigu")
while (true) {
try
Thread.sleep(5000)
catch {
case e: InterruptedException =>
e.printStackTrace()
}
val state: StreamingContextState = ssc.getState
val bool: Boolean = fs.exists(new Path("hdfs://hadoop102:9000/stopSpark"))
if (bool) {
if (state == StreamingContextState.ACTIVE) {
//把sc也停止了
ssc.stop(stopSparkContext = true, stopGracefully = true)
System.exit(0)
}
}
}
}
}
package com.atguigu
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkTest {
def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status: Option[Int]) => {
//当前批次内容的计算
val sum: Int = values.sum
//取出状态信息中上一次状态
val lastStatu: Int = status.getOrElse(0)
Some(sum + lastStatu)
}
val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("SparkTest")
//设置优雅的关闭
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("./ck")
val line: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
val word: DStream[String] = line.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = word.map((_, 1))
val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)
wordAndCount.print()
ssc
}
def main(args: Array[String]): Unit = {
val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () => createSSC())
new Thread(new MonitorStop(ssc)).start()
ssc.start()
ssc.awaitTermination()
}
}