背景
业务团队希望对消费者的健康状态有感知,在消费发生延迟时能收到告警通知,可以通过RocketMQ提供的的SDK:MQAdminExt写一个定时的程序,扫描消费者的状态,并通过比较配置的阈值,来判断是否需要发告警通知,还有一种方法就是直接使用RocketMQ官方提供的console来做,只不过该功能需要动手打开。
需求
不同的消费组可以配置不同的告警规则,并且可以配置邮箱,进行邮件发送,邮件内容包含topic、消费者、异常信息,经过查看源码,发现开源的console将告警的功能注释了,并且具体的告警通知逻辑需要自己写,还有就是不支持邮箱配置,所以,开始改吧。
构建源码
地址:https://github.com/apache/rocketmq-externals.git
下载后,使用IDEA打开rocketmq-console子模块
修改源码
打开页面按钮
这一步是打开页面隐藏的配置告警的按钮
位置:rocketmq-console/src/main/resources/static/view/pages/consumer.html 第81行
这个地方默认是注释的,需要打开,为了风格统一,我把class也换了
<button name="client" ng-click="monitor(consumerGroup.group)"
class="btn btn-raised btn-sm btn-primary" type="button">Monitor Config
</button>
效果如下:
弹出框:
修改弹出框
这一步主要是添加一栏邮箱输入框
位置:rocketmq-console/src/main/resources/static/view/pages/consumer.html 321行
替换为如下:
<div class="form-group">
<label class="control-label col-sm-4">最小客户端数:</label>
<div class="col-sm-8">
<input class="form-control" ng-model="ngDialogData.data.minCount" type="text"
required/>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-4">最大积压数:</label>
<div class="col-sm-8">
<input class="form-control" ng-model="ngDialogData.data.maxDiffTotal" type="text"
required/>
</div>
</div>
<div class="form-group">
<label class="control-label col-sm-4">邮箱(多个用英文逗号隔开):</label>
<div class="col-sm-8">
<input class="form-control" ng-model="ngDialogData.data.email" type="text"
required/>
</div>
</div>
效果如下:
修改js传参
这一步主要是添加请求参数:email
位置:rocketmq-console/src/main/resources/static/src/consumer.js 第283行
修改为:
修改接口方法
之前的步骤就完成了页面所有修改,这一步主要是修改接口方法,添加email参数
位置:org.apache.rocketmq.console.model.ConsumerMonitorConfig
直接替换为以下内容:
package org.apache.rocketmq.console.model;
public class ConsumerMonitorConfig {
/**
* 当前消费分组的机器数量最小阈值,低于此值将会告警
*/
private Integer minCount;
/**
* 当前消费分组允许的最大消息堆积量,高于此值将会告警
*/
private Integer maxDiffTotal;
/**
* 告警邮件发送邮箱
*/
private String email;
public ConsumerMonitorConfig() {
}
public ConsumerMonitorConfig(int minCount, int maxDiffTotal, String email) {
this.minCount = minCount;
this.maxDiffTotal = maxDiffTotal;
this.email = email;
}
public int getMinCount() {
return minCount;
}
public void setMinCount(int minCount) {
this.minCount = minCount;
}
public int getMaxDiffTotal() {
return maxDiffTotal;
}
public void setMaxDiffTotal(int maxDiffTotal) {
this.maxDiffTotal = maxDiffTotal;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
}
修改Controller方法
位置:org.apache.rocketmq.console.controller.MonitorController#createOrUpdateConsumerMonitor
替换为:
@RequestMapping(value = "/createOrUpdateConsumerMonitor.do", method = {
RequestMethod.POST})
@ResponseBody
public Object createOrUpdateConsumerMonitor(@RequestParam String consumeGroupName, @RequestParam(required = false) Integer minCount,
@RequestParam(required = false) Integer maxDiffTotal, @RequestParam(required = false) String email) {
ConsumerMonitorConfig config = null;
if (Objects.nonNull(minCount) && Objects.nonNull(maxDiffTotal) && StringUtils.isNotBlank(email)) {
config = new ConsumerMonitorConfig(minCount, maxDiffTotal, email);
}
return monitorService.createOrUpdateConsumerMonitor(consumeGroupName, config);
}
如果三个参数有一个没传,则代表删除告警配置
修改service方法
位置:org.apache.rocketmq.console.service.impl.MonitorServiceImpl#createOrUpdateConsumerMonitor
替换为:
@Override
public boolean createOrUpdateConsumerMonitor(String name, ConsumerMonitorConfig config) {
if (Objects.isNull(config)) {
configMap.remove(name);
} else {
configMap.put(name, config);
}
writeToFile(getConsumerMonitorConfigDataPath(), configMap);
return true;
}
编写告警逻辑
告警实体类添加topic字段
原来的告警信息是没有Topic字段的,需要自己封装一下
位置:org.apache.rocketmq.console.model.GroupConsumeInfo
封装Topic字段
位置:org.apache.rocketmq.console.service.impl.ConsumerServiceImpl#queryGroup
代码:
//获取消费的topic
String topic = consumeStats.getOffsetTable().keySet().iterator().next().getTopic();
groupConsumeInfo.setTopic(topic);
pom引入邮件依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
<version>${spring.boot.version}</version>
</dependency>
application.properties配置邮件服务信息
spring.mail.default-encoding = UTF-8
spring.mail.host = XXXX
spring.mail.port = 25
spring.mail.username = XXXX
spring.mail.password = XXXX
spring.mail.protocol = smtp
spring.mail.properties.mail.smtp.auth = true
spring.mail.properties.mail.smtp.ssl.enable = false
告警Task逻辑
位置:org.apache.rocketmq.console.task.MonitorTask
代码内容:
package org.apache.rocketmq.console.task;
import org.apache.commons.codec.CharEncoding;
import org.apache.rocketmq.console.model.ConsumerMonitorConfig;
import org.apache.rocketmq.console.model.GroupConsumeInfo;
import org.apache.rocketmq.console.service.ConsumerService;
import org.apache.rocketmq.console.service.MonitorService;
import org.apache.rocketmq.console.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.mail.internet.MimeMessage;
import java.util.Map;
/**
* 告警Task
*
* @author zhurunhua
* @date 5/20/21 6:31 PM
*/
@Component
public class MonitorTask {
private static final String TITTLE = "【RocketMQ】消费者异常告警";
private Logger logger = LoggerFactory.getLogger(MonitorTask.class);
@Resource
private MonitorService monitorService;
@Resource
private ConsumerService consumerService;
@Resource
private JavaMailSender sender;
@Scheduled(cron = "0 */2 * * * ?")
public void scanProblemConsumeGroup() {
for (Map.Entry<String, ConsumerMonitorConfig> configEntry : monitorService.queryConsumerMonitorConfig().entrySet()) {
GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey());
if (consumeInfo.getCount() < configEntry.getValue().getMinCount() || consumeInfo.getDiffTotal() > configEntry.getValue().getMaxDiffTotal()) {
logger.info("【Monitor】consumer exception:{}", JsonUtil.obj2String(consumeInfo));
//发邮件
sendMail(configEntry, consumeInfo);
}
}
}
private void sendMail(Map.Entry<String, ConsumerMonitorConfig> configEntry, GroupConsumeInfo consumeInfo) {
String email = configEntry.getValue().getEmail().trim();
String[] to;
if (email.contains(",")) {
to = email.split(",");
} else {
to = new String[]{
email};
}
String content = getEmailContent(configEntry, consumeInfo);
sendMail(TITTLE, content, to, null, false);
}
public void sendMail(final String title, final String content, final String[] to, final String[] cc,
final boolean isHtml) {
try {
MimeMessage message = sender.createMimeMessage();
MimeMessageHelper helper = new MimeMessageHelper(message, true, CharEncoding.UTF_8);
//你的发件人邮箱
helper.setFrom("");
helper.setTo(to);
if (cc != null && cc.length > 0) {
helper.setCc(cc);
}
helper.setSubject(title);
helper.setText(content, isHtml);
sender.send(message);
} catch (Exception e) {
logger.error("send mail error", e);
}
}
private String getEmailContent(Map.Entry<String, ConsumerMonitorConfig> configEntry, GroupConsumeInfo consumeInfo) {
StringBuilder content = new StringBuilder();
content.append("Topic:");
content.append(consumeInfo.getTopic());
content.append("\n");
content.append("消费组:");
content.append(consumeInfo.getGroup());
content.append("\n");
content.append("消费者数量:");
content.append(consumeInfo.getCount());
content.append(" (阈值为");
content.append(configEntry.getValue().getMinCount());
content.append(")\n");
content.append("消费TPS:");
content.append(consumeInfo.getConsumeTps());
content.append("\n");
content.append("消费积压数:");
content.append(consumeInfo.getDiffTotal());
content.append(" (阈值为");
content.append(configEntry.getValue().getMaxDiffTotal());
content.append(")\n");
return content.toString();
}
}
测试
配置:
两分钟后收到邮件:
大功告成!!!