可观测性-Metrics-Spring Websocket监控

背景

默认情况下Spring Websocket没有任何的metrics数据,所以我整理了下需要监控的指标以便于能反馈Websocket运行状况。

RED原则,Request: 请求量,Error:错误量,Duration:耗时

期望指标

Micrometer

先学习下Micrometer概念和基础示例

官网文档:

Micrometer是一个用于应用程序性能监控(Application Performance Monitoring,APM)的指标度量库。它提供了一组简单而强大的API,用于测量应用程序的各种指标,例如计时器(Timer)、计数器(Counter)、测量仪(Gauge)、分布摘要(DistributionSummary)、长任务计时器(LongTaskTimer)、函数计数器(FunctionCounter)、函数计时器(FunctionTimer)和时间测量仪(TimeGauge)。

下面是Micrometer的各个组件的介绍:

Timer(计时器)

  • 介绍:Timer用于测量一段代码块的执行时间,并提供统计信息,如平均执行时间、最大执行时间和执行时间的分布等。
  • 适用场景:适用于度量方法的调用时间、HTTP请求的响应时间等。

测量HTTP请求响应时间示例

Timer timer = registry.timer("my.timer");
Timer.Sample sample = Timer.start(registry);
	// 模拟HTTP请求
	HttpResponse response = sendHttpRequest();
	// ...
sample.stop(timer);
// 获取统计结果
long count = timer.count(); // 执行次数 记录数
double totalTime = timer.totalTime(); // 总执行时间
double meanTime = timer.mean(TimeUnit.MILLISECONDS); // 平均执行时间
double maxTime = timer.max(TimeUnit.MILLISECONDS); // 最大执行时间

// 方式二
Timer timer = Timer
     .builder("my.timer")
     .description("a description of what this timer does") // optional
     .tags("region", "test") // optional
     .register(registry); 

Counter(计数器)

  • 介绍:Counter用于计数事件的发生次数,并提供累计计数的统计信息。
  • 适用场景:适用于度量系统中的事件数量,如消息队列中的消息数、错误发生次数等。

用于累积值只能增加不能减少

计数错误发生次数示例

MeterRegistry registry = new SimpleMeterRegistry();
Counter counter = registry.counter("error.counter");
// 在错误发生时递增计数器
if (errorOccurred) {
    
    
    counter.increment();
}

// 获取统计结果
double count = counter.count(); // 错误发生次数

//  写法二
Counter counter = Counter
    .builder("counter")
    .baseUnit("beans") // optional
    .description("a description of what this counter does") // optional
    .tags("region", "test") // optional
    .register(registry);

Gauge(测量仪)

  • 介绍:Gauge用于测量一个可变值,例如内存使用量、队列长度等。Gauge的值可以在任何时间点进行读取。
  • 适用场景:适用于测量应用程序中的实时状态和指标,如内存使用量、队列长度等。

用于测量有活动范围的值,可以增也可以减

测量队列长度示例

Gauge gauge = Gauge.builder("queue.gauge", () -> getQueueSize()).register(registry);

// 获取统计结果
double value = gauge.value(); // 当前队列长度

// 其他示例
List<String> list = registry.gauge("listGauge", Collections.emptyList(), new ArrayList<>(), List::size); //监视非数值对象
List<String> list2 = registry.gaugeCollectionSize("listSize2", Tags.empty(), new ArrayList<>()); //监视集合大小
Map<String, Integer> map = registry.gaugeMapSize("mapGauge", Tags.empty(), new HashMap<>()); 
// 还可以手动加减Gauge
AtomicInteger n = registry.gauge("numberGauge", new AtomicInteger(0));
n.set(1);
n.set(2);

DistributionSummary(分布摘要)

  • 介绍:DistributionSummary用于计算一组值的分布统计信息,如平均值、最大值、最小值和百分位数等。
  • 适用场景:适用于度量一组值的分布情况,如请求响应时间、响应大小等。

测量响应大小分布示例

DistributionSummary summary = DistributionSummary.builder("size.summary").register(registry);
summary.record(1024); // 记录响应大小
summary.record(2048); // 记录响应大小
summary.record(512); // 记录响应大小

// 获取统计结果
long count = summary.count(); // 执行次数 记录数
double totalAmount = summary.totalAmount(); // 总和
double max = summary.max(); // 最大值
double mean = summary.mean(); // 平均值

