package heartbeat.monitor;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.commons.io.IOUtils;

import heartbeat.monitor.sendmessage.SendMailUtil;
import heartbeat.monitor.util.HeartBeatMonitorConstant;

/**
 * Command-line entry point of the heartbeat monitor.
 *
 * <p>Loads the configuration (from the file given as the first program argument, or from the
 * {@code heartBeatMonitor.properties} classpath resource when no argument is given), creates the
 * {@link HeartBeatMonitor} instances and starts each of them on its own thread.
 *
 * @author Switching
 * @version 1.0, 2017-01-01
 * @since heartbeat.monitor 1.0
 */
public class HeartBeatMonitorAccess {

    /** Classpath resource used when no configuration file is passed on the command line. */
    private static final String DEFAULT_PROPERTIES_RESOURCE = "heartBeatMonitor.properties";

    /** Monitors created by the constructor and started by {@link #exeute()}. */
    private List<HeartBeatMonitor> consumers;

    /** Path of the configuration file taken from {@code args[0]}; {@code null} selects the classpath default. */
    private static String heartbeatConsumerProperties;

    /**
     * Loads the configuration and creates the monitors.
     *
     * @throws IOException if the configuration cannot be found or read
     */
    public HeartBeatMonitorAccess() throws IOException {
        Properties properties = loadProperties();

        int num = 1;
        String topic = properties.getProperty(HeartBeatMonitorConstant.MONITOR_TOPIC);
        consumers = new ArrayList<HeartBeatMonitor>(num);
        for (int i = 0; i < num; i++) {
            consumers.add(new HeartBeatMonitor(properties, topic));
            // The instance is discarded on purpose.
            // NOTE(review): presumably the constructor stores the mail settings that
            // SendMailUtil.sendMailAccess uses later - confirm against SendMailUtil.
            new SendMailUtil(properties, topic);
        }
    }

    /**
     * Reads the monitor configuration and always closes the underlying stream.
     *
     * @return the loaded properties
     * @throws IOException if the file or classpath resource is missing or unreadable
     */
    private static Properties loadProperties() throws IOException {
        InputStream in;
        if (heartbeatConsumerProperties == null) {
            in = ClassLoader.getSystemResourceAsStream(DEFAULT_PROPERTIES_RESOURCE);
            if (in == null) {
                // getSystemResourceAsStream returns null for a missing resource; fail with a
                // clear message instead of a NullPointerException inside Properties.load.
                throw new IOException(
                        "Configuration resource not found on classpath: " + DEFAULT_PROPERTIES_RESOURCE);
            }
        } else {
            in = new FileInputStream(heartbeatConsumerProperties);
        }

        try {
            Properties properties = new Properties();
            properties.load(in);
            return properties;
        } finally {
            // Close on both the success and the failure path.
            IOUtils.closeQuietly(in);
        }
    }

    /**
     * Starts every monitor on a new thread and returns immediately.
     *
     * <p>The method name (including its spelling) and the declared checked exceptions are kept
     * unchanged for existing callers; this implementation does not throw them itself.
     *
     * @throws InterruptedException declared for compatibility
     * @throws ExecutionException declared for compatibility
     */
    public void exeute() throws InterruptedException, ExecutionException {
        for (HeartBeatMonitor consumer : consumers) {
            new Thread(consumer).start();
        }
    }

    /**
     * Program entry point.
     *
     * @param args optional; {@code args[0]} is the path of the configuration file
     * @throws Exception if the configuration cannot be loaded or the monitors cannot be created
     */
    public static void main(String[] args) throws Exception {
        if (args.length > 0) {
            heartbeatConsumerProperties = args[0];
        }
        HeartBeatMonitorAccess consumerGroup = new HeartBeatMonitorAccess();
        consumerGroup.exeute();
    }
}
消费者抓取相应的规则进行解析判断
后台轮询消费 Kafka 中的消息
package heartbeat.monitor;

import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.codehaus.jackson.map.ObjectMapper;

import heartbeat.monitor.sendmessage.SendMailUtil;
import heartbeat.monitor.util.HeartBeatMonitorConstant;
import heartbeat.monitor.util.MessageUtil;

