Elasticsearch原理分析——GET流程
文章目录
ES的读取分为GET和Search两种操作,这两种读取操作有较大的差异,GET/MGET必须指定三元组:_index,_type, id。也就是说,根据文档ID从正排索引中获取内容。而Search不指定 _id ,根据关键词从倒排索引中获取内容。本章分析GET/MAGET过程,下一章分析Search过程。
一个GET请求的简单例子,如下:
curl -XGET http://127.0.0.1:9200/myindex/mytype/12
{
"_index": "myindex",
"_type": "mytype",
"_id": "12",
"_version": 1,
"found": true,
"_source": {
"address": "阜城路",
"province": "北京市",
"city": "北京市",
"district": "海淀区",
"title": "欧雅丽002"
}
}
1. 可选参数
与写请求相同,GET请求时可以在URI中设置一些可选参数,如下表所示:
参数 | 简介 |
---|---|
realtime | 默认为true。GET API 默认是实时的,不受索引刷新(refresh)频率设置的影响。如果文档已经更新,但还没有刷新,则GET API将会发出一个刷新调用,时文档可见。 |
source filtering | 默认请求下,GET API返回文档的全部内容。可以设置为false,不返回文档内容。同时可以使用_source_include和 _source_exclude过滤返回原始文档的部分字段。 |
stored Fields | 对于索引映射中store设置为true的字段,本选项用来指定返回哪些字段。 |
_source | 通过/{index}/{type}/{id}/_source的形式可以只返回原始文档内容,其他的 _id等元信息不返回。 |
routing | 自定义routing |
prefernce | 默认情况下,GET API 从分片的多个副本中随机选择一个,通过指定优先级(preference)可以选择从主分片读取,或者尝试从本地读取。 |
refresh | 默认为false,若设置为true。则可以在读取之前先执行刷新操作,这对写入速度有负面影响。 |
version | 如果GET API指定了版本号,那么当文档实际版本号与请求版本号不符,ES将返回409错误。 |
GetRequest getRequest = new GetRequest(index, type, id);
boolean realtime = getRequest.realtime();
System.out.println(realtime == true);
FetchSourceContext context = new FetchSourceContext(true,new String[]{
"aa"},new String[]{
"bb"});
getRequest.fetchSourceContext(context);
getRequest.routing("user_id");
getRequest.preference(Preference.PREFER_NODES.type());
getRequest.refresh(false);
getRequest.version(1);
2. GET基本流程
搜索和读取文档都属于读操作,可以从主分片或副本分片中读取数据。
读取当个文档的流程,如下下图:
这个例子中的索引有一个主分片和两个副分片。以下是从主分片或副分片中读取时的步骤:
- 客户端向NODE1发送读请求。
- NODE1使用文档ID确定文档属于分片0,通过集群状态中的内容路由信息表获知分片0有三个副本数据,位于三个节点中,此时它可以将请求发送到任意节点,这里它将请求转发到NODE2。
- NODE2将文档返回给NODE1,NODE1将文档返回给客户端。
NODE1作为协调节点,会将客户端请求轮询发送到集群的所有副本来实现负载均衡。
在读取时,文档可能已经存在于主分片上,但还没有复制到副本分片。在这种情况下,读请求命中副本分片时可能会报告文档不存在,但是命中主分片可能成功返回文档。一旦写请求成功返回给客户端,则意味着文档在主分片和副分片都是可用的。
3. GET详细分析
GET/MGET流程涉及两个节点:协调节点和数据节点,流程如下图所示:
3.1 协调节点
执行本流程的线程池:http_server_worker
。
TransportSingleShardAction
类用来处理存在于一个单个(主或副)分片上的请求,如果请求执行失败,则尝试转发到其他节点读取。在收到读请求后,处理过程如下。
3.1.1 内容路由
-
在
TransportSingleShardAction.AsyncSingleAction
构造函数中准备集群状态,节点列表等信息。 -
根据内容路由算法计算目标
shardid
,得到文档具体在哪个分片上。 -
计算出目标
shardid
后,结合请求参数中指定的优先级和集群状态确定目标节点,由于分片可能存在多个副本,因此计算出的是一个列表。private AsyncSingleAction(Request request, ActionListener<Response> listener) { this.listener = listener; //获取集群状态 ClusterState clusterState = clusterService.state(); if (logger.isTraceEnabled()) { logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version()); } //集群nodes列表 nodes = clusterState.nodes(); //校验集群状态,如果是红色的,抛出异常 ClusterBlockException blockException = checkGlobalBlock(clusterState); if (blockException != null) { throw blockException; } String concreteSingleIndex; if (resolveIndex(request)) { concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName(); } else { concreteSingleIndex = request.index(); } this.internalRequest = new InternalRequest(request, concreteSingleIndex); //解析请求,更新自定义routing resolveRequest(clusterState, internalRequest); blockException = checkRequestBlock(clusterState, internalRequest); if (blockException != null) { throw blockException; } //得到分片迭代器 this.shardIt = shards(clusterState, internalRequest); }
具体的路由算法参考写流程分析。
3.1.2 转发请求
作为协调节点,向目标节点转发请求,或者目标节点是本地节点,直接读取数据。发送函数声明了如何对Response
进行处理:
AsyncSingleAction
类中声明对Response
进行处理的函数。无论请求在本节点还是发送到其他节点,均对Response
执行相同的处理逻辑:
/**
* 处理响应
* @param currentFailure
*/
private void perform(@Nullable final Exception currentFailure) {
Exception lastFailure = this.lastFailure;
if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) {
lastFailure = currentFailure;
this.lastFailure = currentFailure;
}
final ShardRouting shardRouting = shardIt.nextOrNull();
if (shardRouting == null) {
Exception failure = lastFailure;
if (failure == null || isShardNotAvailableException(failure)) {
failure = new NoShardAvailableActionException(null,
LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
} else {
logger.debug(() -> new ParameterizedMessage("{}: failed to execute [{}]", null,internalRequest.request()), failure);
}
listener.onFailure(failure);
return;
}
DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
if (node == null) {
onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
} else {
internalRequest.request().internalShardId = shardRouting.shardId();
if (logger.isTraceEnabled()) {
logger.trace(
"sending request [{}] to shard [{}] on node [{}]",
internalRequest.request(),
internalRequest.request().internalShardId,
node
);
}
final Writeable.Reader<Response> reader = getResponseReader();
transportService.sendRequest(node, transportShardAction, internalRequest.request(),
new TransportResponseHandler<Response>() {
@Override
public Response read(StreamInput in) throws IOException {
return reader.read(in);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
onFailure(shardRouting, exp);
}
});
}
}
}
发送的具体过程:
-
在
TransportService::sendRequest
中检查目标是否是本地node。 -
如果是本地node,则进入
TransportService#sendLocalRequest
流程,sendLocalRequest
不发送到网络,直接根据action获取注册的reg,执行processMessageReceived
:private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) { final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool); try{ onRequestSent(localNode, requestId, action, request, options); onRequestReceived(requestId, action); //根据action获取注册的reg final RequestHandlerRegistry reg = getRequestHandler(action); } }
-
如果发送到网络,则请求被异步发送,
sendRequest
的时候注册handle
,等待处理Response
,直到超时。 -
等待数据节点的回复,如果数据解读处理成功,则返回给客户端;如果数据节点处理失败,则进行重试:
/** * 数据节点响应失败,进行重试 * @param shardRouting * @param e */ private void onFailure(ShardRouting shardRouting, Exception e) { if (e != null) { logger.trace(() -> new ParameterizedMessage("{}: failed to execute [{}]", shardRouting, internalRequest.request()), e); } perform(e); }
内容路由结束是构造了目标节点列表的迭代器,重试发送时,目标节点选择迭代器的下一个。
3.2 数据节点
执行本流程的线程池:get
。
数据节点接收协调节点请求的入口为:
TransportSingleShardAction.ShardTransportHandler#messageReceived
。
读取数据并组织成Response
,给客户channel返回:
private class ShardTransportHandler implements TransportRequestHandler<Request> {
@Override
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
}
asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));
}
}
TransportGetAction#shardOperation
先检查是否需要refresh
然后调用indexShard.getService().get()
读取数据并存储到GetResult
中。
@Override
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
//检查是否需要refresh
if (request.refresh() && !request.realtime()) {
indexShard.refresh("refresh_flag_get");
}
//shardGetService读取数据
GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),
request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
return new GetResponse(result);
}
在ShardGetService#get()
函数中,调用:
private GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext, boolean readFromTranslog) {
currentMetric.inc();
try {
long now = System.nanoTime();
//核心读取数据
GetResult getResult =
innerGet(type, id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext, readFromTranslog);
if (getResult.isExists()) {
existsMetric.inc(System.nanoTime() - now);
} else {
missingMetric.inc(System.nanoTime() - now);
}
return getResult;
} finally {
currentMetric.dec();
}
}
获取结果。GetResult
类用于存储读取的真实数据内容。核心的数据读取实现在ShardGetService#innerGet()
函数中:
private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext, boolean readFromTranslog) {
fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
//处理 _all
if (type == null || type.equals("_all")) {
DocumentMapper mapper = mapperService.documentMapper();
type = mapper == null ? null : mapper.type();
}
Engine.GetResult get = null;
if (type != null) {
Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
// 调用 Engine 读取数据
get = indexShard.get(new Engine.Get(realtime, readFromTranslog, type, id, uidTerm) .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
if (get.exists() == false) {
get.close();
}
}
if (get == null || get.exists() == false) {
return new GetResult(shardId.getIndexName(), type, id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
}
try {
//过滤返回结果
return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get, mapperService);
} finally {
get.close();
}
}
- 通过
indexShard.get()
获取Engine.GetResult
。Engine.GetResult
类与innerGet
返回的GetResult
是同名的类,但实现不同。indexShard.get()
最终调用InternalEngine#get
读取数据。 - 调用
ShardGetService#innerGetLoadFromStoreFields()
,根据type、id、DocumentMapper
等信息从刚刚获取的信息中获取数据,对指定的field 、source
进行过滤(source
过滤只支持对字段),把结果存储于GetResult
对象中。
3.3 InternalEngine的读取过程
InternalEngine#get
过程会加读锁。处理realtime
选项,如果为true,则先判断是否有数据可以刷盘,然后调用Searcher
进行读取数据。Searcher
是对IndexSearcher
的封装。
在早期ES版本中,如果开启(默认)realtime
,则会尝试从translog
中读取,刚刚写入不久的数据可以从translog
中读取:从ES 5.x开始不会从tanslog
中读取,只从Lucene读取。realtime
的实现机制变成靠refresh
实现。
@Override
public GetResult get(Get get, BiFunction<String, SearcherScope, Engine.Searcher> searcherFactory) throws EngineException {
assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
SearcherScope scope;
//处理realtime选项,判断是否需要刷盘
if (get.realtime()) {
VersionValue versionValue = null;
try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
// versionMap中的值是写入索引的时候添加的,不会写刷盘
versionValue = getVersionFromMap(get.uid().bytes());
}
if (versionValue != null) {
assert versionValue.seqNo >= 0 : versionValue;
//如果需要刷盘,执行刷盘操作
refreshIfNeeded("realtime_get", versionValue.seqNo);
}
scope = SearcherScope.INTERNAL;
} else {
// we expose what has been externally expose in a point in time snapshot via an explicit refresh
scope = SearcherScope.EXTERNAL;
}
// 调用Searcher读取数据
return getFromSearcher(get, searcherFactory, scope);
}
}
protected final void refreshIfNeeded(String source, long requestingSeqNo) {
//最终提交的checkpoint小于版本号,执行刷磁盘操作
if (lastRefreshedCheckpoint() < requestingSeqNo) {
synchronized (refreshIfNeededMutex) {
if (lastRefreshedCheckpoint() < requestingSeqNo) {
refresh(source, SearcherScope.INTERNAL, true);
}
}
}
}
4. MGET流程分析
MGET的主要处理类:TransportMultiGetAction
,通过封装单个GET请求实现,处理流程如下:
主要流程如下:
-
遍历请求,计算出每个doc的路由信息,得到由shardid为key组成的request map。这个过程没有在TransportSingleShardAction中实现,是因为如果在那里实现,shardid就会重复,这也是合并为基于分片的请求的过程。
-
循环处理组织好的每个shard级请求,调用处理GET请求使用
TransportSingleShardAction#AsyncSingleAction
处理单个doc的流程。 -
收集
Response
,全部Response
返回后执行finishHim()
,给客户端返回结果。@Override protected void doExecute(Task task, final MultiGetRequest request, final ActionListener<MultiGetResponse> listener) { ClusterState clusterState = clusterService.state(); clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); final AtomicArray<MultiGetItemResponse> responses = new AtomicArray<>(request.items.size()); final Map<ShardId, MultiGetShardRequest> shardRequests = new HashMap<>(); //遍历iterms for (int i = 0; i < request.items.size(); i++) { MultiGetRequest.Item item = request.items.get(i); String concreteSingleIndex; try { concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, item).getName(); item.routing(clusterState.metaData().resolveIndexRouting(item.routing(), item.index())); if ((item.routing() == null) && (clusterState.getMetaData().routingRequired(concreteSingleIndex))) { responses.set(i, newItemFailure(concreteSingleIndex, item.type(), item.id(), new RoutingMissingException(concreteSingleIndex, item.type(), item.id()))); continue; } } catch (Exception e) { responses.set(i, newItemFailure(item.index(), item.type(), item.id(), e)); continue; } //计算shadId ShardId shardId = clusterService.operationRouting() .getShards(clusterState, concreteSingleIndex, item.id(), item.routing(), null) .shardId(); MultiGetShardRequest shardRequest = shardRequests.get(shardId); if (shardRequest == null) { shardRequest = new MultiGetShardRequest(request, shardId.getIndexName(), shardId.getId()); shardRequests.put(shardId, shardRequest); } shardRequest.add(i, item); } if (shardRequests.isEmpty()) { // only failures.. listener.onResponse(new MultiGetResponse(responses.toArray(new MultiGetItemResponse[responses.length()]))); } //执行Get操作 executeShardAction(listener, responses, shardRequests); }
回复的消息中文档顺序与请求顺序一致。如果部分文档读取失败,则不影响其他结果,检索失败的doc会在回复信息中标出。
5. 思考
我们需要警惕实时读取特性,GET API默认是实时的,实时的意思是写完了可以立刻读取,但仅限于GET、MGET操作,不包括搜索。在5.x版本之前,GET/MGET的实时读取依赖于从tasnslog中读取实现,5.x版本之后的版本改为refresh,因此系统对实时读取的支持会对写入速度有负面影响。
由此引出另一个较深层次的问题是,update操作需要先GET再写,为了保证一致性,update调用GET时将realtime选项设置为true,并且不可配置。因此update操作可能会导致refresh生成新的Lucene分段。
读失败时怎么处理的?
尝试从别的分片副本读取。
优先级?
优先级策略只是将匹配到优先级的节点放到了目标节点列表的前面。
6. 关注我
搜索微信公众号:java架构强者之路