1、初始化AlertStatusReporter(每5s上报一次)
AlertStatusReporter.init
2、查询要上报的alert
def run(self):
    """
    Run an endless loop which reports all the alert statuses got from collector.

    Returns immediately (after logging a warning) when
    ``self.alert_reports_interval`` is 0. Otherwise it loops until
    ``self.stop_event`` is set, and on every iteration, if the agent is
    registered:

    1. cleans up info about clusters that no longer exist,
    2. pulls the collected alerts from ``self.collector``,
    3. records them in ``self.stale_alerts_monitor``,
    4. sends either only the changed alerts or all of them (depending on
       ``self.send_alert_changes_only``) to the server, and
    5. registers a success callback that saves the batch that was sent.

    ``ConnectionIsAlreadyClosed`` is ignored; any other ``Exception`` is
    logged and the loop continues after waiting ``alert_reports_interval``
    on the stop event.
    """
    if self.alert_reports_interval == 0:
        logger.warning("AlertStatusReporter is turned off. Some functionality might not work correctly.")
        return

    def make_on_success(sent_alerts):
        # Bind the batch when the callback is created. A lambda that closes
        # over the loop variable directly would see whatever batch a later
        # iteration assigned by the time the server response arrives.
        return lambda headers, message: self.save_results(sent_alerts)

    logger.info('=====gaofeng=====AlertStatusReporter===alert_definitions_cache=%s', self.alert_definitions_cache)
    logger.info('=====gaofeng=====AlertStatusReporter===reported_alerts=%s', self.reported_alerts)
    logger.info('=====gaofeng=====AlertStatusReporter===send_alert_changes_only=%s', self.send_alert_changes_only)

    while not self.stop_event.is_set():
        try:
            logger.info('=====gaofeng=====AlertStatusReporter===initializer_module.is_registered=%s', self.initializer_module.is_registered)
            if self.initializer_module.is_registered:
                # Needed to remove info about clusters that were deleted
                # (e.g. after an ambari-server reset).
                self.clean_not_existing_clusters_info()

                # Fetch all alerts collected since the previous call.
                # Per the original note, this call also clears the collected alerts.
                alerts = self.collector.alerts()
                logger.info('=====gaofeng=====AlertStatusReporter===alerts=%s', alerts)

                # Remember the last run time of every alert that came through.
                self.stale_alerts_monitor.save_executed_alerts(alerts)

                # Send only the alerts changed since the last successful
                # report when configured to do so; otherwise send everything.
                if self.send_alert_changes_only:
                    alerts_to_send = self.get_changed_alerts(alerts)
                else:
                    alerts_to_send = alerts
                logger.info('=====gaofeng=====AlertStatusReporter===alerts_to_send=%s', alerts_to_send)

                # Registration is re-checked because the steps above take time.
                if alerts_to_send and self.initializer_module.is_registered:
                    # ALERTS_STATUS_REPORTS_ENDPOINT=/reports/alerts_status
                    correlation_id = self.initializer_module.connection.send(
                        message=alerts_to_send,
                        destination=Constants.ALERTS_STATUS_REPORTS_ENDPOINT,
                        log_message_function=AlertStatusReporter.log_sending,
                    )
                    self.server_responses_listener.listener_functions_on_success[correlation_id] = make_on_success(alerts_to_send)
        except ConnectionIsAlreadyClosed:
            # Server and agent disconnected during sending data. Not an issue.
            pass
        except Exception:
            # Keep the reporter alive on unexpected errors, but do not
            # swallow SystemExit / KeyboardInterrupt.
            logger.exception("Exception in AlertStatusReporter. Re-running it")

        # Sleep for the reporting interval, waking early if stop is requested.
        self.stop_event.wait(self.alert_reports_interval)

    logger.info("AlertStatusReporter has successfully finished")
其中
2.1、获取alerts
获取自上次使用此方法以来收集的所有警报
alerts = self.collector.alerts()
2.2、决定alerts_to_send
如果send_alert_changes_only为真,则通过get_changed_alerts只取自上次成功向服务器上报以来发生变化的警报;
否则(send_alert_changes_only为假)直接把全部alerts作为要上报的alerts_to_send
alerts_to_send = self.get_changed_alerts(alerts) if self.send_alert_changes_only else alerts
3、发送给ambari-server
对应的处理代码在ambari-server端,按/reports/alerts_status这个端点路径(依据注解)去寻找
#ALERTS_STATUS_REPORTS_ENDPOINT=/reports/alerts_status
correlation_id = self.initializer_module.connection.send(message=alerts_to_send, destination=Constants.ALERTS_STATUS_REPORTS_ENDPOINT, log_message_function=AlertStatusReporter.log_sending)