Hystrix的Command初始化

版权声明: https://blog.csdn.net/ph3636/article/details/82258952

1. 在Hystrix中,Command就相当于接口服务,每个Command都可以设置自己的数据配置,继承HystrixCommand实现他的构造函数,最少需要设置HystrixCommandGroupKey,因为后续要根据这个来区别每个Command,服务总该有个执行方法也就是实现父类的run(),当满足熔断要求的时候需要实现getFallback(),当需要程序缓存的时候需要实现getCacheKey()。

2. HystrixCommandGroupKey.Factory.asKey("User"),创建服务独有的key并且保存到HystrixCommandGroupKey的InternMap中,其他类似的还有HystrixCommandKey和HystrixThreadPoolKey生成。

3. 初始化AbstractCommand构造函数

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

        this.commandGroup = initGroupKey(group);
        this.commandKey = initCommandKey(key, getClass());
        this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

        //Strategies from plugins
        this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
        this.executionHook = initExecutionHook(executionHook);

        this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
        this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

        /* fallback semaphore override if applicable */
        this.fallbackSemaphoreOverride = fallbackSemaphore;

        /* execution semaphore override if applicable */
        this.executionSemaphoreOverride = executionSemaphore;
    }

初始化HystrixCommandKey,默认是类的名字为对应的key,即class.getSimpleName()

4. 初始化通用的配置HystrixCommandProperties,有一套默认的配置,每个服务组都可以有自己独特的配置,HystrixPropertiesFactory.getCommandProperties(commandKey, commandPropertiesDefaults);

HystrixPropertiesStrategy hystrixPropertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();

先初始化HystrixPlugins,类加载器是加载当前类的加载器。通过回调把日志属性传递到方法中。

private HystrixPlugins(ClassLoader classLoader, LoggerSupplier logSupplier) {
        //This will load Archaius if its in the classpath.
        this.classLoader = classLoader;
        //N.B. Do not use a logger before this is loaded as it will most likely load the configuration system.
        //The configuration system may need to do something prior to loading logging. @agentgt
        dynamicProperties = resolveDynamicProperties(classLoader, logSupplier);
    }

先从系统配置中查找对应的配置类System.getProperty(name, fallback)

HystrixDynamicProperties hp = getPluginImplementationViaProperties(HystrixDynamicProperties.class, 
        HystrixDynamicPropertiesSystemProperties.getInstance());

没有的话从类加载器加载的类中查找HystrixDynamicProperties类型的类,

没有的话从Archaius中加载对应的类

最后没有的话就会加载默认的配置类HystrixDynamicPropertiesSystemProperties,设置动态配置类HystrixDynamicProperties

 private static HystrixDynamicProperties resolveDynamicProperties(ClassLoader classLoader, LoggerSupplier logSupplier) {
        HystrixDynamicProperties hp = getPluginImplementationViaProperties(HystrixDynamicProperties.class, 
                HystrixDynamicPropertiesSystemProperties.getInstance());
        if (hp != null) {
            logSupplier.getLogger().debug(
                    "Created HystrixDynamicProperties instance from System property named "
                    + "\"hystrix.plugin.HystrixDynamicProperties.implementation\". Using class: {}", 
                    hp.getClass().getCanonicalName());
            return hp;
        }
        hp = findService(HystrixDynamicProperties.class, classLoader);
        if (hp != null) {
            logSupplier.getLogger()
                    .debug("Created HystrixDynamicProperties instance by loading from ServiceLoader. Using class: {}", 
                            hp.getClass().getCanonicalName());
            return hp;
        }
        hp = HystrixArchaiusHelper.createArchaiusDynamicProperties();
        if (hp != null) {
            logSupplier.getLogger().debug("Created HystrixDynamicProperties. Using class : {}", 
                    hp.getClass().getCanonicalName());
            return hp;
        }
        hp = HystrixDynamicPropertiesSystemProperties.getInstance();
        logSupplier.getLogger().info("Using System Properties for HystrixDynamicProperties! Using class: {}", 
                hp.getClass().getCanonicalName());
        return hp;
    }
    
    private static <T> T findService(
            Class<T> spi, 
            ClassLoader classLoader) throws ServiceConfigurationError {
        
        ServiceLoader<T> sl = ServiceLoader.load(spi,
                classLoader);
        for (T s : sl) {
            if (s != null)
                return s;
        }
        return null;
    }

加载属性策略类,依次从上面的动态配置中获取,从类加载器的类中获取,没有则设置默认HystrixPropertiesStrategyDefault

从属性策略中获取通用配置类,先取缓存key,这里的key就是我们最初设置的服务组名,没有则生成自己特有的通用配置。

 // create new instance
                properties = hystrixPropertiesStrategy.getCommandProperties(key, builder);
                // cache and return
                HystrixCommandProperties existing = commandProperties.putIfAbsent(cacheKey, properties);