/**
 * Kafka consumer that watches a topic for heartbeat events and reports when the most recent
 * heartbeat is older than the configured threshold.
 *
 * <p>Each polled record is parsed as a JSON object. When its event field equals the configured
 * key word, the record's time and host fields become the new "last heartbeat". After every poll
 * the last heartbeat time is compared with the local clock; when it is overdue a message is
 * printed and, depending on the retry counter, a mail is sent through {@link SendMailUtil}.
 *
 * <p>The last heartbeat state and the date format are static and therefore shared by all
 * instances. {@link SimpleDateFormat} is not synchronized, so this class is only safe with a
 * single monitor thread unless access is synchronized externally.
 *
 * @author Switching
 * @version 1.0, 2017-01-01
 * @since heartbeat.monitor 1.0
 */
public class HeartBeatMonitor implements Runnable {

    /** Shared JSON parser; created once instead of once per record. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final KafkaConsumer<String, String> consumer;

    /** Time of the last accepted heartbeat, formatted with {@link #dfMonitorTime}. */
    public static String lastMonitorTime;

    /** "host name:host ip" of the last accepted heartbeat, or {@code "Monitor"} before the first one. */
    public static String lastMonitorInfo;

    /** Format used for {@link #lastMonitorTime}. */
    public static DateFormat dfMonitorTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /** Maximum allowed age of the last heartbeat, in milliseconds. */
    private long timeMonitor = 20000;

    /** Initial value (plus one) of the retry counter used by {@link #run()}. */
    private long maxTriesTime = 3;

    /** Event value that marks a record as a heartbeat. */
    private String monitorKeyWord = "FILE_WRITTEN_CLOSED";

    /** Selects which outcomes are printed; compared with the {@code LOG_LVL_*} constants. */
    private String monitorLogLvls = "EXCEPTION";