LongTaskTimer(长任务计时器)

  • 介绍:LongTaskTimer用于测量长时间运行的任务的执行时间,并提供统计信息,如活动任务数量和任务执行时间。

  • 适用场景:适用于度量长时间运行的任务的执行时间,如批处理作业。

测量后台任务的执行时间

LongTaskTimer longTaskTimer = registry.more().longTaskTimer("backgroundTask.timer");
LongTaskTimer.Sample sample = longTaskTimer.start();
// 执行后台任务
executeBackgroundTask();
sample.stop();

// 获取统计结果
long activeTasks = longTaskTimer.activeTasks(); // 活动任务数量
double totalTime = longTaskTimer.totalTime(); // 总执行时间

FunctionCounter(函数计数器)

  • 介绍:FunctionCounter用于计算带有自定义函数的计数器,根据函数的返回值进行计数。
  • 适用场景:适用于度量满足特定条件的事件数量。

计数处理成功的请求次数

FunctionCounter counter = FunctionCounter
    						.builder("success.counter", this, obj -> obj.getSuccessCount())
                            .register(registry);

// 获取统计结果
double count = counter.count(); // 成功请求次数

FunctionTimer(函数计时器)

  • 介绍:FunctionTimer用于测量带有自定义函数的代码块的执行时间,并提供统计信息,如执行次数和平均执行时间。
  • 适用场景:适用于度量带有自定义函数的代码块的执行时间。

测量自定义函数处理请求的执行时间示例

FunctionTimer timer = FunctionTimer
    					.builder("request.timer", this, obj -> obj.getRequestExecutionTime())
                        .register(registry);

// 获取统计结果
long count = timer.count(); // 执行次数
double mean = timer.mean(TimeUnit.MILLISECONDS); // 平均执行时间

TimeGauge(时间测量仪)

  • 介绍:TimeGauge用于测量一个可变值,并将其解释为时间单位,例如测量任务的执行时间。
  • 适用场景:适用于度量一个可变值,并将其解释为时间单位的场景。

测量请求的处理时间示例

TimeGauge timeGauge = TimeGauge
    					.builder("request.timeGauge", () -> getRequestProcessingTime(), TimeUnit.MILLISECONDS)
                        .register(registry);

// 获取统计结果
double value = timeGauge.value(); // 当前值

总结

下面是关于每个组件的使用场景和建议:

组件 使用场景 建议
Timer 测量方法执行时间、请求响应时间等 用于度量需要精确测量执行时间的代码块,可以获取平均执行时间和执行时间分布等统计信息。
Counter 计数事件发生次数、错误发生次数等,用于累积值只能增加不能减少 用于度量事件发生的次数,提供累计计数的统计信息。
Gauge 测量实时状态和指标,如内存使用量、队列长度等 用于度量可变值的当前状态,适用于实时监测和跟踪指标。
DistributionSummary 测量一组值的分布情况,如请求响应时间、响应大小等 用于度量一组值的分布情况,可以获取最大值、最小值、平均值和百分位数等统计信息。
LongTaskTimer 测量长时间运行的任务的执行时间,如批处理作业等 用于度量长时间运行的任务的执行时间,可以获取活动任务数量和总执行时间等统计信息。
FunctionCounter 计数满足特定条件的事件数量 用于度量满足特定条件的事件数量,根据自定义函数的返回值进行计数。
FunctionTimer 测量带有自定义函数的代码块的执行时间 用于度量带有自定义函数的代码块的执行时间,可以获取执行次数和平均执行时间等统计信息。
TimeGauge 测量一个可变值并将其解释为时间单位,如任务的执行时间等 用于度量一个可变值,并将其解释为时间单位,适用于测量执行时间等场景。

这些使用场景和建议将帮助您选择适合您需求的Micrometer组件,并根据具体情况设置和使用各个组件以获取所需的统计信息。

代码实现

配置类

@Configuration(proxyBeanMethods = false)
@ConditionalOnWebApplication
public class WebsocketMetricsAutoConfiguration {
    
    

    @Bean
    public WebsocketMetrics websocketMetrics(MeterRegistry meterRegistry) {
    
    
        return new WebsocketMetrics(meterRegistry);
    }

    /**
     * 示例:用于将指标数据记录到日志中。
     */
    @Bean
    public MeterRegistry meterRegistry() {
    
    
        return new LoggingMeterRegistry(new LoggingRegistryConfig() {
    
    
            @Override
            public String get(String key) {
    
    
                return null;
            }

            @Override
            public Duration step() {
    
    
                return Duration.ofSeconds(60);
            }
        }, Clock.SYSTEM);
    }

}

