分布式系统内部RetryCache机制

前言


在分布式系统的运行过程中,出现网络不稳定(例如网络超时)导致的client请求回复超时是时有发生的事。在这种低概率发生的情况下时,client端其实是无法感知它的请求是不是真正的被处理了,它只能是基于坏的情况(即请求没被server处理的情况),然后执行重试操作。问题就出现在这里,对于某些非幂的操作而言,操作重试是会返回不同的结果的。这个时候,其实server端不应该执行client端发起的第二次请求的,假设server已经成功处理了client的第一次请求。本文我们就来聊聊针对非幂等操作处理的RetryCache,通过RetryCache我们可以避免重复请求被处理。

非幂等操作重复处理的问题


这里重新回过头来说说非幂等操作被重复处理可能会导致的问题。

简单概括下来,会有以下几点潜在问题:

1)Application端因为接收到server返回的异常结果信息而导致application失败。因为非幂等类型重复请求第二次被server端时,可能会导致返回错误结果。比如重复执行创建文件请求时,第二次系统就会返回FileAlreadyExistException之类的错误。

2)Server端metadata信息的破坏。假设我们执行完create文件操作后,然后这个文件被相关任务read完随后被正常清理了,但是client的重试导致这个文件又被创建了一次,这就有可能引发metadata信息的损坏了。

3)Server HA failover切换时的metadata一致性问题。当服务在做HA failover切换和时候,服务主备切换是一个比较重的操作,failover切换期间是会出现client端请求无响应超时的情况的。这个时候可能部分请求被处理,部分实质上没有被处理,在服务主备切换后,为了保证server状态的完全一致性,我们需要利用RetryCache,来帮助server做重复的请求处理。当然,这里面需要新的active server做内部RetryCache的重建操作。

鉴于上述的问题,因此我们需要引入一个内部Cache来存放已执行过的请求调用结果,来防止非幂等操作的重复处理。在这里,我们称之为上述Cache为RetryCache。

RetryCache的实现细节


如果我们想实现一套完整的RetryCache,有哪些点需要重点去考虑的呢?

这里主要列出了以下几点:

  • Client请求call的独立标识区分。目前RPC server内部一般会带有类似callId的概念来区分请求,但是单一callId还是无法区分出请求是否来自于同一个机器的client还是多个机器的client。在这里我们需要额外引入clientId的字段,以此组成<callId+clientId>的联合Id方式。
  • 标记区分操作方法是否是幂等类型的还是非幂等类型的,我们只对后者类型请求结果进行RetryCache的存放。
  • RetryCache内部每个Cache Entry不可能保证永久的存放,需要有过期时间的限制。
  • RetryCache的信息持久化和重建过程的考虑,这个主要发生在HA服务做主从切换的时候。

RetryCache的实现样例


针对上面的实现细节,我们通过一个具体样例来进行更具化地了解,此样例取自于Hadoop 所使用的RetryCache类。