    /**
     * Creates the monitor and subscribes its consumer to {@code topic}.
     *
     * <p>Empty or missing monitor settings keep their defaults. The same {@code properties}
     * object is also handed to the {@link KafkaConsumer} as its configuration.
     *
     * @param properties monitor settings and Kafka consumer configuration
     * @param topic topic to subscribe to
     * @throws IOException declared for callers; not thrown by the code in this constructor
     * @throws NumberFormatException if a numeric setting is present but not a valid {@code long}
     */
    public HeartBeatMonitor(Properties properties, String topic) throws IOException {
        String offsetTime = properties.getProperty(HeartBeatMonitorConstant.MONITOR_OFFSET_TIME);
        if (!MessageUtil.isEmpty(offsetTime)) {
            timeMonitor = Long.parseLong(offsetTime);
        }
        String tries = properties.getProperty(HeartBeatMonitorConstant.MONITOR_TRIES);
        if (!MessageUtil.isEmpty(tries)) {
            maxTriesTime = Long.parseLong(tries);
        }
        String keyWord = properties.getProperty(HeartBeatMonitorConstant.MONITOR_KEY_WORD);
        if (!MessageUtil.isEmpty(keyWord)) {
            monitorKeyWord = keyWord;
        }
        String logLvls = properties.getProperty(HeartBeatMonitorConstant.MONITOR_LOG_LVLS);
        if (!MessageUtil.isEmpty(logLvls)) {
            monitorLogLvls = logLvls;
        }

        consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList(topic));
    }

    /** Closes the underlying Kafka consumer. */
    public void close() {
        consumer.close();
    }

    /**
     * Polls the topic forever, updating the last heartbeat from matching records and checking
     * its age after every poll. This method does not return normally.
     */
    @Override
    public void run() {
        long triesTime = maxTriesTime - 1;
        System.out.println("Heartbeat Monitor Start!");
        if (lastMonitorTime == null) {
            resetTime();
        }
        String name = Thread.currentThread().getName();

        while (true) {
            // The reference time is taken before the poll, as in the original flow.
            Date dtLocal = new Date();
            ConsumerRecords<String, String> records = consumer.poll(2);
            for (ConsumerRecord<String, String> record : records) {
                try {
                    updateHeartbeat(record.value());
                } catch (Exception e) {
                    // A bad record must not stop the monitor; report it and continue.
                    e.printStackTrace();
                }
                System.out.println(name + "---" + record.partition() + ":" + record.offset() + " = "
                        + record.key() + ":" + record.value());
            }

            // Kept inline: triesTime is updated before calls that may throw, and that update
            // must survive an exception so the counter keeps advancing.
            try {
                Date dtMonitor = dfMonitorTime.parse(lastMonitorTime);
                if ((dtLocal.getTime() - timeMonitor) > dtMonitor.getTime()) {
                    if (triesTime > 0) {
                        triesTime--;
                    } else {
                        resetTime();
                        triesTime = maxTriesTime - 1;
                    }
                    // NOTE(review): the mail is sent only inside this log-level branch, so the
                    // NOEXCEPTION/NONE levels also suppress mails - confirm this is intended.
                    if (HeartBeatMonitorConstant.LOG_LVL_EXCEP.equals(monitorLogLvls)
                            || HeartBeatMonitorConstant.LOG_LVL_ALL.equals(monitorLogLvls)) {
                        String content = "[" + dtLocal + "] " + lastMonitorInfo
                                + " Exception! manual intervention please! " + (triesTime + 1)
                                + " times! \n\t\t\t\tNow(" + dtLocal + ") is beyond " + timeMonitor / 1000
                                + "(s) lastMonitorTime(" + dtMonitor + ").";
                        System.out.println(content);
                        if (triesTime == 0) {
                            SendMailUtil.sendMailAccess(lastMonitorInfo, content);
                        }
                    }
                } else {
                    if (HeartBeatMonitorConstant.LOG_LVL_NOEXCEP.equals(monitorLogLvls)
                            || HeartBeatMonitorConstant.LOG_LVL_ALL.equals(monitorLogLvls)) {
                        System.out.println("[" + dtLocal + "] " + name + "NoException! \n\t\t\t\tNow(" + dtLocal
                                + ") is Range " + timeMonitor / 1000 + " (s) lastMonitorTime(" + dtMonitor + ").");
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Updates the last heartbeat from one record value when it is a heartbeat event.
     *
     * <p>Values that are {@code null}, not JSON objects, or whose event field is missing or
     * different from the configured key word are ignored. The time field is validated with
     * {@link #dfMonitorTime} before it is stored, so that a malformed timestamp cannot make the
     * age check in {@link #run()} fail on every following iteration.
     *
     * @param json record value as received from Kafka
     * @throws IOException if the value is not valid JSON or is not a JSON object
     * @throws ParseException if a heartbeat record has a missing or malformed time field
     */
    private void updateHeartbeat(String json) throws IOException, ParseException {
        if (json == null) {
            return;
        }
        // Wildcard map: JSON values are not necessarily strings.
        Map<?, ?> recordMap = OBJECT_MAPPER.readValue(json, Map.class);
        if (recordMap == null) {
            return;
        }

        Object event = recordMap.get(HeartBeatMonitorConstant.FILE_EVENT);
        if (event == null || !monitorKeyWord.equals(event.toString())) {
            return;
        }

        Object time = recordMap.get(HeartBeatMonitorConstant.FILE_TIME);
        if (time == null) {
            throw new ParseException("Heartbeat record has no time field: " + json, 0);
        }
        String candidate = time.toString();
        dfMonitorTime.parse(candidate); // validation only; throws on a malformed timestamp

        lastMonitorTime = candidate;
        lastMonitorInfo = recordMap.get(HeartBeatMonitorConstant.SFTP_HOST_NAME) + ":"
                + recordMap.get(HeartBeatMonitorConstant.SFTP_HOST_IP);
    }

    /**
     * Sets the last heartbeat time to the current time and, when no heartbeat has been seen yet,
     * initialises the heartbeat info to {@code "Monitor"}.
     */
    public void resetTime() {
        lastMonitorTime = dfMonitorTime.format(new Date());
        if (lastMonitorInfo == null) {
            lastMonitorInfo = "Monitor";
        }
    }
}
通过配置文件的形式,对各种事件进行监控
#Created by Switching
auto.commit.interval.ms=1000
auto.offset.reset=earliest
bootstrap.servers=192.168.102.10\:9092
enable.auto.commit=true
group.id=monitor1
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
monitorTopic=monitor
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#heartBeat monitor exception notify times(3)
monitorTries=3
#heartBeat monitor offset time(20000)(ms)
monitorOffsetTime=20000
#heartBeat monitor log lvls(EXCEPTION):ALL|EXCEPTION|NOEXCEPTION|NONE
monitorLogLvls=EXCEPTION
#heartBeat Monitor mailInfo
[email protected]
sendEmailPassword=xxxxx
sendEmailSMTPHost=smtp.xxxx.com
[email protected]