指标类

public class WebsocketMetrics {
    
    
    private final MeterRegistry meterRegistry;
    private final Timer lifeTimeTimer;
    private final Counter websocketOpenCounter;
    private final Counter transportErrorCounter;

    public WebsocketMetrics(MeterRegistry meterRegistry) {
    
    
        this.meterRegistry = meterRegistry;
        // 初始化一些 tag 值是死的埋点
        lifeTimeTimer = Timer.builder("connection.lifetime").tag("tag1", "value1").tag("tag2", "value1").register(meterRegistry);
        websocketOpenCounter = Counter.builder("connection.open.count").tag("tag1", "value1").tag("tag2", "value1").register(meterRegistry);
        transportErrorCounter = Counter.builder("connection.transport.error.count").tag("tag1", "value1").register(meterRegistry);
    }

    public void websocketOpenCounter() {
    
    
        websocketOpenCounter.increment(1);
    }

    public void websocketGauge(Map webSocketSessionMap) {
    
    
        Gauge.builder("connection.current.gauge", () -> webSocketSessionMap.size()).register(meterRegistry);
    }

    /**
     * tag 值是活的可以这样埋点
     */

    public void webSocketCloseConnectionCounter(long count, int closeCode) {
    
    
        Counter webSocketCloseConnectionCounter = Counter.builder("connection.close.count")
                .tag("code", closeCode + "")
                .register(meterRegistry);
        webSocketCloseConnectionCounter.increment(count);
    }


    public void messageProcessTimer(long nanoDuration, boolean success, String messageType) {
    
    
        Timer wsOpenFailureTimer = Timer.builder(MESSAGE_PROCESS_TIMER)
                .tag("result", success + "")
                .tag("action", messageType)
                .register(meterRegistry);
        wsOpenFailureTimer.record(nanoDuration, TimeUnit.NANOSECONDS);
    }
}

指标触发

@Service
@Slf4j
public class LakerChatHandler extends AbstractWebSocketHandler {
    
    
    private final Map<String, WebSocketSession> webSocketSessionMap = new ConcurrentHashMap<>();
    WebsocketMetrics websocketMetrics;

    public LakerChatHandler(WebsocketMetrics websocketMetrics) {
    
    
        this.websocketMetrics = websocketMetrics;
        websocketMetrics.websocketGauge(webSocketSessionMap);
    }

    /**
     * 成功连接时
     *
     * @param session
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    
    
        webSocketSessionMap.put(session.getId(), new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64000));
        websocketMetrics.websocketOpenCounter();
    }

    /**
     * 处理textmessage
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    
    
        String exception = "";
        long start = System.nanoTime();
        try {
    
    
			// 业务处理
        } catch (Exception e) {
    
    
            exception = e.getClass().getSimpleName();
            throw e;
        } finally {
    
    
            websocketMetrics.websocketProcessTimer(System.nanoTime() - start, exception);
        }
    }


    /**
     * 连接关闭时
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
    
    
        websocketMetrics.webSocketCloseConnectionCounter(1, status.getCode());
        WebSocketSession concurrentSession = webSocketSessionMap.remove(session.getId());
        if (concurrentSession.isOpen()) {
    
    
            try {
    
    
                concurrentSession.close();
            } catch (Exception ex) {
    
    
                // ignore
            }
        }
    }

结果日志

测试日志如下

connection.close.count{code=1006} throughput=0.05/s
connection.open.count{tag1=value1,tag2=value1} throughput=0.05/s
connection.current.gauge{} value=2
connection.message.process{exception=,tag1=value1} throughput=0.033333/s mean=3.25064095s max=6.4944218s
http.server.requests{exception=None,method=GET,outcome=CLIENT_ERROR,status=404,uri=/**} throughput=0.016667/s mean=0.009702s max=0.009702s
http.server.requests{exception=None,method=GET,outcome=SUCCESS,status=200,uri=/captcha} throughput=0.016667/s mean=0.4792633s max=0.4792633s
http.server.requests{exception=None,method=GET,outcome=INFORMATIONAL,status=101,uri=/websocket/**} throughput=0.033333/s mean=0.01526635s max=4.35s

猜你喜欢

转载自blog.csdn.net/abu935009066/article/details/131454820