通用配置默认为HystrixPropertiesCommandDefault,HystrixCommandProperties.Setter可以配置自己需要独特的属性,所有的通用配置如下,基本上都会有全局默认值。配置key前缀为"hystrix"

 protected HystrixCommandProperties(HystrixCommandKey key, HystrixCommandProperties.Setter builder, String propertyPrefix) {
        this.key = key;
        this.circuitBreakerEnabled = getProperty(propertyPrefix, key, "circuitBreaker.enabled", builder.getCircuitBreakerEnabled(), default_circuitBreakerEnabled);
        this.circuitBreakerRequestVolumeThreshold = getProperty(propertyPrefix, key, "circuitBreaker.requestVolumeThreshold", builder.getCircuitBreakerRequestVolumeThreshold(), default_circuitBreakerRequestVolumeThreshold);
        this.circuitBreakerSleepWindowInMilliseconds = getProperty(propertyPrefix, key, "circuitBreaker.sleepWindowInMilliseconds", builder.getCircuitBreakerSleepWindowInMilliseconds(), default_circuitBreakerSleepWindowInMilliseconds);
        this.circuitBreakerErrorThresholdPercentage = getProperty(propertyPrefix, key, "circuitBreaker.errorThresholdPercentage", builder.getCircuitBreakerErrorThresholdPercentage(), default_circuitBreakerErrorThresholdPercentage);
        this.circuitBreakerForceOpen = getProperty(propertyPrefix, key, "circuitBreaker.forceOpen", builder.getCircuitBreakerForceOpen(), default_circuitBreakerForceOpen);
        this.circuitBreakerForceClosed = getProperty(propertyPrefix, key, "circuitBreaker.forceClosed", builder.getCircuitBreakerForceClosed(), default_circuitBreakerForceClosed);
        this.executionIsolationStrategy = getProperty(propertyPrefix, key, "execution.isolation.strategy", builder.getExecutionIsolationStrategy(), default_executionIsolationStrategy);
        //this property name is now misleading.  //TODO figure out a good way to deprecate this property name
        this.executionTimeoutInMilliseconds = getProperty(propertyPrefix, key, "execution.isolation.thread.timeoutInMilliseconds", builder.getExecutionIsolationThreadTimeoutInMilliseconds(), default_executionTimeoutInMilliseconds);
        this.executionTimeoutEnabled = getProperty(propertyPrefix, key, "execution.timeout.enabled", builder.getExecutionTimeoutEnabled(), default_executionTimeoutEnabled);
        this.executionIsolationThreadInterruptOnTimeout = getProperty(propertyPrefix, key, "execution.isolation.thread.interruptOnTimeout", builder.getExecutionIsolationThreadInterruptOnTimeout(), default_executionIsolationThreadInterruptOnTimeout);
        this.executionIsolationThreadInterruptOnFutureCancel = getProperty(propertyPrefix, key, "execution.isolation.thread.interruptOnFutureCancel", builder.getExecutionIsolationThreadInterruptOnFutureCancel(), default_executionIsolationThreadInterruptOnFutureCancel);
        this.executionIsolationSemaphoreMaxConcurrentRequests = getProperty(propertyPrefix, key, "execution.isolation.semaphore.maxConcurrentRequests", builder.getExecutionIsolationSemaphoreMaxConcurrentRequests(), default_executionIsolationSemaphoreMaxConcurrentRequests);
        this.fallbackIsolationSemaphoreMaxConcurrentRequests = getProperty(propertyPrefix, key, "fallback.isolation.semaphore.maxConcurrentRequests", builder.getFallbackIsolationSemaphoreMaxConcurrentRequests(), default_fallbackIsolationSemaphoreMaxConcurrentRequests);
        this.fallbackEnabled = getProperty(propertyPrefix, key, "fallback.enabled", builder.getFallbackEnabled(), default_fallbackEnabled);
        this.metricsRollingStatisticalWindowInMilliseconds = getProperty(propertyPrefix, key, "metrics.rollingStats.timeInMilliseconds", builder.getMetricsRollingStatisticalWindowInMilliseconds(), default_metricsRollingStatisticalWindow);
        this.metricsRollingStatisticalWindowBuckets = getProperty(propertyPrefix, key, "metrics.rollingStats.numBuckets", builder.getMetricsRollingStatisticalWindowBuckets(), default_metricsRollingStatisticalWindowBuckets);
        this.metricsRollingPercentileEnabled = getProperty(propertyPrefix, key, "metrics.rollingPercentile.enabled", builder.getMetricsRollingPercentileEnabled(), default_metricsRollingPercentileEnabled);
        this.metricsRollingPercentileWindowInMilliseconds = getProperty(propertyPrefix, key, "metrics.rollingPercentile.timeInMilliseconds", builder.getMetricsRollingPercentileWindowInMilliseconds(), default_metricsRollingPercentileWindow);
        this.metricsRollingPercentileWindowBuckets = getProperty(propertyPrefix, key, "metrics.rollingPercentile.numBuckets", builder.getMetricsRollingPercentileWindowBuckets(), default_metricsRollingPercentileWindowBuckets);
        this.metricsRollingPercentileBucketSize = getProperty(propertyPrefix, key, "metrics.rollingPercentile.bucketSize", builder.getMetricsRollingPercentileBucketSize(), default_metricsRollingPercentileBucketSize);
        this.metricsHealthSnapshotIntervalInMilliseconds = getProperty(propertyPrefix, key, "metrics.healthSnapshot.intervalInMilliseconds", builder.getMetricsHealthSnapshotIntervalInMilliseconds(), default_metricsHealthSnapshotIntervalInMilliseconds);
        this.requestCacheEnabled = getProperty(propertyPrefix, key, "requestCache.enabled", builder.getRequestCacheEnabled(), default_requestCacheEnabled);
        this.requestLogEnabled = getProperty(propertyPrefix, key, "requestLog.enabled", builder.getRequestLogEnabled(), default_requestLogEnabled);

        // threadpool doesn't have a global override, only instance level makes sense
        this.executionIsolationThreadPoolKeyOverride = forString().add(propertyPrefix + ".command." + key.name() + ".threadPoolKeyOverride", null).build();
    }

