elasticsearch-rest-high-level-client api
- 官方 5.5.3 版本的客户端并不支持高级 REST API(High Level REST Client),只有 5.6 及以上的版本才支持
- 官方文档位置
导入依赖
<properties>
<!-- 建议和es版本保持一致 -->
<elasticsearch.version>6.2.2</elasticsearch.version>
</properties>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
编写配置类
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import java.io.IOException;
/**
 * Process-wide singleton that builds and owns the Elasticsearch REST clients.
 *
 * <p>Usage: call one of the {@code build(...)} overloads to store the connection
 * settings, then {@link #init()} to create the clients, and {@link #close()} when
 * they are no longer needed. The settings live in static fields, so every call to
 * {@code build(...)} overwrites the settings of the single shared instance.
 */
public class ESClientSpringFactory {

    /** Connect timeout handed to the HTTP request config. */
    public static int CONNECT_TIMEOUT_MILLIS = 1000;
    /** Socket timeout handed to the HTTP request config. */
    public static int SOCKET_TIMEOUT_MILLIS = 30000;
    /** Timeout for obtaining a connection from the pool. */
    public static int CONNECTION_REQUEST_TIMEOUT_MILLIS = 500;
    /** Maximum number of pooled connections per route. */
    public static int MAX_CONN_PER_ROUTE = 10;
    /** Maximum number of pooled connections in total. */
    public static int MAX_CONN_TOTAL = 30;

    private static HttpHost[] HTTP_HOST;
    private static ESClientSpringFactory esClientSpringFactory = new ESClientSpringFactory();

    private RestClientBuilder builder;
    private RestClient restClient;
    private RestHighLevelClient restHighLevelClient;

    private ESClientSpringFactory() {
    }

    /**
     * Stores the hosts and pool limits and returns the shared factory instance.
     * Timeouts keep their current values.
     *
     * @param httpHost           Elasticsearch nodes to connect to
     * @param maxConnectNum      maximum total number of connections
     * @param maxConnectPerRoute maximum number of connections per route
     * @return the singleton factory (not yet initialised; call {@link #init()})
     */
    public static ESClientSpringFactory build(HttpHost[] httpHost,
                                              Integer maxConnectNum, Integer maxConnectPerRoute) {
        HTTP_HOST = httpHost;
        MAX_CONN_TOTAL = maxConnectNum;
        MAX_CONN_PER_ROUTE = maxConnectPerRoute;
        return esClientSpringFactory;
    }

    /**
     * Stores the hosts, timeouts and pool limits and returns the shared factory instance.
     *
     * @param httpHost              Elasticsearch nodes to connect to
     * @param connectTimeOut        connect timeout
     * @param socketTimeOut         socket timeout
     * @param connectionRequestTime timeout for leasing a connection from the pool
     * @param maxConnectNum         maximum total number of connections
     * @param maxConnectPerRoute    maximum number of connections per route
     * @return the singleton factory (not yet initialised; call {@link #init()})
     */
    public static ESClientSpringFactory build(HttpHost[] httpHost, Integer connectTimeOut, Integer socketTimeOut,
                                              Integer connectionRequestTime, Integer maxConnectNum, Integer maxConnectPerRoute) {
        HTTP_HOST = httpHost;
        CONNECT_TIMEOUT_MILLIS = connectTimeOut;
        SOCKET_TIMEOUT_MILLIS = socketTimeOut;
        CONNECTION_REQUEST_TIMEOUT_MILLIS = connectionRequestTime;
        MAX_CONN_TOTAL = maxConnectNum;
        MAX_CONN_PER_ROUTE = maxConnectPerRoute;
        return esClientSpringFactory;
    }

    /**
     * Creates the clients from the stored settings.
     *
     * <p>The high-level client builds its own low-level client from the builder, so
     * that instance is reused for {@link #getClient()} instead of calling
     * {@code builder.build()} a second time. Building twice would open two independent
     * connection pools, of which {@link #close()} previously released only one.
     */
    public void init() {
        builder = RestClient.builder(HTTP_HOST);
        setConnectTimeOutConfig();
        setMutiConnectConfig();
        restHighLevelClient = new RestHighLevelClient(builder);
        restClient = restHighLevelClient.getLowLevelClient();
        System.out.println("init factory");
    }

