一、首先也是最重要的:(说几个注意事项):
1.由于是maven整合elasticsearch:::所以需要保证maven引入的版本和你的虚拟机或者是docker拉下来运行的版本大版本保持一致—例如:
我就出现了由于docker上安装的版本比较高导致客户端java代码报错的情况。所以我删除了docker内的镜像和容器。重新pull了一个7.8.0版本的elasticsearch
2.注意引入的maven包内的版本不之一的时候,在springboot的配置文件pom的properties标签内新增统一版本的elastic
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<nacos.version>2.2.5.RELEASE</nacos.version>
<elasticsearch.version>7.2.0</elasticsearch.version><!--新增-->
</properties>
二、配置
1.pom文件配置
<!--es搜索引擎-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.2.0</version>
<exclusions><!--由于引入的两个包有重复的引用,因此排除一个-->
<exclusion>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.2.0</version>
</dependency>
2.application-dev.yml内的参数配置–下面会使用
elasticsearch1:
address: 192.175.1.196:9200
3.ElasticSearch的config配置
package com.client.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author 25338
* @version 1.0
* @date 2021/11/24 15:16
* @description
*/
@Slf4j
@Configuration
public class ElasticSearchConfig {
@Value("${elasticsearch1.address}")
private String address;
/**
* 解析配置文件里的ip和port
* @return
*/
private HttpHost esHttpHost(){
String[] addresses = address.split(":");
int port = Integer.valueOf(addresses[1]);
return new HttpHost(addresses[0],port);
}
/**
* 构建客户端操作类
* @return
*/
@Bean
public RestClientBuilder restClientBuilder(){
return RestClient.builder(esHttpHost());
}
/**
* 单挑操作的时候得类注入
* @param restClientBuilder
* @return
*/
@Bean(name = "highLevelClient")
public RestHighLevelClient highLevelClient(@Autowired RestClientBuilder restClientBuilder){
return new RestHighLevelClient(restClientBuilder);
}
/**
* 配置多条同时操作时候的类注入
* @param restHighLevelClient
* @return
*/
@Bean(name = "bulkProcessor")
public BulkProcessor bulkProcessor(@Qualifier("highLevelClient") @Autowired RestHighLevelClient restHighLevelClient){
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
log.info("1.【before builprocessor】批次携带{}请求数量{}",l,bulkRequest.numberOfActions());
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
if(!bulkResponse.hasFailures()){
log.info("2.【after bulk-成功】bulkprocessor - 批量【{}】完成在{}秒!",l,bulkResponse.getIngestTookInMillis()/1000);
}else{
BulkItemResponse[] response = bulkResponse.getItems();
for (BulkItemResponse r:response) {
if(r.isFailed()){
log.info("2.【after bulk-失败】bulkprocessor - 批量【{}】失败,原因是{}!",l,r.getFailureMessage());
}
}
}
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
List<DocWriteRequest<?>> requestList = bulkRequest.requests();
List<String> esIds = requestList.stream().map(DocWriteRequest::id).collect(Collectors.toList());
log.error("3.【afterbulk-failed失败】es执行失败,失败的esid为:{}",esIds,throwable);
}
};
BulkProcessor.Builder builder = BulkProcessor.builder(((bulkreq,bulkresponselistener)->
restHighLevelClient.bulkAsync(bulkreq, RequestOptions.DEFAULT,bulkresponselistener)),listener);
//到达10000时候刷新
builder.setBulkActions(10000);
//内存达到8m时候刷新
builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
//设置时间间隔为10秒
builder.setFlushInterval(TimeValue.timeValueSeconds(10));
//设置允许的并发请求数目
builder.setConcurrentRequests(8);
//设置重试策略
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1),3));
// builder.setGlobalType("");
return builder.build();
}
}
4.具体操作
package com.client.controller;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.client.entity.User;
import com.client.entity.response.Response;
import com.client.entity.response.ResponseLwLog;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.*;
/**
* @author 25338
* @version 1.0
* @date 2021/4/19 15:01
* @description
*/
@Slf4j
@RequestMapping("/testEs")
@RestController
public class ElasticSearchController {
private final RestHighLevelClient restHighLevelClient;
private final BulkProcessor bulkProcessor;
@Autowired
public ElasticSearchController(@Qualifier("highLevelClient") RestHighLevelClient restHighLevelClient,
@Qualifier("bulkProcessor") BulkProcessor bulkProcessor) {
this.restHighLevelClient = restHighLevelClient;
this.bulkProcessor = bulkProcessor;
}
/**
* es的使用--保存--同步插入数据
* @return ---
*/
@GetMapping("/createEs")
public ResponseLwLog<String> createEs(){
//构建-indexRequest
IndexRequest indexRequest = new IndexRequest("people","test");
String json = JSON.toJSONString(new User(10,"小蓝",95));
//设置数据格式
indexRequest.source(json, XContentType.JSON);
indexRequest.id("people-1");
//1秒
indexRequest.timeout(TimeValue.timeValueSeconds(1));
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
//存在相同id时是覆盖还是报错
indexRequest.create(true);
indexRequest.id("12");
try {
restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
return Response.success("success");
}
@GetMapping("/esInsertAsync")
public void esInsertAsync(){
IndexRequest indexRequest = new IndexRequest("people");
indexRequest.source(JSON.toJSONString(new User(10,"小蓝",95)),XContentType.JSON);
indexRequest.timeout(TimeValue.timeValueSeconds(1));
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
//数据存数而不是更新//存在相同id时是覆盖还是报错
indexRequest.create(false);
indexRequest.id("19");
restHighLevelClient.indexAsync(indexRequest, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if(shardInfo.getFailed() > 0){
for (ReplicationResponse.ShardInfo.Failure sharef:shardInfo.getFailures()) {
log.error("将id为:{}存入ES时发生失败,原因是:{}",indexRequest.id(),sharef.getCause());
}
}
}
@Override
public void onFailure(Exception e) {
log.error("数据存入ES时发生异常:{},原因是:{}",indexRequest.id(),e);
}
});
}
/**
* 批量保存
*/
@GetMapping("/getEsData")
public void saveListEs(){
List<User> userList = Arrays.asList(new User(1,"小明",93),
new User(3,"小老",91), new User(7,"小子",91),
new User(119,"小路",92), new User(102,"小蓝",95),
new User(115,"小刚",96),new User(125,"小紫",196),
new User(5,"小小",960),new User(15,"李红",66));
//设置构建
List<IndexRequest> requestList = new ArrayList<>();
userList.forEach(e->{
IndexRequest indexRequest = new IndexRequest("people");
//id
indexRequest.id(e.getId()+"");
indexRequest.source(JSON.toJSONString(e),XContentType.JSON);
indexRequest.opType(DocWriteRequest.OpType.CREATE);
requestList.add(indexRequest);
});
requestList.forEach(bulkProcessor::add);
}
/**
* 更新数据--es
*/
@GetMapping("/updateEs")
public void updateDataEs() throws IOException {
UpdateRequest updateRequest = new UpdateRequest("people","1");
//通过map-属性-值进行更新
Map<String,String> map = new HashMap<>();
map.put("name","小明明");
updateRequest.doc(JSON.toJSONString(map),XContentType.JSON);
updateRequest.timeout(TimeValue.timeValueSeconds(3));
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
//数据存储不是更新
restHighLevelClient.update(updateRequest,RequestOptions.DEFAULT);
}
/**
* 待条件的更新--es
*/
@GetMapping("/condUpdateEs")
public void updateEsCond() throws IOException {
UpdateByQueryRequest update = new UpdateByQueryRequest();
update.indices("people");
//根据ID进行操作
update.setQuery(new TermQueryBuilder("id","115"));
//设置
update.setScript(new Script(ScriptType.INLINE,
"painless",
"ctx._source.scope=100",
Collections.emptyMap()));
restHighLevelClient.updateByQuery(update,RequestOptions.DEFAULT);
}
/**
* es的批量更新
*/
@GetMapping("/attachEsUpdate")
public void attachUpdate(){
List<UpdateRequest> requestList = new ArrayList<>();
//更新的数据
List<User> updateUser = Arrays.asList(new User(3,"小老",91)
, new User(7,"小子",91));
//操作
updateUser.forEach(u->{
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("people");
//id
updateRequest.id(u.getId()+"");
//更新后的数据
Map<String,String> map = new HashMap<>();
map.put("scope","150");
updateRequest.doc(JSON.toJSONString(map),XContentType.JSON);
requestList.add(updateRequest);
});
//更新
requestList.forEach(bulkProcessor::add);
}
/**
* 单个删除数据--es
*/
@PostMapping("/delEs")
public void delEs() throws IOException {
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.index("people");
deleteRequest.id("102");
restHighLevelClient.delete(deleteRequest,RequestOptions.DEFAULT);
}
/**
* 按条件删除 -- es
*/
@PostMapping("/delEsCond")
public void selEsCond() throws IOException {
DeleteByQueryRequest delete = new DeleteByQueryRequest();
delete.indices("people");
delete.setQuery(new TermQueryBuilder("id",5));
//分词是删除
restHighLevelClient.deleteByQuery(delete,RequestOptions.DEFAULT);
}
/**
* 查询===es
* @return
*/
@GetMapping("/selEsInfo")
public ResponseLwLog<List<User>> queryEsList(){
//构建查询参数
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//返回字段
String[] include = {
"name","scope","id"};
String[] exclude = {
};
searchSourceBuilder.fetchSource(include,exclude);
//构建查询条件
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// searchSourceBuilder.query(boolQueryBuilder.filter(QueryBuilders.termQuery("id",7)));
//时间查询范围
// searchSourceBuilder.query(boolQueryBuilder.filter(QueryBuilders.rangeQuery("create_time").from(LocalDateTime.of(2021,11,22,12,12,30))));
// searchSourceBuilder.query(boolQueryBuilder.filter(QueryBuilders.rangeQuery("create_time").to(LocalDateTime.now())));
//分页
Integer size = 50;
Integer start = 0;
if(start + size > 10000){
throw new RuntimeException("查询数量超过10000");
}
searchSourceBuilder.size(size);
searchSourceBuilder.from(start);
searchSourceBuilder.query(boolQueryBuilder);
//构建请求
SearchRequest request = new SearchRequest();
request.source(searchSourceBuilder);
//发起请求
SearchResponse response = null;
try {
response = restHighLevelClient.search(request,RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
//解析返回数据
// long totalhit = response.getHits().getTotalHits().value;
log.info("shuju:{}",JSON.toJSONString(response));
List<Map<String,Object>> mapList = Lists.newArrayListWithCapacity(response.getHits().getHits().length);
Arrays.stream(response.getHits().getHits()).forEach(his->mapList.add(his.getSourceAsMap()));
List<User> userList = new ArrayList<>();
if(CollectionUtils.isNotEmpty(mapList)){
userList = JSONArray.parseArray(JSON.toJSONString(mapList),User.class);
}
return Response.success(userList);
}
}
四、参考文献
https://www.jianshu.com/p/de838a665eec