skywalking trace跟踪原理:
1.先理解trace的几个概念,EntrySpan EntrySpan ExitSpan
2.以百度的 brpc插件为例
ClientInterceptor
@Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { Request request = (Request) allArguments[0]; InetSocketAddress remoteAddress = (InetSocketAddress) request.getChannel().remoteAddress(); InetAddress address = remoteAddress.getAddress(); final ContextCarrier contextCarrier = new ContextCarrier(); # createExitSpan创建本地线程变量,底层是使用的 threadLocal技术 AbstractSpan span = ContextManager.createExitSpan(generateOperationName(request), contextCarrier, address.getHostAddress() + ":" + remoteAddress.getPort()); CarrierItem next = contextCarrier.items(); while (next.hasNext()) { next = next.next(); if (request.getKvAttachment() == null) { request.setKvAttachment(new HashMap<>()); } # 将rpc请求放到request的请求头中,服务端获取就会从请求头中获取 request.getKvAttachment().put(next.getHeadKey(), next.getHeadValue()); } span.setComponent(ComponentsDefine.BRPC_JAVA); SpanLayer.asRPCFramework(span); }
private static AbstractTracerContext getOrCreate(String operationName, boolean forceSampling) { #这里的get就是调用的ThreadLocal的get方法获取本线程的ThreadLcalMap对象,将来里面会放[{threadLocal1,value1},{threadLocal2,value2}],比如就可以放traceId AbstractTracerContext context = CONTEXT.get(); if (context == null) { if (StringUtil.isEmpty(operationName)) { if (LOGGER.isDebugEnable()) { LOGGER.debug("No operation name, ignore this trace."); } context = new IgnoredTracerContext(); } else { if (EXTEND_SERVICE == null) { EXTEND_SERVICE = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class); } context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling); } CONTEXT.set(context); } return context; }
ServerInterceptor
public class ServerInterceptor implements InstanceMethodsAroundInterceptor { @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { Request request = (Request) allArguments[0]; ContextCarrier contextCarrier = new ContextCarrier(); CarrierItem next = contextCarrier.items(); if (request.getKvAttachment() == null) { request.setKvAttachment(new HashMap<>()); } while (next.hasNext()) { next = next.next(); #将客户端带过来的request的getHeadKey放到自己的本地线程ThreadLocalMap中(通过createEntrySpan创建) next.setHeadValue((String) request.getKvAttachment().get(next.getHeadKey())); } AbstractSpan span = ContextManager.createEntrySpan(generateOperationName(request), contextCarrier); SpanLayer.asRPCFramework(span); span.setComponent(ComponentsDefine.BRPC_JAVA); } 。。。 }
3.springmvc是如何完成跟踪的?
原理是一样的
public class InvokeInterceptor implements InstanceMethodsAroundInterceptor { @Override public void beforeMethod(final EnhancedInstance objInst, final Method method, final Object[] allArguments, final Class<?>[] argumentsTypes, final MethodInterceptResult result) throws Throwable { ServerWebExchange exchange = (ServerWebExchange) allArguments[0]; ContextManager.getRuntimeContext() .put(RESPONSE_KEY_IN_RUNTIME_CONTEXT, new ReactiveResponseHolder(exchange.getResponse())); ContextManager.getRuntimeContext() .put(REQUEST_KEY_IN_RUNTIME_CONTEXT, new ReactiveRequestHolder(exchange.getRequest())); }
4.kafka的原理呢?
KafkaProducerInterceptor
你会发现底层还是放到head里面
public class KafkaProducerInterceptor implements InstanceMethodsAroundInterceptor { public static final String OPERATE_NAME_PREFIX = "Kafka/"; public static final String PRODUCER_OPERATE_NAME_SUFFIX = "/Producer"; @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { ContextCarrier contextCarrier = new ContextCarrier(); ProducerRecord record = (ProducerRecord) allArguments[0]; String topicName = record.topic(); AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, (String) objInst .getSkyWalkingDynamicField()); Tags.MQ_BROKER.set(activeSpan, (String) objInst.getSkyWalkingDynamicField()); Tags.MQ_TOPIC.set(activeSpan, topicName); contextCarrier.extensionInjector().injectSendingTimestamp(); SpanLayer.asMQ(activeSpan); activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER); CarrierItem next = contextCarrier.items(); while (next.hasNext()) { next = next.next(); record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes(StandardCharsets.UTF_8)); } //when use lambda expression, not to generate inner class, // and not to trigger kafka CallBack class define, so allArguments[1] can't to cast EnhancedInstance Object shouldCallbackInstance = allArguments[1]; if (null != shouldCallbackInstance) { if (shouldCallbackInstance instanceof EnhancedInstance) { EnhancedInstance callbackInstance = (EnhancedInstance) shouldCallbackInstance; ContextSnapshot snapshot = ContextManager.capture(); if (null != snapshot) { CallbackCache cache = new CallbackCache(); cache.setSnapshot(snapshot); callbackInstance.setSkyWalkingDynamicField(cache); } } else if (shouldCallbackInstance instanceof Callback) { Callback callback = (Callback) shouldCallbackInstance; ContextSnapshot snapshot = ContextManager.capture(); if (null != snapshot) { CallbackCache cache = new CallbackCache(); cache.setSnapshot(snapshot); cache.setCallback(callback); allArguments[1] = new CallbackAdapterInterceptor(cache); } } } }
KafkaConsumerInterceptor
也是从record的header中获取的,完美搞定。
@Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { /* * If the intercepted method throws exception, the ret will be null */ if (ret == null) { return ret; } Map<TopicPartition, List<ConsumerRecord<?, ?>>> records = (Map<TopicPartition, List<ConsumerRecord<?, ?>>>) ret; // // The entry span will only be created when the consumer received at least one message. // if (records.size() > 0) { ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField(); KafkaContext context = (KafkaContext) ContextManager.getRuntimeContext().get(Constants.KAFKA_FLAG); if (context != null) { ContextManager.createEntrySpan(context.getOperationName(), null); context.setNeedStop(true); } String operationName = OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME + requiredInfo.getGroupId(); AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, null).start(requiredInfo.getStartTime()); activeSpan.setComponent(ComponentsDefine.KAFKA_CONSUMER); SpanLayer.asMQ(activeSpan); Tags.MQ_BROKER.set(activeSpan, requiredInfo.getBrokerServers()); Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopics()); for (List<ConsumerRecord<?, ?>> consumerRecords : records.values()) { for (ConsumerRecord<?, ?> record : consumerRecords) { ContextCarrier contextCarrier = new ContextCarrier(); CarrierItem next = contextCarrier.items(); while (next.hasNext()) { next = next.next(); Iterator<Header> iterator = record.headers().headers(next.getHeadKey()).iterator(); if (iterator.hasNext()) { next.setHeadValue(new String(iterator.next().value(), StandardCharsets.UTF_8)); } } ContextManager.extract(contextCarrier); } } ContextManager.stopSpan(); } return ret; }