首先是Cache Entry的定义:

  /**
   * CacheEntry is tracked using unique client ID and callId of the RPC request
   */
  public static class CacheEntry implements LightWeightCache.Entry {
    
    
    /**
     * Processing state of the requests
     */
    private static byte INPROGRESS = 0;
    private static byte SUCCESS = 1;
    private static byte FAILED = 2;

    /** 此entry代表的请求目前的状态,正在被处理,或者已经处理成功或失败*/
    private byte state = INPROGRESS;
    
    ...
    
    private final int callId;
    private final long expirationTime;
    private LightWeightGSet.LinkedElement next;

    /**
     * 一个全新的cache entry,它需要有clientId,callId以及过期时间.
     */
    CacheEntry(byte[] clientId, int callId, long expirationTime) {
    
    
      // ClientId must be a UUID - that is 16 octets.
      Preconditions.checkArgument(clientId.length == ClientId.BYTE_LENGTH,
          "Invalid clientId - length is " + clientId.length
              + " expected length " + ClientId.BYTE_LENGTH);
      // Convert UUID bytes to two longs
      clientIdMsb = ClientId.getMsb(clientId);
      clientIdLsb = ClientId.getLsb(clientId);
      this.callId = callId;
      this.expirationTime = expirationTime;
    }
	...

    @Override
    public boolean equals(Object obj) {
    
    
      if (this == obj) {
    
    
        return true;
      }
      if (!(obj instanceof CacheEntry)) {
    
    
        return false;
      }
      CacheEntry other = (CacheEntry) obj;
      // cache entry的equal通过callId和clientId联合比较,确保请求是来自重试操作的client
      return callId == other.callId && clientIdMsb == other.clientIdMsb
          && clientIdLsb == other.clientIdLsb;
    }

}
  /**
   * CacheEntry with payload that tracks the previous response or parts of
   * previous response to be used for generating response for retried requests.
   */
  public static class CacheEntryWithPayload extends CacheEntry {
    
    
    // palyload简单理解为带了返回结果对象实例的RPC call
    private Object payload;

    CacheEntryWithPayload(byte[] clientId, int callId, Object payload,
        long expirationTime) {
    
    
      super(clientId, callId, expirationTime);
      this.payload = payload;
    }

下面是核心的RetryCache结果获取的方法调用:

   */
  private CacheEntry waitForCompletion(CacheEntry newEntry) {
    
    
    CacheEntry mapEntry = null;
    lock.lock();
    try {
    
    
      // 1)从Cache中获取是否有对应Cache Entry
      mapEntry = set.get(newEntry);
      // 如果没有,则加入此entry到Cache中
      if (mapEntry == null) {
    
    
        if (LOG.isTraceEnabled()) {
    
    
          LOG.trace("Adding Rpc request clientId "
              + newEntry.clientIdMsb + newEntry.clientIdLsb + " callId "
              + newEntry.callId + " to retryCache");
        }
        set.put(newEntry);
        retryCacheMetrics.incrCacheUpdated();
        return newEntry;
      } else {
    
    
        retryCacheMetrics.incrCacheHit();
      }
    } finally {
    
    
      lock.unlock();
    }
    // Entry already exists in cache. Wait for completion and return its state
    Preconditions.checkNotNull(mapEntry,
        "Entry from the cache should not be null");
    // Wait for in progress request to complete
    // 3)如果获取到了Cache Entry,如果状态是正在执行中的,则等待其结束
    synchronized (mapEntry) {
    
    
      while (mapEntry.state == CacheEntry.INPROGRESS) {
    
    
        try {
    
    
          mapEntry.wait();
        } catch (InterruptedException ie) {
    
    
          // Restore the interrupted status
          Thread.currentThread().interrupt();
        }
      }
      // Previous request has failed, the expectation is that it will be
      // retried again.
      if (mapEntry.state != CacheEntry.SUCCESS) {
    
    
        mapEntry.state = CacheEntry.INPROGRESS;
      }
    }
    // 4)Cache Entry对应的call已经结束,则返回之前cache的结果
    return mapEntry;
  }

我们接着来看实际RetryCache的调用场景:

  public long addCacheDirective(
      CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
    
    
    checkNNStartup();
    namesystem.checkOperation(OperationCategory.WRITE);
    // 1)从RetryCache中查询是否已经是执行过的RPC call调用
    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion
      (retryCache, null);
    // 2)如果有同一调用,并且是成功状态的,则返回上次payload的结果
    // 否则进行后续处理操作的调用
    if (cacheEntry != null && cacheEntry.isSuccess()) {
    
    
      return (Long) cacheEntry.getPayload();
    }

    boolean success = false;
    long ret = 0;
    try {
    
    
      ret = namesystem.addCacheDirective(path, flags, cacheEntry != null);
      success = true;
    } finally {
    
    
      // 3)操作完毕后,在RetryCache内部更新Entry的状态结果,
      // 并设置payload对象(返回结果对象)
      RetryCache.setState(cacheEntry, success, ret);
    }
    return ret;
  }

以上更多的实现细节可参照下文引用链接代码。

引用


[1].https://issues.apache.org/jira/browse/HDFS-4979
[2].https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java

猜你喜欢

转载自blog.csdn.net/Androidlushangderen/article/details/106169970