    /** Applies the connect, socket and connection-request timeouts to the builder. */
    public void setConnectTimeOutConfig() {
        builder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
            requestConfigBuilder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
            requestConfigBuilder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS);
            return requestConfigBuilder;
        });
    }

    /** Applies the connection-pool limits of the underlying async HTTP client to the builder. */
    public void setMutiConnectConfig() {
        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(MAX_CONN_TOTAL);
            httpClientBuilder.setMaxConnPerRoute(MAX_CONN_PER_ROUTE);
            return httpClientBuilder;
        });
    }

    /**
     * @return the low-level client backing the high-level client, or {@code null}
     *         before {@link #init()} has run
     */
    public RestClient getClient() {
        return restClient;
    }

    /**
     * @return the high-level client, or {@code null} before {@link #init()} has run
     */
    public RestHighLevelClient getRhlClient() {
        return restHighLevelClient;
    }

    /**
     * Releases the client resources. Closing the high-level client also closes the
     * low-level client it owns, which is the one returned by {@link #getClient()}.
     * An {@link IOException} during close is reported but not propagated, as before.
     */
    public void close() {
        if (restHighLevelClient != null) {
            try {
                restHighLevelClient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        System.out.println("close client");
    }
}
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
/**
 * Spring configuration that exposes the Elasticsearch clients as beans.
 *
 * <p>Reads the node list and connection-pool limits from the
 * {@code elasticsearch.*} properties, initialises the shared
 * {@link ESClientSpringFactory} and publishes its two clients.
 */
@Slf4j
@Configuration
@ComponentScan(basePackageClasses = ESClientSpringFactory.class)
public class EsConfiguration {

    /** Comma-separated list of {@code host:port} entries. */
    @Value("${elasticsearch.hostlist}")
    private String hostlist;

    /** Maximum total number of connections passed to the factory. */
    @Value("${elasticsearch.client.connectnum}")
    private int connectNum;

    /** Maximum number of connections per route passed to the factory. */
    @Value("${elasticsearch.client.connectPerRoute}")
    private int connectPerRoute;

    /**
     * Parses {@code elasticsearch.hostlist} into an array of HTTP hosts.
     *
     * <p>Whitespace around entries, host names and ports is ignored, so
     * {@code "a:9200, b:9200"} is accepted.
     *
     * @return one {@link HttpHost} (scheme {@code http}) per configured entry
     * @throws IllegalArgumentException if an entry is not of the form {@code host:port}
     *                                  or its port is not a number
     */
    @Bean
    public HttpHost[] httpHost() {
        String[] entries = hostlist.split(",");
        HttpHost[] httpHostArray = new HttpHost[entries.length];
        for (int i = 0; i < entries.length; i++) {
            String entry = entries[i].trim();
            // Split once and validate, instead of failing with a bare index error.
            String[] hostAndPort = entry.split(":");
            if (hostAndPort.length < 2) {
                throw new IllegalArgumentException(
                        "Invalid elasticsearch.hostlist entry (expected host:port): '" + entry + "'");
            }
            String host = hostAndPort[0].trim();
            int port = Integer.parseInt(hostAndPort[1].trim());
            httpHostArray[i] = new HttpHost(host, port, "http");
        }
        return httpHostArray;
    }

    /**
     * Configures the shared factory; Spring calls {@code init} after creation
     * and {@code close} on shutdown.
     */
    @Bean(initMethod = "init", destroyMethod = "close")
    public ESClientSpringFactory getFactory() {
        return ESClientSpringFactory.build(httpHost(), connectNum, connectPerRoute);
    }

    /** Exposes the factory's low-level REST client. */
    @Bean
    @Scope("singleton")
    public RestClient getRestClient() {
        return getFactory().getClient();
    }

    /** Exposes the factory's high-level REST client. */
    @Bean
    @Scope("singleton")
    public RestHighLevelClient getRHLClient() {
        return getFactory().getRhlClient();
    }
}
在配置文件中添加es配置
spring:
datasource:
username: username
password: password
druid:
filters:
stat:
enabled: true
url: jdbc:mysql://localhost:3306/databases?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
driver-class-name: com.mysql.cj.jdbc.Driver
mvc:
favicon:
enabled: false
elasticsearch:
hostlist: 192.168.0.1:9201,192.168.0.1:9202,192.168.0.1:9203
client:
connectnum: 10
connectPerRoute: 50
编写代码测试
- count 统计
// Create the search request and specify the target index and type
SearchRequest request = new SearchRequest();
request.indices(EsConsUtils.INDEX_SHUADAN);
request.types(EsConsUtils.TYPE_SHUADAN);

// Regular query: only documents whose "prId" term equals 1688
BoolQueryBuilder filter = QueryBuilders.boolQuery()
        .must(QueryBuilders.termQuery("prId", 1688));

// Aggregation: count the values of the "id" field, registered under "count_city"
ValueCountAggregationBuilder countAggregation = AggregationBuilders.count("count_city").field("id");

// size(0) asks for zero hits, so only the aggregation result comes back
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
        .size(0)
        .query(filter)
        .aggregation(countAggregation);
request.source(sourceBuilder);

// Send the request and read the aggregation back by its name
try {
    SearchResponse response = restHighLevelClient.search(request);
    ValueCount countResult = response.getAggregations().get("count_city");
    return countResult.getValue();
} catch (IOException e) {
    e.printStackTrace();
}
- 对查询进行分组
// Filter: only documents whose "proId" term equals "520000"
BoolQueryBuilder filter = QueryBuilders.boolQuery()
        .must(QueryBuilders.termQuery("proId", "520000"));

// Group the matching documents by the "city.keyword" field
TermsAggregationBuilder cityGrouping = AggregationBuilders.terms("terms_city").field("city.keyword");

// size(0): return no hits, only the aggregation buckets
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
        .size(0)
        .query(filter)
        .aggregation(cityGrouping);

SearchRequest request = new SearchRequest();
request.indices(EsConsUtils.INDEX_SHUADAN);
request.types(EsConsUtils.TYPE_SHUADAN);
request.source(sourceBuilder);

SearchResponse response = restHighLevelClient.search(request);
ParsedStringTerms cityTerms = response.getAggregations().get("terms_city");

// Collect bucket key -> document count into a JSON object
JSONObject result = new JSONObject();
cityTerms.getBuckets().forEach(bucket -> result.put(bucket.getKeyAsString(), bucket.getDocCount()));
- 普通查询
// The request was used below without ever being declared; create it the same way
// the other examples do.
// NOTE(review): index/type are copied from the sibling examples — adjust them to the
// index that actually stores the SysLogES documents.
SearchRequest request = new SearchRequest()
        .indices(EsConsUtils.INDEX_SHUADAN).types(EsConsUtils.TYPE_SHUADAN);

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// Give the search a 60 second timeout
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

// Filter: only documents whose "provinceId" term equals 410000
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must(QueryBuilders.termQuery("provinceId", 410000));
sourceBuilder.query(boolQuery);
request.source(sourceBuilder);

// Typed list (the original used the raw ArrayList type)
List<SysLogES> esList = new ArrayList<>();
SearchResponse response = restHighLevelClient.search(request);

// Convert each hit's JSON source into a SysLogES object
Arrays.stream(response.getHits().getHits())
        .forEach(hit -> {
            String sourceAsString = hit.getSourceAsString();
            JSONObject resJson = (JSONObject) JSONObject.parse(sourceAsString);
            SysLogES logES = resJson.toJavaObject(SysLogES.class);
            esList.add(logES);
        });
System.out.println(JSON.toJSONString(esList));