初始化HystrixThreadPoolKey,key值可以设置command命令通用的或者是服务组名。HystrixThreadPoolKey.Factory.asKey(groupKey.name())

5. 初始化HystrixCommandMetrics,初始化一些统计流类,限流等相关的类,

HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
        super(null);
        this.key = key;
        this.group = commandGroup;
        this.threadPoolKey = threadPoolKey;
        this.properties = properties;

        healthCountsStream = HealthCountsStream.getInstance(key, properties);
        rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);
        cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);

        rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);
        rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);
        rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties);
    }

初始化熔断器HystrixCircuitBreakerImpl,设置对应的规则,订阅结果处理。比如设置熔断开关等等

//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
Subscription s = subscribeToStream();

6. 初始化线程池,获取默认配置策略HystrixPropertiesStrategy hystrixPropertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();获取线程池配置HystrixPropertiesThreadPoolDefault,获取并发策略HystrixConcurrencyStrategyDefault,生成对应的线程池ThreadPoolExecutor。

public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
            this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
            HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            this.queueSize = properties.maxQueueSize().get();

            this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                    concurrencyStrategy.getThreadPool(threadPoolKey, properties),
                    properties);
            this.threadPool = this.metrics.getThreadPool();
            this.queue = this.threadPool.getQueue();

            /* strategy: HystrixMetricsPublisherThreadPool */
            HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
        }

初始化HystrixThreadPoolMetrics,初始化关于线程的一些统计流和限流数据,

private HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolExecutor threadPool, HystrixThreadPoolProperties properties) {
        super(null);
        this.threadPoolKey = threadPoolKey;
        this.threadPool = threadPool;
        this.properties = properties;

        rollingCounterStream = RollingThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
        cumulativeCounterStream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
        rollingThreadPoolMaxConcurrencyStream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, properties);
    }

6. 初始化HystrixEventNotifier时间通知,HystrixConcurrencyStrategy并发策略,HystrixMetricsPublisherCommand发布命令,HystrixCommandExecutionHook命令执行钩子,HystrixRequestCache请求缓存类,初始化HystrixRequestLog时采用了懒初始化,返回return new HystrixLifecycleForwardingRequestVariable<T>(rv);

public T get(HystrixConcurrencyStrategy concurrencyStrategy) {
        /*
         * 1) Fetch RequestVariable implementation from cache.
         * 2) If no implementation is found in cache then construct from factory.
         * 3) Cache implementation from factory as each object instance needs to be statically cached to be relevant across threads.
         */
        RVCacheKey key = new RVCacheKey(this, concurrencyStrategy);
        HystrixRequestVariable<?> rvInstance = requestVariableInstance.get(key);
        if (rvInstance == null) {
            requestVariableInstance.putIfAbsent(key, concurrencyStrategy.getRequestVariable(lifeCycleMethods));
            /*
             * A safety check to help debug problems if someone starts injecting dynamically created HystrixConcurrencyStrategy instances - which should not be done and has no good reason to be done.
             * 
             * The 100 value is arbitrary ... just a number far higher than we should see.
             */
            if (requestVariableInstance.size() > 100) {
                logger.warn("Over 100 instances of HystrixRequestVariable are being stored. This is likely the sign of a memory leak caused by using unique instances of HystrixConcurrencyStrategy instead of a single instance.");
            }
        }

        return (T) requestVariableInstance.get(key).get();
    }

当get()时,先判断请求上下文是否已经初始化,我们在最开始使用时已经调用了HystrixRequestContext context = HystrixRequestContext.initializeContext();获取上下文HystrixRequestContext.getContextForCurrentThread().state对应的LazyInitializer,然后执行HystrixRequestVariableDefault#initialValue()返回return new HystrixRequestLog();最后初始化信号量重写机制以控制请求。

猜你喜欢

转载自blog.csdn.net/ph3636/article/details/82258952