public void sendMultiAction(final List<Action<Row>> initialActions, Map<HRegionLocation, MultiAction<Row>> actionsByServer, final int numAttempt, final HConnectionManager.ServerErrorTracker errorsByServer) { // Send the queries and add them to the inProgress list // This iteration is by server (the HRegionLocation comparator is by server portion only). for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) { final HRegionLocation loc = e.getKey(); final MultiAction<Row> multiAction = e.getValue(); incTaskCounters(multiAction.getRegions(), loc.getServerName()); //这里简单,就是将taskSent,taskPerRegion(默认最大值1),taskPerServer(默认最大值5)都进行了+1操作,以便之前的判断 Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() { @Override public void run() { MultiResponse res; try { MultiServerCallable<Row> callable = createCallable(loc, multiAction); //原先的Callable没有了哦亲。MultiServerCallable,全新的Api哦亲 try { res = createCaller(callable).callWithoutRetries(callable); //这里是创建了一个RpcRetryingCaller,然后再调用callable,这里很奇怪,在构造的时候传入了callable,但是却没有使用,RpcRetryingCaller有一个估计MultiServerCallable的成员变量,后来因为某些原因干掉了。createCall方法很简单,主要是callWithoutRetries。备注【1】 } catch (IOException e) { LOG.warn("Call to " + loc.getServerName() + " failed numAttempt=" + numAttempt + ", resubmitting all since not sure where we are at", e); resubmitAll(initialActions, multiAction, loc, numAttempt + 1, e, errorsByServer);//失败重试,备注【2】 return; } receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer);//提交任务,备注【3】 } finally { decTaskCounters(multiAction.getRegions(), loc.getServerName()); // taskDone+1,taskPerRegion,taskPerServer-1 } } }); try { this.pool.submit(runnable);//HTable的pool跑到这里来了 } catch (RejectedExecutionException ree) { // This should never happen. But as the pool is provided by the end user, let's secure // this a little. decTaskCounters(multiAction.getRegions(), loc.getServerName()); LOG.warn("The task was rejected by the pool. This is unexpected." 
+ " Server is " + loc.getServerName(), ree); // We're likely to fail again, but this will increment the attempt counter, so it will // finish. resubmitAll(initialActions, multiAction, loc, numAttempt + 1, ree, errorsByServer);//同上,失败重试 } } }
先看看失败重试吧,对应备注【2】
private void resubmitAll(List<Action<Row>> initialActions, MultiAction<Row> rsActions, HRegionLocation location, int numAttempt, Throwable t, HConnectionManager.ServerErrorTracker errorsByServer) { // Do not use the exception for updating cache because it might be coming from // any of the regions in the MultiAction. // 先更新cache的region信息 hConnection.updateCachedLocations(tableName, rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location); errorsByServer.reportServerError(location);//保存异常 List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size()); for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) { for (Action<Row> action : e.getValue()) { if (manageError(numAttempt, action.getOriginalIndex(), action.getAction(), true, t, location)) {//检查是否可以重试。这里备注下【4】 toReplay.add(action); } } } if (toReplay.isEmpty()) {//没有可重试的就88 LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for all " + initialActions.size() + " ops, NOT resubmitting, " + location.getServerName()); } else { submit(initialActions, toReplay, numAttempt, errorsByServer);//重试 } }
public void updateCachedLocations(final TableName tableName, byte[] rowkey, final Object exception, final HRegionLocation source) { if (rowkey == null || tableName == null) { LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) + ", tableName=" + (tableName == null ? "null" : tableName)); return; } // Is it something we have already updated? final HRegionLocation oldLocation = getCachedLocation(tableName, rowkey); if (oldLocation == null) { // There is no such location in the cache => it's been removed already => nothing to do return; } HRegionInfo regionInfo = oldLocation.getRegionInfo(); final RegionMovedException rme = RegionMovedException.find(exception); if (rme != null) { if (LOG.isTraceEnabled()){ LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " + rme.getHostname() + ":" + rme.getPort() + " according to " + source.getHostnamePort()); } //这里如果是region已经移动的信息,则异常会将新的region地址返回,然后更新缓存 updateCachedLocation( regionInfo, source, rme.getServerName(), rme.getLocationSeqNum()); } else if (RegionOpeningException.find(exception) != null) { //意思大概是region如果已经由别的RS打开,则client可以直接请求新的region,所以这里不用做处理 //不是很明白,求高手解答 if (LOG.isTraceEnabled()) { LOG.trace("Region " + regionInfo.getRegionNameAsString() + " is being opened on " + source.getHostnamePort() + "; not deleting the cache entry"); } } else { deleteCachedLocation(regionInfo, source);//真是不可恢复的错误就把缓存删了吧 } }
/**
 * Records one more error for {@code server}.
 *
 * <p>If the server has no entry yet, a new {@code ServerErrors} entry is installed. That
 * insertion uses {@code putIfAbsent}, so concurrent first reports for the same server do not
 * overwrite each other.
 *
 * @param server the location whose server failed
 */
void reportServerError(HRegionLocation server) {
  ServerErrors errors = errorsByServer.get(server);
  if (errors != null) {
    errors.addError();
    return;
  }
  // First error seen for this server. Reports can come from several pool threads at once,
  // because resubmitAll runs inside the tasks submitted by sendMultiAction and they share this
  // tracker. A get-then-put could let two threads each install a fresh entry and lose one
  // error. putIfAbsent makes the insertion atomic. A thread that loses the race counts its
  // error on the entry the winner installed.
  // NOTE(review): relies on errorsByServer being a ConcurrentMap — confirm the field type.
  errors = errorsByServer.putIfAbsent(server, new ServerErrors());
  if (errors != null) {
    errors.addError();
  }
}
//老三样,按region归并action,然后提交。少了很多判断 private void submit(List<Action<Row>> initialActions, List<Action<Row>> currentActions, int numAttempt, final HConnectionManager.ServerErrorTracker errorsByServer) { // group per location => regions server final Map<HRegionLocation, MultiAction<Row>> actionsByServer = new HashMap<HRegionLocation, MultiAction<Row>>(); for (Action<Row> action : currentActions) { HRegionLocation loc = findDestLocation(action.getAction(), 1, action.getOriginalIndex()); if (loc != null) { addAction(loc, action, actionsByServer); } } if (!actionsByServer.isEmpty()) { sendMultiAction(initialActions, actionsByServer, numAttempt, errorsByServer); } }
这样重试的代码就理清楚了
接下来看看 RpcRetryingCaller.callWithoutRetries 方法，它主要就是依次调用了 callable 的 prepare 和 call 方法：
public T callWithoutRetries(RetryingCallable<T> callable) throws IOException, RuntimeException { // The code of this method should be shared with withRetries. this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); try { beforeCall(); callable.prepare(false);//这里是个坑哦亲,不管传入什么都一样哦,没有的参数啊亲,主要是返回一个stub,已经在之前的region定位篇中说过,这依赖protobuf库。 return callable.call(); } catch (Throwable t) { Throwable t2 = translateException(t); // It would be nice to clear the location cache here. if (t2 instanceof IOException) { throw (IOException)t2; } else { throw new RuntimeException(t2); } } finally { afterCall(); } }
public void prepare(boolean reload) throws IOException { // Use the location we were given in the constructor rather than go look it up. setStub(getConnection().getClient(getLocation().getServerName())); }
call方法是核心,明天再看吧