开启消息轨迹
broker端
traceTopicEnable
属性设置为true,默认值为false。设置为true后,broker启动时会初始化用于存储轨迹数据的默认topic:RMQ_SYS_TRACE_TOPIC。
traceOn
属性保持为true即可,默认值也是true。如果该属性设置为false,客户端不会发送轨迹数据到broker端(producer根据发送结果中的traceOn标志判断,consumer根据消息的PROPERTY_TRACE_SWITCH属性判断)。
producer
构造producer对象时设置enableMsgTrace=true;customizedTraceTopic可以为空,为空时使用默认的轨迹topic。其它重载的构造函数用法类似。
/**
* Constructor specifying producer group, message-trace switch and customized trace topic name.
*
* <p>Delegates to the five-argument constructor, passing {@code null} as the first and third
* arguments (presumably the namespace and the RPC hook — confirm against that overload).
*
* @param producerGroup Producer group, see the name-sake field.
* @param enableMsgTrace Switch flag for message trace; {@code true} enables trace collection.
* @param customizedTraceTopic Name of the message trace topic. If it is not configured, the
* default trace topic name is used.
*/
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);
}
consumer
构造consumer对象时设置enableMsgTrace=true;customizedTraceTopic可以为空,为空时使用默认的轨迹topic。其它重载的构造函数用法类似。
/**
* Constructor specifying consumer group, message-trace switch and customized trace topic name.
*
* <p>Delegates to the six-argument constructor, passing {@code null} as the first and third
* arguments (presumably the namespace and the RPC hook — confirm against that overload) and a
* new {@code AllocateMessageQueueAveragely} instance (presumably the queue allocation strategy).
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag for message trace; {@code true} enables trace collection.
* @param customizedTraceTopic Name of the message trace topic. If it is not configured, the
* default trace topic name is used.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
}
消息轨迹数据的存储介质
消息轨迹数据还是存储在RocketMQ的broker,每条轨迹数据就像普通消息一样,发送到指定的topic上。
原始消息的ID和KEYS会作为轨迹消息的KEYS,这样就可以用来检索指定消息的轨迹数据。
不使用外部存储介质的一个好处是避免依赖第三方组件。
Producer如何采集轨迹数据
在初始化producer的时候注册一个SendMessageHook,在消息发送前、后采集消息发送的上下文信息,并在消息发送完成后异步投递轨迹数据到broker
@Override
public void sendMessageBefore(SendMessageContext context) {
    if (context == null) {
        return;
    }
    // Messages sent to the trace topic are trace data themselves; never record a trace for them.
    String topic = context.getMessage().getTopic();
    AsyncTraceDispatcher dispatcher = (AsyncTraceDispatcher) localDispatcher;
    if (topic.startsWith(dispatcher.getTraceTopicName())) {
        return;
    }

    // Attach a Pub-type trace context to the send context so the after-send hook can complete it.
    TraceContext pubContext = new TraceContext();
    pubContext.setTraceBeans(new ArrayList<TraceBean>(1));
    context.setMqTraceContext(pubContext);
    pubContext.setTraceType(TraceType.Pub);
    pubContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));

    // Capture the message metadata that is already known before the send happens.
    TraceBean bean = new TraceBean();
    bean.setTopic(NamespaceUtil.withoutNamespace(topic));
    bean.setTags(context.getMessage().getTags());
    bean.setKeys(context.getMessage().getKeys());
    bean.setStoreHost(context.getBrokerAddr());
    bean.setBodyLength(context.getMessage().getBody().length);
    bean.setMsgType(context.getMsgType());
    bean.setClientHost(dispatcher.getHostProducer().getmQClientFactory().getClientId());
    pubContext.getTraceBeans().add(bean);
}
@Override
public void sendMessageAfter(SendMessageContext context) {
    // Skip when there is nothing to record: no context, a trace message itself, no trace data
    // captured before the send, no send result yet, or the broker reports tracing is switched off.
    if (context == null
        || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
        || context.getMqTraceContext() == null
        || context.getSendResult() == null
        || context.getSendResult().getRegionId() == null
        || !context.getSendResult().isTraceOn()) {
        return;
    }

    TraceContext pubContext = (TraceContext) context.getMqTraceContext();
    // The single bean filled in by sendMessageBefore.
    TraceBean bean = pubContext.getTraceBeans().get(0);
    // Elapsed time since the trace context was created, averaged over its beans (only one here).
    int costTime = (int) ((System.currentTimeMillis() - pubContext.getTimeStamp()) / pubContext.getTraceBeans().size());
    pubContext.setCostTime(costTime);
    boolean sentOk = context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK);
    pubContext.setSuccess(sentOk);
    pubContext.setRegionId(context.getSendResult().getRegionId());

    bean.setMsgId(context.getSendResult().getMsgId());
    bean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
    // The store time is approximated as half of the total cost, so it is an estimate, not exact.
    bean.setStoreTime(pubContext.getTimeStamp() + costTime / 2);

    // Queue the trace for asynchronous delivery; it is not sent immediately.
    localDispatcher.append(pubContext);
}
Consumer如何采集轨迹数据
consumer和producer类似,也是注册一个钩子(ConsumeMessageHook)。与producer最大的区别是:producer在消息发送完成后只发送一条该消息的轨迹数据,而consumer在消费前采集部分数据发送一次,消费后再采集部分数据发送一次,一次消费共产生2条轨迹数据。如果消费失败进行重试,每重试一次就会再记录两条。
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
    if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
        return;
    }
    // Attach the SubBefore trace context first; consumeMessageAfter reads it back from this context.
    TraceContext traceContext = new TraceContext();
    context.setMqTraceContext(traceContext);
    traceContext.setTraceType(TraceType.SubBefore);
    traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getConsumerGroup()));

    List<TraceBean> collected = new ArrayList<TraceBean>();
    for (MessageExt message : context.getMsgList()) {
        if (message == null) {
            continue;
        }
        String regionId = message.getProperty(MessageConst.PROPERTY_MSG_REGION);
        String traceSwitch = message.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);
        // Messages whose trace switch property is explicitly "false" are not traced.
        if ("false".equals(traceSwitch)) {
            continue;
        }
        TraceBean bean = new TraceBean();
        bean.setTopic(NamespaceUtil.withoutNamespace(message.getTopic()));
        bean.setMsgId(message.getMsgId());
        bean.setTags(message.getTags());
        bean.setKeys(message.getKeys());
        bean.setStoreTime(message.getStoreTimestamp());
        bean.setBodyLength(message.getStoreSize());
        bean.setRetryTimes(message.getReconsumeTimes());
        bean.setClientHost(((AsyncTraceDispatcher) localDispatcher).getHostConsumer().getmQClientFactory().getClientId());
        // The last traced message's region wins.
        traceContext.setRegionId(regionId);
        collected.add(bean);
    }

    if (collected.isEmpty()) {
        return;
    }
    traceContext.setTraceBeans(collected);
    traceContext.setTimeStamp(System.currentTimeMillis());
    // First of the two consumer-side trace records: emitted before consumption.
    localDispatcher.append(traceContext);
}
/**
 * Emits the "after consumption" (SubAfter) trace record for a consumed batch.
 *
 * <p>Reuses the trace beans captured by the before-consumption hook, records success, the
 * average per-message cost and (when present) the consume return type, then queues the record
 * on the dispatcher.
 *
 * @param context consume context shared with the before-consumption hook; ignored when it is
 *                null, has no messages, or does not carry a TraceContext
 */
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
    if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
        return;
    }
    // The before hook stores its TraceContext on this context. It can be absent (e.g. the before
    // hook did not run for this context) or hold an object of another type; instanceof rejects
    // both null and foreign objects, so neither an NPE nor a ClassCastException escapes.
    Object beforeTrace = context.getMqTraceContext();
    if (!(beforeTrace instanceof TraceContext)) {
        return;
    }
    TraceContext subBeforeContext = (TraceContext) beforeTrace;
    if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().isEmpty()) {
        // Nothing was collected before consumption (e.g. every message had tracing switched off).
        return;
    }
    TraceContext subAfterContext = new TraceContext();
    subAfterContext.setTraceType(TraceType.SubAfter);
    subAfterContext.setRegionId(subBeforeContext.getRegionId());
    subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));
    subAfterContext.setRequestId(subBeforeContext.getRequestId());
    subAfterContext.setSuccess(context.isSuccess());
    // Average cost per message: elapsed time since the before hook divided by the batch size.
    int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
    subAfterContext.setCostTime(costTime);
    // Reuse the before-consumption beans so both records describe the same messages.
    subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
    String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
    if (contextType != null) {
        subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
    }
    // Second consumer-side trace record: emitted after consumption.
    localDispatcher.append(subAfterContext);
}
如何发送轨迹数据
客户端在发送或消费消息时,将轨迹消息放入一个阻塞队列便结束了,会有一个异步线程从这个队列里取出轨迹消息封装为一个发送任务提交到线程池,然后发送到broker。
- 存放待处理轨迹消息的队列默认大小为1024;如果队列已满,当前轨迹消息只会打印一条日志,然后被丢弃
- 有一个异步线程不断轮询,从存放轨迹消息的队列中取出数据,每次最多取100条(如果等待5ms后仍不足100条,也会直接处理已取到的数据),封装为一个发送请求任务,提交到发送轨迹消息的线程池
- 发送任务将这批次消息按topic分类,每个topic一批消息,按批次处理发送给轨迹topic,把原始消息的消息ID和消息keys做为这个轨迹消息的keys,这批原始消息的元数据(多条的话,每条消息的元数据最终是拼接到一条的,每条消息的元数据尾部有个字段分隔符,查询的时候可以用来拆分)作为消息体
如何查询轨迹数据
因为消息轨迹数据是发到指定的轨迹topic上的,并且原始消息的ID和KEYS被作为轨迹消息的KEYS,所以可以用目标消息的消息ID作为key,从轨迹topic上查出相关的轨迹消息。再对查出来的消息体进行解析:如果解析出的消息ID字段与目标消息ID匹配,就是我们想要的消息轨迹数据。一般来说,正常情况下(只有一个消费组消费且没有重试)应该能查出3条:一条发送轨迹,两条消费轨迹(消费前和消费后)。