How SkyWalking trace tracking works

This post walks through how the SkyWalking agent propagates the trace context, using the bRPC, Spring, and Kafka plugins as examples.

1. First, get familiar with the span concepts a trace is built from: EntrySpan, LocalSpan, and ExitSpan.

https://skyapm.github.io/document-cn-translation-of-skywalking/zh/6.2.0/guides/Java-Plugin-Development-Guide.html
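
To make the three concepts concrete, here is a minimal sketch of how they are created through the agent's ContextManager API. The operation names and the peer address are made up for illustration; only the ContextManager calls themselves come from the agent core.

import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;

public class SpanTypesSketch {
    public static void handleRequest(ContextCarrier incomingCarrier) {
        // EntrySpan: created where a request enters this process (the server side of RPC/HTTP/MQ);
        // the incoming carrier holds the upstream trace context.
        AbstractSpan entry = ContextManager.createEntrySpan("/demo/endpoint", incomingCarrier);

        // LocalSpan: created for a purely in-process step worth seeing in the trace.
        AbstractSpan local = ContextManager.createLocalSpan("DemoService/compute");
        ContextManager.stopSpan(); // finish the local span

        // ExitSpan: created where a request leaves this process; the outgoing carrier is what
        // gets written into the downstream request's headers/attachments.
        ContextCarrier outgoingCarrier = new ContextCarrier();
        AbstractSpan exit = ContextManager.createExitSpan("Demo/remoteCall", outgoingCarrier, "127.0.0.1:8080");
        ContextManager.stopSpan(); // finish the exit span
        ContextManager.stopSpan(); // finish the entry span, which ends this trace segment
    }
}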

2. Take Baidu's brpc plugin as an example.

ClientInterceptor

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        Request request = (Request) allArguments[0];
        InetSocketAddress remoteAddress = (InetSocketAddress) request.getChannel().remoteAddress();
        InetAddress address = remoteAddress.getAddress();

        final ContextCarrier contextCarrier = new ContextCarrier();
        // createExitSpan stores the tracing context in a per-thread variable; under the hood
        // this is plain ThreadLocal (see ContextManager.getOrCreate below).
        AbstractSpan span = ContextManager.createExitSpan(generateOperationName(request), contextCarrier, address.getHostAddress() + ":" + remoteAddress.getPort());
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            if (request.getKvAttachment() == null) {
                request.setKvAttachment(new HashMap<>());
            }
            // Write each carrier item into the request's attachment (its "headers");
            // the server side will read the trace context back from there.
            request.getKvAttachment().put(next.getHeadKey(), next.getHeadValue());
        }
        span.setComponent(ComponentsDefine.BRPC_JAVA);
        SpanLayer.asRPCFramework(span);
    }

    // From ContextManager: createExitSpan (and createEntrySpan) delegate to getOrCreate,
    // which lazily builds the per-thread tracing context.
    private static AbstractTracerContext getOrCreate(String operationName, boolean forceSampling) {
        // CONTEXT.get() is ThreadLocal.get(): it looks up this thread's ThreadLocalMap, whose
        // entries look like [{threadLocal1, value1}, {threadLocal2, value2}] -- this is where
        // per-thread data such as the trace context (and thus the trace ID) lives.
        AbstractTracerContext context = CONTEXT.get();
        if (context == null) {
            if (StringUtil.isEmpty(operationName)) {
                if (LOGGER.isDebugEnable()) {
                    LOGGER.debug("No operation name, ignore this trace.");
                }
                context = new IgnoredTracerContext();
            } else {
                if (EXTEND_SERVICE == null) {
                    EXTEND_SERVICE = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);
                }
                context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);
            }
            CONTEXT.set(context);
        }
        return context;
    }
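
To make the ThreadLocal point concrete, here is a small self-contained illustration of the same pattern. MiniContextManager and MiniTracerContext are invented names for this sketch and are not SkyWalking classes; the real context also tracks the span stack, the sampling decision, and so on.

import java.util.UUID;

public class MiniContextManager {
    static final class MiniTracerContext {
        final String traceId = UUID.randomUUID().toString(); // stand-in for the real trace ID
        final String firstOperationName;
        MiniTracerContext(String firstOperationName) { this.firstOperationName = firstOperationName; }
    }

    // One slot per thread: ThreadLocal.get() reads this thread's ThreadLocalMap.
    private static final ThreadLocal<MiniTracerContext> CONTEXT = new ThreadLocal<>();

    static MiniTracerContext getOrCreate(String operationName) {
        MiniTracerContext context = CONTEXT.get();
        if (context == null) {
            context = new MiniTracerContext(operationName); // the first span on this thread creates the context
            CONTEXT.set(context);                           // stored per thread; invisible to other threads
        }
        return context;
    }

    static void finish() {
        CONTEXT.remove(); // must be cleared when the trace segment ends, or pooled threads leak contexts
    }
}

Because the context lives in a ThreadLocal, every span created on the same thread while handling one request lands in the same trace segment, and other threads never see it; cross-thread cases are bridged separately with ContextSnapshot (see the Kafka callback handling in section 4).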

ServerInterceptor

public class ServerInterceptor implements InstanceMethodsAroundInterceptor {
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        Request request = (Request) allArguments[0];
        ContextCarrier contextCarrier = new ContextCarrier();
        CarrierItem next = contextCarrier.items();
        if (request.getKvAttachment() == null) {
            request.setKvAttachment(new HashMap<>());
        }
        while (next.hasNext()) {
            next = next.next();
            // Read the trace context the client put into the request attachments back into the
            // carrier; createEntrySpan below restores it into this thread's context (ThreadLocal).
            next.setHeadValue((String) request.getKvAttachment().get(next.getHeadKey()));
        }

        AbstractSpan span = ContextManager.createEntrySpan(generateOperationName(request), contextCarrier);
        SpanLayer.asRPCFramework(span);
        span.setComponent(ComponentsDefine.BRPC_JAVA);
    }
    // ...
}
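
The server side mirrors the client: the carrier is filled from the request attachments and handed to ContextManager.createEntrySpan, which restores the upstream context on the current thread and links the new EntrySpan back to the caller's segment, so both processes end up in the same trace. The elided afterMethod in these interceptors typically just calls ContextManager.stopSpan() once the intercepted method returns.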

3. How does Spring MVC complete the trace?

The principle is the same. The interceptor below works on the reactive ServerWebExchange; its beforeMethod only stashes the request and response into SkyWalking's RuntimeContext so that later interceptors can reach them:

public class InvokeInterceptor implements InstanceMethodsAroundInterceptor {
    @Override
    public void beforeMethod(final EnhancedInstance objInst,
                             final Method method,
                             final Object[] allArguments,
                             final Class<?>[] argumentsTypes,
                             final MethodInterceptResult result) throws Throwable {
        ServerWebExchange exchange = (ServerWebExchange) allArguments[0];
        ContextManager.getRuntimeContext()
                      .put(RESPONSE_KEY_IN_RUNTIME_CONTEXT, new ReactiveResponseHolder(exchange.getResponse()));
        ContextManager.getRuntimeContext()
                      .put(REQUEST_KEY_IN_RUNTIME_CONTEXT, new ReactiveRequestHolder(exchange.getRequest()));
    }
    // ...
}
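
The EntrySpan itself is created where the plugin can read the incoming HTTP headers, and that step has the same shape as the bRPC ServerInterceptor above. The sketch below is illustrative rather than the actual Spring plugin source; headerReader stands in for whatever API the concrete plugin uses to read a request header (HttpServletRequest, ServerHttpRequest, ...).

import java.util.function.Function;

import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;

public class HttpEntrySpanSketch {
    public static AbstractSpan createHttpEntrySpan(String operationName, Function<String, String> headerReader) {
        ContextCarrier contextCarrier = new ContextCarrier();
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            // Fill each carrier item from the matching HTTP request header,
            // just like the bRPC ServerInterceptor does with KvAttachment.
            next.setHeadValue(headerReader.apply(next.getHeadKey()));
        }
        AbstractSpan span = ContextManager.createEntrySpan(operationName, contextCarrier);
        span.setComponent(ComponentsDefine.SPRING_MVC_ANNOTATION);
        SpanLayer.asHttp(span);
        return span;
    }
}

Only the header container differs from the bRPC case; the propagation logic is identical.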

4. What about Kafka?

KafkaProducerInterceptor

You will find that, under the hood, the trace context is still put into headers, this time the Kafka record headers.

public class KafkaProducerInterceptor implements InstanceMethodsAroundInterceptor {

    public static final String OPERATE_NAME_PREFIX = "Kafka/";
    public static final String PRODUCER_OPERATE_NAME_SUFFIX = "/Producer";

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {

        ContextCarrier contextCarrier = new ContextCarrier();

        ProducerRecord record = (ProducerRecord) allArguments[0];
        String topicName = record.topic();
        AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, (String) objInst
                .getSkyWalkingDynamicField());

        Tags.MQ_BROKER.set(activeSpan, (String) objInst.getSkyWalkingDynamicField());
        Tags.MQ_TOPIC.set(activeSpan, topicName);
        contextCarrier.extensionInjector().injectSendingTimestamp();
        SpanLayer.asMQ(activeSpan);
        activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);

        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes(StandardCharsets.UTF_8));
        }

        // When the caller passes a lambda instead of a Callback implementation, no inner class is
        // generated and no Kafka Callback subclass gets enhanced, so allArguments[1] cannot be cast
        // to EnhancedInstance; in that case the plugin wraps it with CallbackAdapterInterceptor instead.
        Object shouldCallbackInstance = allArguments[1];
        if (null != shouldCallbackInstance) {
            if (shouldCallbackInstance instanceof EnhancedInstance) {
                EnhancedInstance callbackInstance = (EnhancedInstance) shouldCallbackInstance;
                ContextSnapshot snapshot = ContextManager.capture();
                if (null != snapshot) {
                    CallbackCache cache = new CallbackCache();
                    cache.setSnapshot(snapshot);
                    callbackInstance.setSkyWalkingDynamicField(cache);
                }
            } else if (shouldCallbackInstance instanceof Callback) {
                Callback callback = (Callback) shouldCallbackInstance;
                ContextSnapshot snapshot = ContextManager.capture();
                if (null != snapshot) {
                    CallbackCache cache = new CallbackCache();
                    cache.setSnapshot(snapshot);
                    cache.setCallback(callback);
                    allArguments[1] = new CallbackAdapterInterceptor(cache);
                }
            }
        }
    }
    // ...
}
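
The last branch above swaps the user's Callback for a CallbackAdapterInterceptor built from the captured ContextSnapshot. The sketch below shows the idea of that cross-thread hand-off in simplified form: it is not the plugin's actual adapter (it takes the snapshot and the original callback directly instead of the plugin's CallbackCache, and the operation name is assumed), but the ContextManager calls are the real agent APIs.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;

// Simplified stand-in for CallbackAdapterInterceptor: restore the producer thread's
// context on the callback (I/O) thread, then delegate to the user's original callback.
public class TracedCallbackSketch implements Callback {
    private final ContextSnapshot snapshot; // captured via ContextManager.capture() on the send thread
    private final Callback userCallback;    // the callback passed to producer.send(), may be null

    public TracedCallbackSketch(ContextSnapshot snapshot, Callback userCallback) {
        this.snapshot = snapshot;
        this.userCallback = userCallback;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // A local span for the callback itself, inside the same trace as the send.
        AbstractSpan span = ContextManager.createLocalSpan("Kafka/Producer/Callback");
        span.setComponent(ComponentsDefine.KAFKA_PRODUCER);
        ContextManager.continued(snapshot); // re-attach the captured context to this thread
        if (exception != null) {
            ContextManager.activeSpan().log(exception);
        }
        if (userCallback != null) {
            userCallback.onCompletion(metadata, exception); // run the original callback
        }
        ContextManager.stopSpan();
    }
}

The key call is ContextManager.continued(snapshot): producer.send() runs on the caller's thread, the acknowledgement arrives on a Kafka I/O thread, and continuing the captured snapshot there keeps the callback's span inside the producer's trace.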

KafkaConsumerInterceptor

The consumer side reads the trace context back from the record headers: for every polled record the carrier is filled from the headers and passed to ContextManager.extract, which links the consumer's EntrySpan to the producer segment that sent the message, so the chain is complete.

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                          Object ret) throws Throwable {
    /*
     * If the intercepted method throws exception, the ret will be null
     */
    if (ret == null) {
        return ret;
    }
    Map<TopicPartition, List<ConsumerRecord<?, ?>>> records = (Map<TopicPartition, List<ConsumerRecord<?, ?>>>) ret;
    // The entry span is only created when the consumer has received at least one message.
    if (records.size() > 0) {
        ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
        KafkaContext context = (KafkaContext) ContextManager.getRuntimeContext().get(Constants.KAFKA_FLAG);
        if (context != null) {
            ContextManager.createEntrySpan(context.getOperationName(), null);
            context.setNeedStop(true);
        }
        String operationName = OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME + requiredInfo.getGroupId();
        AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, null).start(requiredInfo.getStartTime());

        activeSpan.setComponent(ComponentsDefine.KAFKA_CONSUMER);
        SpanLayer.asMQ(activeSpan);
        Tags.MQ_BROKER.set(activeSpan, requiredInfo.getBrokerServers());
        Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopics());

        for (List<ConsumerRecord<?, ?>> consumerRecords : records.values()) {
            for (ConsumerRecord<?, ?> record : consumerRecords) {
                ContextCarrier contextCarrier = new ContextCarrier();

                CarrierItem next = contextCarrier.items();
                while (next.hasNext()) {
                    next = next.next();
                    Iterator<Header> iterator = record.headers().headers(next.getHeadKey()).iterator();
                    if (iterator.hasNext()) {
                        next.setHeadValue(new String(iterator.next().value(), StandardCharsets.UTF_8));
                    }
                }
                ContextManager.extract(contextCarrier);
            }
        }
        ContextManager.stopSpan();
    }
    return ret;
}
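
Across all four examples the mechanism is the same: the caller creates an ExitSpan and writes the ContextCarrier items into whatever "header" the transport offers (bRPC KvAttachment, HTTP headers, Kafka record headers), and the callee reads them back, creates an EntrySpan, and extracts the carrier so both sides join one trace. Inside a process everything hangs off the ThreadLocal tracer context, and thread hops are bridged with ContextSnapshot capture/continued.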


Reposted from blog.csdn.net/nmjhehe/article/details/114921653