eureka 源码分析三--高效多线程并发实现

1. eureka server端实例存储结构

ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
其中key = appName,value = ConcurrentHashMap

//存储覆盖状态
protected final ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap = CacheBuilder
        .newBuilder().initialCapacity(500)
        .expireAfterAccess(1, TimeUnit.HOURS)
        .<String, InstanceStatus>build().asMap();

1.1 eureka server status

状态 描述
starting 实例初始化状态,此状态主要给实例预留初始化时间
down 当健康检查失败时,实例的状态转变到down
up 正常服务状态
out_of_service 不参与接收服务 。但是服务正常
unknown 未知状态

1.2 与server status有关的方法

方法 描述 调用场景
register 注册实例 1.实例启动的时候开始注册;2.心跳404时开始注册实例
cancel 取消server注册的某一实例 客户端shutdown实例后,调用cancel
renew 续租请求 客户端定时心跳时调用该方法。deleteStatusOvrride方法调用后会触发renew续租失败,触发重新注册实例。
statusUpdate 状态更新方法 主要调用的时机时让此service下线,不在接收请求。或者让下线的服务重新上线。
deleteStatusOverride 移除实例的覆盖状态 实例的覆盖状态移除后,覆盖状态将变成unknown

2.实例注册源码分析

/**
    * Registers the information about the {@link InstanceInfo} and replicates
    * this information to all peer eureka nodes. If this is replication event
    * from other replica nodes then it is not replicated.
    *
    *  注册实例信息。并且同步这些信息到所有的对等节点。
    *
    *  如果这个同步事件来自其他同步节点。那么它将不进行二次同步
    * @param info
    *            the {@link InstanceInfo} to be registered and replicated.
    * @param isReplication
    *            true if this is a replication event from other replica nodes,
    *            false otherwise.
    */
   @Override
   public void register(final InstanceInfo info, final boolean isReplication) {
       int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
       if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
           leaseDuration = info.getLeaseInfo().getDurationInSecs();
       }
       super.register(info, leaseDuration, isReplication);

       /**
        * 同步到其他节点
        */
       replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
   }

super.register()实现

/**
    * Registers a new instance with a given duration.
    * 注册一个新的实例
    * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
    */
   public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
       try {
           read.lock();
           /**获取appName 下的map*/
           Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
           REGISTER.increment(isReplication);
           if (gMap == null) {
               final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
               //如果值不存在,则添加。否则返回旧值
               gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
               if (gMap == null) {
                   gMap = gNewMap;
               }
           }

           /**
            * 获取续租实例
            */
           Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
           // Retain the last dirty timestamp without overwriting it, if there is already a lease
           if (existingLease != null && (existingLease.getHolder() != null)) {
               /**server端注册的实例的上次更新时间*/
               Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();

               /**此次注册实例的上次更新时间*/
               Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
               logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

               // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
               // InstanceInfo instead of the server local copy.
               /**
                * 如果server端的实例更新时间 > 此次注册的实例更新时间
                * 说明 server端的实例是最新的
                * 此次注册的实例有可能是由于网络延迟后面到的。
                */
               if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                   logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                           " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                   logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                   registrant = existingLease.getHolder();
               }
           } else {
               // The lease does not exist and hence it is a new registration
               /**续租实例不存在,说明此次注册的实例是最新的*/
               synchronized (lock) {

                   /**
                    * server期望每分钟续租的实例个数
                    *
                    * 每30s续租一次:那么一分钟; this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    */
                   if (this.expectedNumberOfRenewsPerMin > 0) {
                       // Since the client wants to cancel it, reduce the threshold
                       // (1
                       // for 30 seconds, 2 for a minute)
                       this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;

                       /**每分钟续租的最小阀值*/
                       this.numberOfRenewsPerMinThreshold =
                               (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                   }
               }
               logger.debug("No previous lease information found; it is new registration");
           }

           /**
            * 构造一个新的租赁实例
            */
           Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
           if (existingLease != null) {
               //设置服务的开始时间
               lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
           }
           gMap.put(registrant.getId(), lease);
           synchronized (recentRegisteredQueue) {
               recentRegisteredQueue.add(new Pair<Long, String>(
                       System.currentTimeMillis(),
                       registrant.getAppName() + "(" + registrant.getId() + ")"));
           }
           // This is where the initial state transfer of overridden status happens
           //instanceinfo 的OverriddenStatus的状态发生转移发生在这点
           //判断实例的overriddenStatus
           //枚举的equals等于 ==
           if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
               logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                               + "overrides", registrant.getOverriddenStatus(), registrant.getId());

               /**
                * 如果注册的实例状态 不是unknown。并且overriddenInstanceStatusMap 不存在
                */
               if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                   logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                   overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
               }
           }

           /**
            * 由于跟statusUpdate和deleteStautsOverride存在竞争关系
            * 。因此实例的状态有可能会发生改变。eureka解决的办法是使用已知的规则:
            * 1. 如果实例的状态是down或者starting。这直接使用实例的状态,因为此状态不可能由别的渠道被改变。
            * 2.如果实例的状态是up后者Out_of_service。则使用overrideStatus状态,因为statusUpdate状态会在up和out_of_server状态之间做切换。
            * 
            */
           InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
           if (overriddenStatusFromMap != null) {
               logger.info("Storing overridden status {} from map", overriddenStatusFromMap);

               //设置注册实例的覆盖状态来自于Map
               registrant.setOverriddenStatus(overriddenStatusFromMap);
           }

           // Set the status based on the overridden status rules

           /**
            * 设置实例状态依据 覆盖状态规则
            */
           InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);

           //设置当前实例的状态。不去更新状态改变时间
           registrant.setStatusWithoutDirty(overriddenInstanceStatus);

           // If the lease is registered with UP status, set lease service up timestamp

           /**
            * 如果实例被注册与UP状态。设置实例的状态到up
            */
           if (InstanceStatus.UP.equals(registrant.getStatus())) {
               lease.serviceUp();
           }
           registrant.setActionType(ActionType.ADDED);
           recentlyChangedQueue.add(new RecentlyChangedItem(lease));

           /**设置实例最后一次更新时间戳*/
           registrant.setLastUpdatedTimestamp();
           /**缓存失效*/
           invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
           logger.info("Registered instance {}/{} with status {} (replication={})",
                   registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
       } finally {
           read.unlock();
       }
   }

猜你喜欢

转载自blog.csdn.net/ai_xiangjuan/article/details/80303877