前言
在分布式系统的运行过程中,出现网络不稳定(例如网络超时)导致的client请求回复超时是时有发生的事。在这种低概率发生的情况下时,client端其实是无法感知它的请求是不是真正的被处理了,它只能是基于坏的情况(即请求没被server处理的情况),然后执行重试操作。问题就出现在这里,对于某些非幂的操作而言,操作重试是会返回不同的结果的。这个时候,其实server端不应该执行client端发起的第二次请求的,假设server已经成功处理了client的第一次请求。本文我们就来聊聊针对非幂等操作处理的RetryCache,通过RetryCache我们可以避免重复请求被处理。
非幂等操作重复处理的问题
这里重新回过头来说说非幂等操作被重复处理可能会导致的问题。
简单概括下来,会有以下几点潜在问题:
1)Application端因为接收到server返回的异常结果信息而导致application失败。因为非幂等类型重复请求第二次被server端时,可能会导致返回错误结果。比如重复执行创建文件请求时,第二次系统就会返回FileAlreadyExistException之类的错误。
2)Server端metadata信息的破坏。假设我们执行完create文件操作后,然后这个文件被相关任务read完随后被正常清理了,但是client的重试导致这个文件又被创建了一次,这就有可能引发metadata信息的损坏了。
3)Server HA failover切换时的metadata一致性问题。当服务在做HA failover切换和时候,服务主备切换是一个比较重的操作,failover切换期间是会出现client端请求无响应超时的情况的。这个时候可能部分请求被处理,部分实质上没有被处理,在服务主备切换后,为了保证server状态的完全一致性,我们需要利用RetryCache,来帮助server做重复的请求处理。当然,这里面需要新的active server做内部RetryCache的重建操作。
鉴于上述的问题,因此我们需要引入一个内部Cache来存放已执行过的请求调用结果,来防止非幂等操作的重复处理。在这里,我们称之为上述Cache为RetryCache。
RetryCache的实现细节
如果我们想实现一套完整的RetryCache,有哪些点需要重点去考虑的呢?
这里主要列出了以下几点:
- Client请求call的独立标识区分。目前RPC server内部一般会带有类似callId的概念来区分请求,但是单一callId还是无法区分出请求是否来自于同一个机器的client还是多个机器的client。在这里我们需要额外引入clientId的字段,以此组成<callId+clientId>的联合Id方式。
- 标记区分操作方法是否是幂等类型的还是非幂等类型的,我们只对后者类型请求结果进行RetryCache的存放。
- RetryCache内部每个Cache Entry不可能保证永久的存放,需要有过期时间的限制。
- RetryCache的信息持久化和重建过程的考虑,这个主要发生在HA服务做主从切换的时候。
RetryCache的实现样例
针对上面的实现细节,我们通过一个具体样例来进行更具化地了解,此样例取自于Hadoop 所使用的RetryCache类。
首先是Cache Entry的定义:
/**
* CacheEntry is tracked using unique client ID and callId of the RPC request
*/
public static class CacheEntry implements LightWeightCache.Entry {
/**
* Processing state of the requests
*/
private static byte INPROGRESS = 0;
private static byte SUCCESS = 1;
private static byte FAILED = 2;
/** 此entry代表的请求目前的状态,正在被处理,或者已经处理成功或失败*/
private byte state = INPROGRESS;
...
private final int callId;
private final long expirationTime;
private LightWeightGSet.LinkedElement next;
/**
* 一个全新的cache entry,它需要有clientId,callId以及过期时间.
*/
CacheEntry(byte[] clientId, int callId, long expirationTime) {
// ClientId must be a UUID - that is 16 octets.
Preconditions.checkArgument(clientId.length == ClientId.BYTE_LENGTH,
"Invalid clientId - length is " + clientId.length
+ " expected length " + ClientId.BYTE_LENGTH);
// Convert UUID bytes to two longs
clientIdMsb = ClientId.getMsb(clientId);
clientIdLsb = ClientId.getLsb(clientId);
this.callId = callId;
this.expirationTime = expirationTime;
}
...
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof CacheEntry)) {
return false;
}
CacheEntry other = (CacheEntry) obj;
// cache entry的equal通过callId和clientId联合比较,确保请求是来自重试操作的client
return callId == other.callId && clientIdMsb == other.clientIdMsb
&& clientIdLsb == other.clientIdLsb;
}
}
/**
* CacheEntry with payload that tracks the previous response or parts of
* previous response to be used for generating response for retried requests.
*/
public static class CacheEntryWithPayload extends CacheEntry {
// palyload简单理解为带了返回结果对象实例的RPC call
private Object payload;
CacheEntryWithPayload(byte[] clientId, int callId, Object payload,
long expirationTime) {
super(clientId, callId, expirationTime);
this.payload = payload;
}
下面是核心的RetryCache结果获取的方法调用:
*/
private CacheEntry waitForCompletion(CacheEntry newEntry) {
CacheEntry mapEntry = null;
lock.lock();
try {
// 1)从Cache中获取是否有对应Cache Entry
mapEntry = set.get(newEntry);
// 如果没有,则加入此entry到Cache中
if (mapEntry == null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Adding Rpc request clientId "
+ newEntry.clientIdMsb + newEntry.clientIdLsb + " callId "
+ newEntry.callId + " to retryCache");
}
set.put(newEntry);
retryCacheMetrics.incrCacheUpdated();
return newEntry;
} else {
retryCacheMetrics.incrCacheHit();
}
} finally {
lock.unlock();
}
// Entry already exists in cache. Wait for completion and return its state
Preconditions.checkNotNull(mapEntry,
"Entry from the cache should not be null");
// Wait for in progress request to complete
// 3)如果获取到了Cache Entry,如果状态是正在执行中的,则等待其结束
synchronized (mapEntry) {
while (mapEntry.state == CacheEntry.INPROGRESS) {
try {
mapEntry.wait();
} catch (InterruptedException ie) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
}
// Previous request has failed, the expectation is that it will be
// retried again.
if (mapEntry.state != CacheEntry.SUCCESS) {
mapEntry.state = CacheEntry.INPROGRESS;
}
}
// 4)Cache Entry对应的call已经结束,则返回之前cache的结果
return mapEntry;
}
我们接着来看实际RetryCache的调用场景:
public long addCacheDirective(
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
// 1)从RetryCache中查询是否已经是执行过的RPC call调用
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion
(retryCache, null);
// 2)如果有同一调用,并且是成功状态的,则返回上次payload的结果
// 否则进行后续处理操作的调用
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (Long) cacheEntry.getPayload();
}
boolean success = false;
long ret = 0;
try {
ret = namesystem.addCacheDirective(path, flags, cacheEntry != null);
success = true;
} finally {
// 3)操作完毕后,在RetryCache内部更新Entry的状态结果,
// 并设置payload对象(返回结果对象)
RetryCache.setState(cacheEntry, success, ret);
}
return ret;
}
以上更多的实现细节可参照下文引用链接代码。
引用
[1].https://issues.apache.org/jira/browse/HDFS-4979
[2].https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java