Analysis of the put data flow on the RegionServer side:
On the client side, MultiServerCallable.call invokes the RegionServer's multi RPC method.
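The server receives the actions already grouped by region. The minimal sketch below models only that grouping step. It is a simplified model, not HBase client code. SketchAction and RegionGroupingSketch are hypothetical names, and each action is assumed to already know the encoded name of its target region.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one client action; not an HBase class.
final class SketchAction {
    final int index;      // position of the action in the client's original batch
    final String region;  // encoded name of the target region (assumed already known)
    final String type;    // "GET", "PUT", "DELETE", ...

    SketchAction(int index, String region, String type) {
        this.index = index;
        this.region = region;
        this.type = type;
    }
}

final class RegionGroupingSketch {
    // Groups actions by region, keeping each region's actions in submission order.
    // Each map entry plays the role of one RegionAction inside a MultiRequest.
    static Map<String, List<SketchAction>> groupByRegion(List<SketchAction> actions) {
        Map<String, List<SketchAction>> byRegion = new LinkedHashMap<>();
        for (SketchAction a : actions) {
            byRegion.computeIfAbsent(a.region, k -> new ArrayList<>()).add(a);
        }
        return byRegion;
    }
}
```

Each action keeps its original index. That index is what later lets the server attach every result or exception to the right client action.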
HRegionServer implements the ClientProtos.ClientService.BlockingInterface interface; its multi method:
public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
throws ServiceException {
// rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
// It is also the conduit via which we pass back data.
PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
CellScanner cellScanner = controller != null? controller.cellScanner(): null;
if (controller != null) controller.setCellScanner(null);
List<CellScannable> cellsToReturn = null;
MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
Iterate over the submitted data: actions arrive grouped by region, as one RegionAction (holding a list of actions) per region.
for (RegionAction regionAction : request.getRegionActionList()) {
this.requestCount.add(regionAction.getActionCount());
regionActionResultBuilder.clear();
HRegion region;
try {
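Look up the requested region in this RegionServer's onlineRegions:
1. Take the HRegion instance from onlineRegions. If it is not null, go to step 5; otherwise continue with steps 2 to 4.
2. If onlineRegions does not contain the region, look it up in movedRegions; a moved entry expires after 2 minutes.
If the region is found in movedRegions but its move time is older than this timeout, remove it from movedRegions and return null;
otherwise return the moved-region entry. If movedRegions returns a non-null entry, throw RegionMovedException.
3. Look up the region in regionsInTransitionInRS. If it is found and the value is true, the region is still being opened:
throw RegionOpeningException.
4. If all of the above return null, this server does not host the region: throw NotServingRegionException.
At this point there are essentially two possibilities: the region is being split, or it has moved to another server (the move just completed, so the region was no longer on this server when the client's request arrived).
5. If step 1 found the region, everything is normal: the region is served by this server.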
region = getRegion(regionAction.getRegion());
} catch (IOException e) {
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
continue; // For this region it's a failure.
}
Check whether this is an atomic operation:
if (regionAction.hasAtomic() && regionAction.getAtomic()) {
// How does this call happen? It may need some work to play well w/ the surroundings.
// Need to return an item per Action along w/ Action index. TODO.
try {
Execute the atomic operation: all actions on this region get the same MVCC value, and if any action fails, the whole batch is rolled back.
mutateRows(region, regionAction.getActionList(), cellScanner);
} catch (IOException e) {
// As it's atomic, we may expect it's a global failure.
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
}
} else {
// doNonAtomicRegionMutation manages the exception internally
Execute the non-atomic operation: see the doNonAtomicRegionMutation analysis below.
cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
regionActionResultBuilder, cellsToReturn);
}
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
}
// Load the controller with the Cells to return.
if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
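If the client declared a codec for returned data (cell blocks), the data to return is added to the cellsToReturn list,
and the Result placed in the response carries only the Result's size (its cell count), not the cells themselves.
For batched get/append/increment operations, configuring the codec is recommended.
A non-empty cellsToReturn here means the request contained get/append/increment actions; a CellScanner is created over all Results obtained,
and the client reads the data through this CellScanner.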
controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
}
return responseBuilder.build();
}
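The region lookup order in steps 1 to 5 can be sketched as follows. This is a minimal model under stated assumptions:

- the maps are plain ConcurrentHashMaps keyed by encoded region name;
- movedRegions stores only the move time;
- the hypothetical RegionLookupException stands in for RegionMovedException, RegionOpeningException and NotServingRegionException, and its message says which case applies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical unchecked exception standing in for RegionMovedException,
// RegionOpeningException and NotServingRegionException; the message says which.
class RegionLookupException extends RuntimeException {
    RegionLookupException(String msg) {
        super(msg);
    }
}

final class RegionLookupSketch {
    // Entries in movedRegions are trusted for 2 minutes, as described above.
    static final long TIMEOUT_REGION_MOVED_MS = 2 * 60 * 1000L;

    final Map<String, Object> onlineRegions = new ConcurrentHashMap<>();
    final Map<String, Long> movedRegions = new ConcurrentHashMap<>();          // name -> move time (ms)
    final Map<String, Boolean> regionsInTransition = new ConcurrentHashMap<>(); // true = opening

    Object getRegion(String encodedName, long nowMs) {
        Object region = onlineRegions.get(encodedName);
        if (region != null) {
            return region; // step 5: the region is served here
        }
        Long movedAt = movedRegions.get(encodedName);
        if (movedAt != null) {
            if (movedAt > nowMs - TIMEOUT_REGION_MOVED_MS) {
                throw new RegionLookupException("moved: " + encodedName); // step 2
            }
            movedRegions.remove(encodedName); // stale entry: drop it and fall through
        }
        Boolean opening = regionsInTransition.get(encodedName);
        if (opening != null && opening) {
            throw new RegionLookupException("opening: " + encodedName); // step 3
        }
        throw new RegionLookupException("not serving: " + encodedName); // step 4
    }
}
```

In this sketch, a region moved 60 seconds ago yields "moved". An entry older than 2 minutes is removed, and the lookup falls through to the opening check and then to "not serving".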
doNonAtomicRegionMutation flow analysis:
Handles non-atomic get/put/delete operations (as well as append/increment).
private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
final RegionAction actions, final CellScanner cellScanner,
final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn) {
// Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
// one at a time, we instead pass them in batch. Be aware that the corresponding
// ResultOrException instance that matches each Put or Delete is then added down in the
// doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched
List<ClientProtos.Action> mutations = null;
Iterate over every action in the request for the given region.
for (ClientProtos.Action action: actions.getActionList()) {
ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
try {
Result r = null;
If this action is a get, execute the get directly.
if (action.hasGet()) {
Get get = ProtobufUtil.toGet(action.getGet());
r = region.get(get);
} else if (action.hasMutation()) {
MutationType type = action.getMutation().getMutateType();
if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
!mutations.isEmpty()) {
// Flush out any Puts or Deletes already collected.
doBatchOp(builder, region, mutations, cellScanner);
mutations.clear();
}
switch (type) {
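APPEND: if the cell already holds data, the new value is appended after the old value, so the resulting value length is oldSize + newSize;
if there is no existing cell, the new value is simply stored as a new column.
case APPEND: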
r = append(region, action.getMutation(), cellScanner);
break;
INCREMENT: the stored value is incremented.
case INCREMENT:
r = increment(region, action.getMutation(), cellScanner);
break;
If the operation is a put or delete, collect the actions together so they can be applied as one batch:
case PUT:
case DELETE:
// Collect the individual mutations and apply in a batch
if (mutations == null) {
mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
}
mutations.add(action);
break;
default:
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
}
} else {
throw new HBaseIOException("Unexpected Action type");
}
For get/append/increment, each action produces a Result, which is added to the response.
if (r != null) {
ClientProtos.Result pbResult = null;
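As in multi(): if the client declared a codec for returned data, the data to return is added to cellsToReturn,
and the Result placed in the response carries only its size (cell count).
Configuring the codec is recommended for batched get/append/increment operations.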
if (isClientCellBlockSupport()) {
pbResult = ProtobufUtil.toResultNoData(r);
// Hard to guess the size here. Just make a rough guess.
if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>();
cellsToReturn.add(r);
} else {
If no codec is configured, all of the Result's data is written into the response sent back for the client request;
when the batched values are large, this can slow down the response.
pbResult = ProtobufUtil.toResult(r);
}
resultOrExceptionBuilder =
ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
}
// Could get to here and there was no result and no exception. Presumes we added
// a Put or Delete to the collecting Mutations List for adding later. In this
// case the corresponding ResultOrException instance for the Put or Delete will be added
// down in the doBatchOp method call rather than up here.
} catch (IOException ie) {
resultOrExceptionBuilder = ResultOrException.newBuilder().
setException(ResponseConverter.buildException(ie));
}
if (resultOrExceptionBuilder != null) {
// Propagate index.
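Add each action's result, tagged with its index, to the response builder; the response is sent once all actions have been processed.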
resultOrExceptionBuilder.setIndex(action.getIndex());
builder.addResultOrException(resultOrExceptionBuilder.build());
}
}
// Finish up any outstanding mutations
Execute the remaining put/delete actions as one batch.
if (mutations != null && !mutations.isEmpty()) {
doBatchOp(builder, region, mutations, cellScanner);
}
return cellsToReturn;
}
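The batching of contiguous PUT/DELETE actions can be modeled as a trace of what would run. The sketch works on plain type strings instead of ClientProtos.Action, which is a simplifying assumption. ContiguousBatchSketch is a hypothetical name.

It reproduces one consequence visible in the listing. A GET does not flush the pending batch, so a Get that follows a Put in the same RegionAction runs before that Put is applied.

```java
import java.util.ArrayList;
import java.util.List;

final class ContiguousBatchSketch {
    // Returns, in execution order, what the loop above would run for a list of action types.
    // PUT/DELETE are buffered; APPEND/INCREMENT flush the buffer first and then run alone;
    // GET runs immediately and does NOT flush the buffer (only a non-PUT/DELETE mutation does).
    static List<String> plan(List<String> types) {
        List<String> trace = new ArrayList<>();
        List<Integer> pending = new ArrayList<>(); // indexes of buffered PUT/DELETE actions
        for (int i = 0; i < types.size(); i++) {
            String type = types.get(i);
            switch (type) {
                case "GET":
                    trace.add("GET#" + i);
                    break;
                case "APPEND":
                case "INCREMENT":
                    flush(pending, trace);
                    trace.add(type + "#" + i);
                    break;
                case "PUT":
                case "DELETE":
                    pending.add(i);
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported mutate type: " + type);
            }
        }
        flush(pending, trace); // finish up any outstanding mutations
        return trace;
    }

    // Stands in for doBatchOp: one trace entry for the whole buffered group.
    private static void flush(List<Integer> pending, List<String> trace) {
        if (!pending.isEmpty()) {
            trace.add("batch" + pending);
            pending.clear();
        }
    }
}
```

For the input [PUT, DELETE, GET, INCREMENT, PUT], the trace is [GET#2, batch[0, 1], INCREMENT#3, batch[4]].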
doBatchOp: batch processing for put/delete operations:
protected void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
final List<ClientProtos.Action> mutations, final CellScanner cells) {
Mutation[] mArray = new Mutation[mutations.size()];
long before = EnvironmentEdgeManager.currentTimeMillis();
boolean batchContainsPuts = false, batchContainsDelete = false;
try {
int i = 0;
Iterate over all actions to be applied and build a Put or Delete Mutation for each.
for (ClientProtos.Action action: mutations) {
MutationProto m = action.getMutation();
Mutation mutation;
if (m.getMutateType() == MutationType.PUT) {
mutation = ProtobufUtil.toPut(m, cells);
batchContainsPuts = true;
} else {
mutation = ProtobufUtil.toDelete(m, cells);
batchContainsDelete = true;
}
mArray[i++] = mutation;
}
requestCount.add(mutations.size());
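If the write is not to a meta table, i.e. it is a user-table write,
if (!region.getRegionInfo().isMetaTable()) {
check for global memstore pressure and, if necessary, wait for flushes to complete:
1. If the total memstore size of all regions on this RegionServer exceeds hbase.regionserver.global.memstore.upperLimit (default 0.4 of the heap),
force a flush and wait for it to finish: the thread waits until MemStoreFlusher.flushRegion notifies it.
2. If the total exceeds hbase.regionserver.global.memstore.lowerLimit (default 0.35 of the heap),
request a flush without waiting for it to finish, and continue with the flow below.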
cacheFlusher.reclaimMemStoreMemory();
}
Apply the update through HRegion.
OperationStatus codes[] = region.batchMutate(mArray, false);
Process the status code of each updated row and add it to the response.
for (i = 0; i < codes.length; i++) {
int index = mutations.get(i).getIndex();
Exception e = null;
switch (codes[i].getOperationStatusCode()) {
case BAD_FAMILY:
This means a column family named in the action does not exist in the table, or the family name is null.
e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
builder.addResultOrException(getResultOrException(e, index));
break;
case SANITY_CHECK_FAILURE:
This means a cell in the action has a timestamp too far ahead of the RegionServer's current time.
The allowed slop is configured by hbase.hregion.keyvalue.timestamp.slop.millisecs; by default there is no check.
e = new FailedSanityCheckException(codes[i].getExceptionMsg());
builder.addResultOrException(getResultOrException(e, index));
break;
default:
e = new DoNotRetryIOException(codes[i].getExceptionMsg());
builder.addResultOrException(getResultOrException(e, index));
break;
case SUCCESS:
Completed normally.
builder.addResultOrException(getResultOrException(ClientProtos.Result.getDefaultInstance(), index));
break;
}
}
} catch (IOException ie) {
for (int i = 0; i < mutations.size(); i++) {
builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
}
}
Update metrics.
long after = EnvironmentEdgeManager.currentTimeMillis();
if (batchContainsPuts) {
metricsRegionServer.updatePut(after - before);
}
if (batchContainsDelete) {
metricsRegionServer.updateDelete(after - before);
}
}
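The two-threshold check done by cacheFlusher.reclaimMemStoreMemory() can be reduced to a pure decision function. This is a sketch, not the MemStoreFlusher code. MemstorePressureSketch is a hypothetical name. The limits are passed in as heap fractions; 0.4 and 0.35 are the defaults quoted above. Using a strict > at the boundaries is an assumption.

```java
final class MemstorePressureSketch {
    enum Decision { NONE, REQUEST_FLUSH, BLOCK_UNTIL_FLUSHED }

    // upperLimit/lowerLimit are heap fractions; the text quotes defaults of 0.4 and 0.35.
    static Decision decide(long globalMemstoreBytes, long maxHeapBytes,
                           double upperLimit, double lowerLimit) {
        long high = (long) (maxHeapBytes * upperLimit);
        long low = (long) (maxHeapBytes * lowerLimit);
        if (globalMemstoreBytes > high) {
            // The writer thread waits until flushes bring the total back down.
            return Decision.BLOCK_UNTIL_FLUSHED;
        }
        if (globalMemstoreBytes > low) {
            // A flush is requested, but the write proceeds without waiting.
            return Decision.REQUEST_FLUSH;
        }
        return Decision.NONE;
    }
}
```

Take an 8 GB heap with the default limits. Writes would block once the RegionServer's memstores hold more than about 3.2 GB in total. A non-blocking flush request starts above about 2.8 GB.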
HRegion.batchMutate:
OperationStatus[] batchMutate(Mutation[] mutations, boolean isReplay)
throws IOException {
Initialize the batch operation tracker, setting each action's status to OperationStatus.NOT_RUN.
BatchOperationInProgress<Mutation> batchOp =
new BatchOperationInProgress<Mutation>(mutations);
boolean initialized = false;
Loop until processing is finished; it is finished when
HRegion.BatchOperationInProgress.nextIndexToProcess equals the number of actions to process.
while (!batchOp.isDone()) {
if (!isReplay) {
checkReadOnly();
}
checkResources();
long newSize;
if (isReplay) {
startRegionOperation(Operation.REPLAY_BATCH_MUTATE);
} else {
startRegionOperation(Operation.BATCH_MUTATE);
}
try {
if (!initialized) {
if (!isReplay) {
Increment the region's write request count.
this.writeRequestsCount.increment();
For each action, run the coprocessor prePut/preDelete hooks.
doPreMutationHook(batchOp);
}
initialized = true;
}
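Perform the update (doMiniBatchMutation):
Check that the column families are valid: for a put, check that every column family it names exists in the table; if one does not, the action's status becomes BAD_FAMILY.
Check that the put's timestamps are valid, i.e. within a reasonable range of the RegionServer's current time;
if not, the action's status becomes SANITY_CHECK_FAILURE.
Check that the delete's column families are valid.
For a put action, set the timestamp of every cell that carries no explicit timestamp (HConstants.LATEST_TIMESTAMP) to the RegionServer's current time.
For a delete action, a Get first reads the existing data in each column family to decide what the delete should remove;
this applies to delete markers whose timestamp is Long.MAX_VALUE (no explicit timestamp).
After these checks, every action that failed a check has a status other than OperationStatusCode.NOT_RUN.
All actions still in the NOT_RUN state are added to the memstore of the corresponding column family.
The MvccVersion of every cell in each store is the memstoreWrite value from mvcc (region max seqid + 1).
For every store (column family) touched by the actions, the cells are added to that store's memstore by calling HStore.add;
all cells of one action share the same mvccVersion, even across different stores.
The status of every applied action is set to OperationStatus.SUCCESS.
The WAL is written by appending the edits to the log.
Log syncing is controlled by hbase.regionserver.optionallogflushinterval (default 1*1000 ms; -1 means sync immediately),
or by the DURABILITY attribute set when the table is defined: SYNC_WAL/FSYNC_WAL means the log is synced immediately.
The return value is the total size of all cells added in this call.
long addedSize = doMiniBatchMutation(batchOp, isReplay);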
Add the size of this update to the RegionServer-wide global memstore size (atomicGlobalMemstoreSize)
and to this region's memstore size (memstoreSize).
newSize = this.addAndGetGlobalMemstoreSize(addedSize);
} finally {
closeRegionOperation();
}
Check whether this region's memstore size exceeds hbase.hregion.memstore.flush.size (default 1024*1024*128L, i.e. 128 MB);
if a flush is needed, request one through MemStoreFlusher.requestFlush(HRegion).
if (isFlushSize(newSize)) {
requestFlush();
}
}
return batchOp.retCodeDetails;
}
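The core of doMiniBatchMutation, followed by the per-region flush check, can be sketched for puts only. Everything here is hypothetical:

- SketchPut carries a single column family, a timestamp and a size.
- WAL writing, row locks, coprocessor hooks and deletes are omitted.
- One MVCC write number is shared by the whole batch. This is an assumption of the sketch, and it implies the per-action property described above.

```java
import java.util.List;
import java.util.Set;

final class MiniBatchSketch {
    enum Status { NOT_RUN, BAD_FAMILY, SANITY_CHECK_FAILURE, SUCCESS }

    // Hypothetical put: one column family, a client timestamp and an encoded size.
    static final class SketchPut {
        final String family;
        final long timestamp;
        final long sizeBytes;
        long writeNumber = -1; // MVCC write number, set only if the put is applied

        SketchPut(String family, long timestamp, long sizeBytes) {
            this.family = family;
            this.timestamp = timestamp;
            this.sizeBytes = sizeBytes;
        }
    }

    static final long FLUSH_SIZE = 128L * 1024 * 1024; // hbase.hregion.memstore.flush.size default
    static final long NO_SLOP_CHECK = Long.MAX_VALUE;  // default: timestamps are not checked

    final Set<String> families;
    final long timestampSlopMs;
    long memstoreSize = 0;
    long nextWriteNumber = 1; // stands in for "region max seqid + 1"

    MiniBatchSketch(Set<String> families, long timestampSlopMs) {
        this.families = families;
        this.timestampSlopMs = timestampSlopMs;
    }

    // Validates and applies one batch; fills codes and returns true if a flush should be requested.
    boolean apply(List<SketchPut> batch, Status[] codes, long nowMs) {
        for (int i = 0; i < batch.size(); i++) {
            SketchPut p = batch.get(i);
            if (!families.contains(p.family)) {
                codes[i] = Status.BAD_FAMILY;
            } else if (timestampSlopMs != NO_SLOP_CHECK && p.timestamp - nowMs > timestampSlopMs) {
                codes[i] = Status.SANITY_CHECK_FAILURE;
            } else {
                codes[i] = Status.NOT_RUN;
            }
        }
        long writeNumber = nextWriteNumber++; // one write number shared by the whole batch
        long added = 0;
        for (int i = 0; i < batch.size(); i++) {
            if (codes[i] == Status.NOT_RUN) {
                batch.get(i).writeNumber = writeNumber;
                added += batch.get(i).sizeBytes;
                codes[i] = Status.SUCCESS;
            }
        }
        memstoreSize += added;
        return memstoreSize > FLUSH_SIZE; // like isFlushSize: region memstore above flush size
    }
}
```

In this sketch, a put naming an unknown family ends as BAD_FAMILY and is neither written nor counted. The surviving puts share one write number, and their sizes accumulate until the region crosses the 128 MB flush size.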