首先感谢 Bboss作者兼 elasticsearch交流 群主(一个非常热心的大佬)
这里直接看代码:
public Map<Integer, List<JestResult>> searchSlicedScrolls(MyYangBao yangBao, QueryBuilder queryBuilders, Set<String> includePatterns) { final int slicesMax = 5; ExecutorService singleThreadPool = UtilsJava.returnSingleThreadPool(slicesMax); Map<Integer, List<JestResult>> stringListMap = Collections.synchronizedMap(new HashMap<>(slicesMax)); //分成5个切片去执行 for (int id = 0; id < slicesMax; id++) { int finalId = id; ArrayList<JestResult> jestResults = new ArrayList<>(); singleThreadPool.submit(() -> { SliceBuilder sliceBuilder = new SliceBuilder(finalId, slicesMax); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() .query(queryBuilders) // 设置 不排序 按照文档存入顺序返回 .sort("_doc") .slice(sliceBuilder); Search.Builder builder = new Search.Builder(searchSourceBuilder.toString()) .addIndex(yangBao.getIndexName()) .addType(yangBao.getTypeName()) .setParameter(Parameters.SIZE, GlobalConstants.HITS_SIZE) .setParameter(Parameters.SCROLL, "5m") //可以屏蔽元数据 .setParameter("filter_path", "hits.hits,hits.hits._source,hits.hits._id,_scroll_id"); for (String includePattern : includePatterns) { builder.addSourceIncludePattern(includePattern); } Search search = builder.build(); JestResult jestResult = null; try { jestResult = yangBao.getJestClient().execute(search); } catch (IOException e) { e.printStackTrace(); } assert jestResult != null; if (jestResult.getJsonObject().getAsJsonObject(GlobalConstants.JSON_HITS).getAsJsonArray(GlobalConstants.JSON_HITS).size() > 0) { String scrollId = jestResult.getJsonObject().get(GlobalConstants.JSON_SCROLL_ID).getAsString(); jestResults.add(jestResult); do { SearchScroll scroll = new SearchScroll.Builder(scrollId, "5m") .setParameter("filter_path", "hits.hits,hits.hits._source,hits.hits._id,_scroll_id") .build(); try { jestResult = yangBao.getJestClient().execute(scroll); } catch (IOException e) { e.printStackTrace(); } if (jestResult.getJsonObject().getAsJsonObject(GlobalConstants.JSON_HITS).getAsJsonArray(GlobalConstants.JSON_HITS).size() > 0) { jestResults.add(jestResult); } else { break; } } while (true); } }); stringListMap.put(finalId, jestResults); } singleThreadPool.shutdown(); try { singleThreadPool.awaitTermination(100, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } return stringListMap; }
参考:https://my.oschina.net/bboss/blog/1788729