Spark Alerting with StreamingListener
Introduction
How do we handle alerting for our Spark jobs? Two mechanisms cover it:
1. Alerting from inside the application, using StreamingListener, which covers most needs:
- alert when the job itself fails
- alert on data backlog
- alert on abnormal data volume
2. Alerting on the liveness of the job's UI port.
When submitting with spark-submit, pin the UI port with --conf spark.ui.port=4083; a crontab job that probes the port periodically is then enough (a sketch follows below).
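A minimal sketch of such a crontab health check, assuming curl and a configured mail command are available on the machine running the check; driver-host, the port and the recipient address are placeholders:

# Hypothetical crontab entry: every minute, probe the Spark UI port pinned above
# and mail an alert if it stops responding.
* * * * * curl -sf http://driver-host:4083 >/dev/null || echo "snkrs-market-huo UI is not responding" | mail -s "Spark job down" [email protected]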
Source code
package org.apache.spark.streaming.scheduler

@DeveloperApi
trait StreamingListener {

  /** Called when the streaming has been started */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }

  /** Called when a receiver has been started */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }

  /** Called when a receiver has reported an error */
  def onReceiverError(receiverError: StreamingListenerReceiverError) { }

  /** Called when a receiver has been stopped */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }

  /** Called when a batch of jobs has been submitted for processing. */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

  /** Called when processing of a batch of jobs has started. */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }

  /** Called when processing of a batch of jobs has completed. */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

  /** Called when processing of a job of a batch has started. */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted) { }

  /** Called when processing of a job of a batch has completed. */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}
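Implement only the callbacks you need; for backlog and error alerting the batch-level callbacks are enough, since StreamingListenerBatchCompleted exposes a BatchInfo with the timing and failure details. A minimal sketch (the class name and log format are ours, not from the original post):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Minimal listener: log the total delay and record count of every completed batch.
class DelayLogger extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"batch ${info.batchTime} totalDelay=${info.totalDelay.getOrElse(-1L)}ms numRecords=${info.numRecords}")
  }
}

The full listener used in production follows.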
Usage
import java.text.SimpleDateFormat
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.scheduler._
import org.slf4j.LoggerFactory
import my.nexus.util.Sendmail
/**
 * @author liuchunlong
 * @mail [email protected]
 * @description data backlog / error alerting for the streaming job
 * @date 2019-08-26
 * @param appName  job name used in the alert titles
 * @param duration backlog threshold in seconds; a batch whose total delay exceeds it triggers an alert
 */
class SparkListener(private val appName: String, private val duration: Int) extends StreamingListener {
private val logger = LoggerFactory.getLogger(this.getClass.getName)
val TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  /**
   * Monitoring when a batch is submitted.
   *
   * @param batchSubmitted
   */
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
    super.onBatchSubmitted(batchSubmitted)
    val batchInfo = batchSubmitted.batchInfo
    // batch time
    val batchTime = batchInfo.batchTime.milliseconds
    val numRecords = batchInfo.numRecords
    // abnormal data volume: this batch received no records at all
    if (numRecords == 0) {
      val monitorTitle = s"Spark Streaming $appName batch data volume anomaly alert"
      val monitorContent = s"batchTime -> ${TIME_FORMAT.format(batchTime)} <br>" +
        s"numRecords -> $numRecords records, please check promptly"
      // TODO send the alert (see the sketch after the class)
    }
    logger.info("SparkListener batchTime : " + TIME_FORMAT.format(batchTime))
  }
  /**
   * Monitoring when batch processing starts.
   *
   * @param batchStarted
   */
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    val batchInfo = batchStarted.batchInfo
    val processingStartTime = batchInfo.processingStartTime.get
    logger.info("SparkListener processingStartTime : " + TIME_FORMAT.format(processingStartTime))
  }
  /**
   * Monitoring after a batch has completed.
   *
   * @param batchCompleted
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    // processing start time of this batch
    val processingStartTime = batchInfo.processingStartTime
    // processing end time of this batch
    val processingEndTime = batchInfo.processingEndTime
    // scheduling delay = processing start time - submission time
    val schedulingDelay = batchInfo.schedulingDelay
    // processing time = processing end time - processing start time
    val processingDelay = batchInfo.processingDelay
    // total delay = scheduling delay + processing time
    val totalDelay = batchInfo.totalDelay
    // batch time
    val batchTime = batchInfo.batchTime
    // number of records received in this batch
    val numRecords = batchInfo.numRecords
    val outputOperationInfos: Map[Int, OutputOperationInfo] = batchInfo.outputOperationInfos
    // collect the failure reasons of all output operations in this batch
    var flag = false
    var errorMessage = "Errors:<br>"
    outputOperationInfos.keys.foreach(index => {
      val failureReason = outputOperationInfos(index).failureReason
      if (failureReason.isDefined) {
        flag = true
        errorMessage += index + ":<br>" + failureReason.get + "<br><br>"
      }
    })
    // common header with the batch statistics, shared by both alerts
    val baseContent = s"Batch info:<br>processingStartTime -> ${TIME_FORMAT.format(processingStartTime.get)} <br>" +
      s"processingEndTime -> ${TIME_FORMAT.format(processingEndTime.get)} <br>" +
      s"schedulingDelay -> ${schedulingDelay.get / 1000}s <br>" +
      s"processingDelay -> ${processingDelay.get / 1000}s <br>" +
      s"totalDelay -> ${totalDelay.get / 1000}s <br>" +
      s"batchTime -> ${TIME_FORMAT.format(batchTime.milliseconds)} <br>" +
      s"numRecords -> $numRecords records <br>" +
      s"Offsets: <br>"
    // Kafka offset ranges: the direct stream publishes a List[OffsetRange] under the "offsets" metadata key
    var offsetContent = ""
    batchInfo.streamIdToInputInfo.foreach(tuple => {
      tuple._2.metadata.get("offsets").foreach(offsets => {
        offsets.asInstanceOf[List[OffsetRange]].foreach(offsetRange => {
          val partition = offsetRange.partition
          val minOffset = offsetRange.fromOffset
          val maxOffset = offsetRange.untilOffset
          val topicName = offsetRange.topic
          offsetContent += s"topic: $topicName partition: $partition offsets: $minOffset to $maxOffset<br>"
        })
      })
    })
    // an output operation of this batch failed
    if (flag) {
      val monitorTitle = s"Spark Streaming $appName job error alert"
      val monitorContent = baseContent + offsetContent + errorMessage + "Please check promptly!!!"
      // TODO send the alert (see the sketch after the class)
    }
    // data backlog: total delay exceeded the threshold
    // (only within a 60s window past the threshold, so a long backlog does not alert on every batch)
    if (totalDelay.get >= duration * 1000 && totalDelay.get <= (duration + 60) * 1000) {
      val monitorTitle = s"Spark Streaming $appName job backlog alert"
      val monitorContent = baseContent + offsetContent + "Please check promptly!!!"
      // TODO send the alert (see the sketch after the class)
    }
  }
}
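The // TODO markers are where the in-house my.nexus.util.Sendmail utility would be called; its interface is not shown in this post. As a hedged stand-in, the sketch below mails the HTML alert body over SMTP with JavaMail (the javax.mail dependency, the SMTP host and the addresses are all assumptions):

import java.util.Properties
import javax.mail.{Message, Session, Transport}
import javax.mail.internet.{InternetAddress, MimeMessage}

object Alert {
  // Hypothetical SMTP settings; replace with your own mail gateway.
  private val props = new Properties()
  props.put("mail.smtp.host", "smtp.example.com")

  // Send one HTML alert mail with the given title and body.
  def send(title: String, htmlBody: String): Unit = {
    val session = Session.getInstance(props)
    val msg = new MimeMessage(session)
    msg.setFrom(new InternetAddress("[email protected]"))
    msg.setRecipients(Message.RecipientType.TO, "[email protected]")
    msg.setSubject(title)
    // the monitor content built above is HTML (it uses <br> tags)
    msg.setContent(htmlBody, "text/html; charset=utf-8")
    Transport.send(msg)
  }
}

Each // TODO would then become a call such as Alert.send(monitorTitle, monitorContent).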
Invocation
Register the listener on the StreamingContext before it is started; here the backlog threshold is five batch intervals:
ssc.addStreamingListener(new SparkListener("snkrs-market-huo", this.batch*5))
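For context, a minimal sketch of how this wiring fits into a full application; the 30-second batch interval and the stream construction are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MonitoredApp {
  def main(args: Array[String]): Unit = {
    val batch = 30 // batch interval in seconds (assumed)
    val conf = new SparkConf().setAppName("snkrs-market-huo")
    val ssc = new StreamingContext(conf, Seconds(batch))
    // register the listener before start() so that every batch is observed
    ssc.addStreamingListener(new SparkListener("snkrs-market-huo", batch * 5))
    // ... build the Kafka direct stream and the output operations here ...
    ssc.start()
    ssc.awaitTermination()
  }
}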