上一篇演示了 ES 的简单条件查询。需要注意的是,由于 ES 的深分页限制(from + size 默认不能超过 index.max_result_window,即 10000),普通查询最多只能取到前 10000 条数据;如果要取回超过 1 万条数据,就需要使用 Scroll 滚动查询。
代码
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.client.*;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.joda.time.DateTime;
import java.io.*;
import java.text.NumberFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
public class ESReadTest {
// ES的用户名,密码,地址
private final String user = "user";
private final String password = "password";
private final String hostname = "127.0.0.1";
private String indexName;
private RestHighLevelClient client;
// 用来将北京时间转换成UTC时间
public String getUTCStr(String date) {
String returnDate = "";
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
returnDate = new DateTime(sdf.parse(date)).plusHours(-8).toString("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
}
catch (ParseException e)
{
e.printStackTrace();
}
return returnDate;
}
// 初始化Client
public void iniES(String indexName){
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(user, password));
RestClientBuilder builder = RestClient.builder(
new HttpHost(hostname, 9200))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);}});
this.client = new RestHighLevelClient(builder);
this.indexName=indexName;
}
//构建请求
public List<String> dslBulider() throws IOException {
SearchRequest searchRequest = new SearchRequest(this.indexName);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// hit 返回值(bool 查询返回条数)
searchSourceBuilder.size(3000);
// searchSourceBuilder.from(0);
// 准确计数
searchSourceBuilder.trackTotalHits(true);
// 超时时间60s
searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
// 绑定查询条件
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// status字段为301或302
boolQueryBuilder.must(QueryBuilders.termsQuery("status.keyword", new String[]{"301","302"}));
// args字段包含786754748671257
boolQueryBuilder.must(QueryBuilders.matchPhraseQuery("args","786754748671257"));
// 时间大于等于2020-05-21 00:00:00,小于2020-05-22 00:00:00
boolQueryBuilder.must(QueryBuilders.rangeQuery("@timestamp").gte(getUTCStr("2020-05-21 00:00:00")).lt(getUTCStr("2020-05-22 00:00:00")));
// 绑定bool query
searchSourceBuilder.query(boolQueryBuilder);
searchRequest.source(searchSourceBuilder);
// 开启scroll查询,设置scroll过期时间为1min
searchRequest.scroll("1m");
// 发起请求并接收响应
SearchResponse searchResponse = this.client.search(searchRequest, RequestOptions.DEFAULT);
// 初始化查询结果List
List<String> jsonStringList=new ArrayList<>();
// 获取第一页的查询结果
SearchHit[] searchHits=searchResponse.getHits().getHits();
for (SearchHit hit : searchHits) {
jsonStringList.add(hit.getSourceAsString());
}
// 获取ScrollId
String scrollId = searchResponse.getScrollId();
// 返回结果不为空则滚动查询
while (searchHits != null && searchHits.length > 0){
// 初始化scroll查询
SearchScrollRequest searchScrollRequest=new SearchScrollRequest(scrollId);
searchScrollRequest.scroll("1m");
// 发起请求并接收响应
searchResponse=this.client.searchScroll(searchScrollRequest,RequestOptions.DEFAULT);
// 更新ScrollId
scrollId=searchResponse.getScrollId();
// 更新查询结果
searchHits=searchResponse.getHits().getHits();
// 放入List
for (SearchHit hit : searchHits) {
jsonStringList.add(hit.getSourceAsString());
}
}
return jsonStringList;
}
// 关闭Client
public void closeES(){
try {
this.client.close();
}catch (Exception e){
e.printStackTrace();
}
}
// 运行
public void run(){
try {
// 初始化并传入index名
this.iniES("test*");
// 获取查询结果并打印
List<String> jsonStringList = this.dslBulider();
for (String json : jsonStringList) {
System.out.println(json);
}
}catch (Exception e){
e.printStackTrace();
}finally {
this.closeES();
}
}
public static void main(String[] args) {
ESReadTest esReadTest=new ESReadTest();
esReadTest.run();
}
}
可以看到,第一次查询最多返回 3000 条数据,同时返回一个 Scroll Id;之后使用这个 Scroll Id 循环查询,直到返回结果为空,这样就能获得所有查询结果。