文章目录
背景
默认情况下Spring Websocket没有任何的metrics数据,所以我整理了下需要监控的指标以便于能反馈Websocket运行状况。
RED原则,Request: 请求量,Error:错误量,Duration:耗时
期望指标:
- 连接
- websocket当前连接数
- 每个response code 连接数 (101 401 403 429 500) 可观测性-Metrics-接口监控(Mvc监控)
- 连接成功/失败数 可观测性-Metrics-接口监控(Mvc监控)
- 连接成功/失败耗时 可观测性-Metrics-接口监控(Mvc监控)
- 连接关闭close原因数(1000,1001,1002…)
- 连接持续时长 lifespan
- 消息
- 消息读写成功/失败数量(按照消息类型)
- 消息读写耗时(按照消息类型)
- 消息体大小 (请求/响应)(按照消息类型)
Micrometer
先学习下Micrometer概念和基础示例
官网文档:
Micrometer是一个用于应用程序性能监控(Application Performance Monitoring,APM)的指标度量库。它提供了一组简单而强大的API,用于测量应用程序的各种指标,例如计时器(Timer)、计数器(Counter)、测量仪(Gauge
)、分布摘要(DistributionSummary)、长任务计时器(LongTaskTimer)、函数计数器(FunctionCounter)、函数计时器(FunctionTimer)和时间测量仪(TimeGauge)。
下面是Micrometer的各个组件的介绍:
Timer(计时器)
- 介绍:Timer用于测量一段代码块的执行时间,并提供统计信息,如平均执行时间、最大执行时间和执行时间的分布等。
- 适用场景:适用于度量方法的调用时间、HTTP请求的响应时间等。
测量HTTP请求响应时间示例:
Timer timer = registry.timer("my.timer");
Timer.Sample sample = Timer.start(registry);
// 模拟HTTP请求
HttpResponse response = sendHttpRequest();
// ...
sample.stop(timer);
// 获取统计结果
long count = timer.count(); // 执行次数 记录数
double totalTime = timer.totalTime(); // 总执行时间
double meanTime = timer.mean(TimeUnit.MILLISECONDS); // 平均执行时间
double maxTime = timer.max(TimeUnit.MILLISECONDS); // 最大执行时间
// 方式二
Timer timer = Timer
.builder("my.timer")
.description("a description of what this timer does") // optional
.tags("region", "test") // optional
.register(registry);
Counter(计数器)
- 介绍:Counter用于计数事件的发生次数,并提供累计计数的统计信息。
- 适用场景:适用于度量系统中的事件数量,如消息队列中的消息数、错误发生次数等。
用于累积值只能增加不能减少
计数错误发生次数示例:
MeterRegistry registry = new SimpleMeterRegistry();
Counter counter = registry.counter("error.counter");
// 在错误发生时递增计数器
if (errorOccurred) {
counter.increment();
}
// 获取统计结果
double count = counter.count(); // 错误发生次数
// 写法二
Counter counter = Counter
.builder("counter")
.baseUnit("beans") // optional
.description("a description of what this counter does") // optional
.tags("region", "test") // optional
.register(registry);
Gauge(测量仪)
- 介绍:Gauge用于测量一个可变值,例如内存使用量、队列长度等。Gauge的值可以在任何时间点进行读取。
- 适用场景:适用于测量应用程序中的实时状态和指标,如内存使用量、队列长度等。
用于测量有活动范围的值,可以增也可以减
测量队列长度示例:
Gauge gauge = Gauge.builder("queue.gauge", () -> getQueueSize()).register(registry);
// 获取统计结果
double value = gauge.value(); // 当前队列长度
// 其他示例
List<String> list = registry.gauge("listGauge", Collections.emptyList(), new ArrayList<>(), List::size); //监视非数值对象
List<String> list2 = registry.gaugeCollectionSize("listSize2", Tags.empty(), new ArrayList<>()); //监视集合大小
Map<String, Integer> map = registry.gaugeMapSize("mapGauge", Tags.empty(), new HashMap<>());
// 还可以手动加减Gauge
AtomicInteger n = registry.gauge("numberGauge", new AtomicInteger(0));
n.set(1);
n.set(2);
DistributionSummary(分布摘要)
- 介绍:DistributionSummary用于计算一组值的分布统计信息,如平均值、最大值、最小值和百分位数等。
- 适用场景:适用于度量一组值的分布情况,如请求响应时间、响应大小等。
测量响应大小分布示例:
DistributionSummary summary = DistributionSummary.builder("size.summary").register(registry);
summary.record(1024); // 记录响应大小
summary.record(2048); // 记录响应大小
summary.record(512); // 记录响应大小
// 获取统计结果
long count = summary.count(); // 执行次数 记录数
double totalAmount = summary.totalAmount(); // 总和
double max = summary.max(); // 最大值
double mean = summary.mean(); // 平均值
LongTaskTimer(长任务计时器)
-
介绍:LongTaskTimer用于测量长时间运行的任务的执行时间,并提供统计信息,如活动任务数量和任务执行时间。
-
适用场景:适用于度量长时间运行的任务的执行时间,如批处理作业。
测量后台任务的执行时间:
LongTaskTimer longTaskTimer = registry.more().longTaskTimer("backgroundTask.timer");
LongTaskTimer.Sample sample = longTaskTimer.start();
// 执行后台任务
executeBackgroundTask();
sample.stop();
// 获取统计结果
long activeTasks = longTaskTimer.activeTasks(); // 活动任务数量
double totalTime = longTaskTimer.totalTime(); // 总执行时间
FunctionCounter(函数计数器)
- 介绍:FunctionCounter用于计算带有自定义函数的计数器,根据函数的返回值进行计数。
- 适用场景:适用于度量满足特定条件的事件数量。
计数处理成功的请求次数:
FunctionCounter counter = FunctionCounter
.builder("success.counter", this, obj -> obj.getSuccessCount())
.register(registry);
// 获取统计结果
double count = counter.count(); // 成功请求次数
FunctionTimer(函数计时器)
- 介绍:FunctionTimer用于测量带有自定义函数的代码块的执行时间,并提供统计信息,如执行次数和平均执行时间。
- 适用场景:适用于度量带有自定义函数的代码块的执行时间。
测量自定义函数处理请求的执行时间示例:
FunctionTimer timer = FunctionTimer
.builder("request.timer", this, obj -> obj.getRequestExecutionTime())
.register(registry);
// 获取统计结果
long count = timer.count(); // 执行次数
double mean = timer.mean(TimeUnit.MILLISECONDS); // 平均执行时间
TimeGauge(时间测量仪)
- 介绍:TimeGauge用于测量一个可变值,并将其解释为时间单位,例如测量任务的执行时间。
- 适用场景:适用于度量一个可变值,并将其解释为时间单位的场景。
测量请求的处理时间示例:
TimeGauge timeGauge = TimeGauge
.builder("request.timeGauge", () -> getRequestProcessingTime(), TimeUnit.MILLISECONDS)
.register(registry);
// 获取统计结果
double value = timeGauge.value(); // 当前值
总结
下面是关于每个组件的使用场景和建议:
组件 | 使用场景 | 建议 |
---|---|---|
Timer | 测量方法执行时间、请求响应时间等 | 用于度量需要精确测量执行时间的代码块,可以获取平均执行时间和执行时间分布等统计信息。 |
Counter | 计数事件发生次数、错误发生次数等,用于累积值只能增加不能减少 | 用于度量事件发生的次数,提供累计计数的统计信息。 |
Gauge | 测量实时状态和指标,如内存使用量、队列长度等 | 用于度量可变值的当前状态,适用于实时监测和跟踪指标。 |
DistributionSummary | 测量一组值的分布情况,如请求响应时间、响应大小等 | 用于度量一组值的分布情况,可以获取最大值、最小值、平均值和百分位数等统计信息。 |
LongTaskTimer | 测量长时间运行的任务的执行时间,如批处理作业等 | 用于度量长时间运行的任务的执行时间,可以获取活动任务数量和总执行时间等统计信息。 |
FunctionCounter | 计数满足特定条件的事件数量 | 用于度量满足特定条件的事件数量,根据自定义函数的返回值进行计数。 |
FunctionTimer | 测量带有自定义函数的代码块的执行时间 | 用于度量带有自定义函数的代码块的执行时间,可以获取执行次数和平均执行时间等统计信息。 |
TimeGauge | 测量一个可变值并将其解释为时间单位,如任务的执行时间等 | 用于度量一个可变值,并将其解释为时间单位,适用于测量执行时间等场景。 |
这些使用场景和建议将帮助您选择适合您需求的Micrometer组件,并根据具体情况设置和使用各个组件以获取所需的统计信息。
代码实现
配置类
@Configuration(proxyBeanMethods = false)
@ConditionalOnWebApplication
public class WebsocketMetricsAutoConfiguration {
@Bean
public WebsocketMetrics websocketMetrics(MeterRegistry meterRegistry) {
return new WebsocketMetrics(meterRegistry);
}
/**
* 示例:用于将指标数据记录到日志中。
*/
@Bean
public MeterRegistry meterRegistry() {
return new LoggingMeterRegistry(new LoggingRegistryConfig() {
@Override
public String get(String key) {
return null;
}
@Override
public Duration step() {
return Duration.ofSeconds(60);
}
}, Clock.SYSTEM);
}
}
指标类
public class WebsocketMetrics {
private final MeterRegistry meterRegistry;
private final Timer lifeTimeTimer;
private final Counter websocketOpenCounter;
private final Counter transportErrorCounter;
public WebsocketMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
// 初始化一些 tag 值是死的埋点
lifeTimeTimer = Timer.builder("connection.lifetime").tag("tag1", "value1").tag("tag2", "value1").register(meterRegistry);
websocketOpenCounter = Counter.builder("connection.open.count").tag("tag1", "value1").tag("tag2", "value1").register(meterRegistry);
transportErrorCounter = Counter.builder("connection.transport.error.count").tag("tag1", "value1").register(meterRegistry);
}
public void websocketOpenCounter() {
websocketOpenCounter.increment(1);
}
public void websocketGauge(Map webSocketSessionMap) {
Gauge.builder("connection.current.gauge", () -> webSocketSessionMap.size()).register(meterRegistry);
}
/**
* tag 值是活的可以这样埋点
*/
public void webSocketCloseConnectionCounter(long count, int closeCode) {
Counter webSocketCloseConnectionCounter = Counter.builder("connection.close.count")
.tag("code", closeCode + "")
.register(meterRegistry);
webSocketCloseConnectionCounter.increment(count);
}
public void messageProcessTimer(long nanoDuration, boolean success, String messageType) {
Timer wsOpenFailureTimer = Timer.builder(MESSAGE_PROCESS_TIMER)
.tag("result", success + "")
.tag("action", messageType)
.register(meterRegistry);
wsOpenFailureTimer.record(nanoDuration, TimeUnit.NANOSECONDS);
}
}
指标触发
@Service
@Slf4j
public class LakerChatHandler extends AbstractWebSocketHandler {
private final Map<String, WebSocketSession> webSocketSessionMap = new ConcurrentHashMap<>();
WebsocketMetrics websocketMetrics;
public LakerChatHandler(WebsocketMetrics websocketMetrics) {
this.websocketMetrics = websocketMetrics;
websocketMetrics.websocketGauge(webSocketSessionMap);
}
/**
* 成功连接时
*
* @param session
* @throws Exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
webSocketSessionMap.put(session.getId(), new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64000));
websocketMetrics.websocketOpenCounter();
}
/**
* 处理textmessage
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String exception = "";
long start = System.nanoTime();
try {
// 业务处理
} catch (Exception e) {
exception = e.getClass().getSimpleName();
throw e;
} finally {
websocketMetrics.websocketProcessTimer(System.nanoTime() - start, exception);
}
}
/**
* 连接关闭时
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
websocketMetrics.webSocketCloseConnectionCounter(1, status.getCode());
WebSocketSession concurrentSession = webSocketSessionMap.remove(session.getId());
if (concurrentSession.isOpen()) {
try {
concurrentSession.close();
} catch (Exception ex) {
// ignore
}
}
}
结果日志
测试日志如下:
connection.close.count{code=1006} throughput=0.05/s
connection.open.count{tag1=value1,tag2=value1} throughput=0.05/s
connection.current.gauge{} value=2
connection.message.process{exception=,tag1=value1} throughput=0.033333/s mean=3.25064095s max=6.4944218s
http.server.requests{exception=None,method=GET,outcome=CLIENT_ERROR,status=404,uri=/**} throughput=0.016667/s mean=0.009702s max=0.009702s
http.server.requests{exception=None,method=GET,outcome=SUCCESS,status=200,uri=/captcha} throughput=0.016667/s mean=0.4792633s max=0.4792633s
http.server.requests{exception=None,method=GET,outcome=INFORMATIONAL,status=101,uri=/websocket/**} throughput=0.033333/s mean=0.01526635s max=4.35s