Java + SpringBoot 操作 ElasticSearch7.x.x工具类RestHighLevelClientUtils

ElasticSearch创建索引,删除索引,判断 index 是否存在,根据 id 删除指定索引中的文档,
根据 id 更新指定索引中的文档,根据 id 更新指定索引中的文档,根据某字段的 k-v 更新索引中的文档,
添加文档 手动指定id,简单模糊匹配 默认分页为 0,10, term 查询 精准匹配,term 查询 精准匹配,返回列表,term 查询 ,返回列表,查询列表并高亮,批量导入,获取分词结果,返回聚合结果。

1、在application.yml配置如下内容

#数据源配置
spring:
  data:
    elasticsearch:
      client:
        reactive:
          endpoints: 127.0.0.1:9200
          connection-timeout: 10s
          username: xxxx
          password: xxxx
      #ES 持久化存储
      repositories:
        enabled: true
  elasticsearch:
    rest:
      # 要连接的ES客户端Rest Uri 多个逗号分隔
      uris: http://127.0.0.1:9200
      connection-timeout: 10s
      read-timeout: 30s
      username: xxxx
      password: xxxx

2、RestHighLevelClientUtils工具类

package cn.itbluebox.itblueboxesjd.utils;

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.xcontent.XContentType;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.*;

//DeleteRequest GetRequest UpdateRequest 都是根据 id 操作文档

/**
 * @author anqi
 */
@Component
public class RestHighLevelClientUtils {
    
    

    @Resource
    private RestHighLevelClient client;

    /**
     * 创建索引
     * @param indexName
     * @param settings
     * @param mapping
     * @return
     * @throws IOException
     */
    public CreateIndexResponse createIndex(String indexName, String settings, String mapping) throws IOException{
    
    
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        if (null != settings && !"".equals(settings)) {
    
    
            request.settings(settings, XContentType.JSON);
        }
        if (null != mapping && !"".equals(mapping)) {
    
    
            request.mapping(mapping, XContentType.JSON);
        }
        return client.indices().create(request, RequestOptions.DEFAULT);
    }

    /**
     * 删除索引
     * @param indexNames
     * @return
     * @throws IOException
     */
    public AcknowledgedResponse deleteIndex(String ... indexNames) throws IOException{
    
    
        DeleteIndexRequest request = new DeleteIndexRequest(indexNames);
        return client.indices().delete(request, RequestOptions.DEFAULT);
    }


    /**
     * 判断 index 是否存在
     * @param indexName
     * @return
     * @throws IOException
     */
    public boolean indexExists(String indexName) throws IOException {
    
    
        GetIndexRequest request = new GetIndexRequest(indexName);
        return client.indices().exists(request, RequestOptions.DEFAULT);
    }

    /**
     * 根据 id 删除指定索引中的文档
     * @param indexName
     * @param id
     * @return
     * @throws IOException
     */
    public DeleteResponse deleteDoc(String indexName, String id) throws IOException{
    
    
        DeleteRequest request = new DeleteRequest(indexName, id);
        return client.delete(request, RequestOptions.DEFAULT);
    }

    /**
     * 根据 id 更新指定索引中的文档
     * @param indexName
     * @param id
     * @return
     * @throws IOException
     */
    public UpdateResponse updateDoc(String indexName, String id, String updateJson) throws IOException{
    
    
        UpdateRequest request = new UpdateRequest(indexName, id);
        request.doc(XContentType.JSON, updateJson);
        return client.update(request, RequestOptions.DEFAULT);
    }

    /**
     * 根据 id 更新指定索引中的文档
     * @param indexName
     * @param id
     * @return
     * @throws IOException
     */
    public UpdateResponse updateDoc(String indexName, String id, Map<String,Object> updateMap) throws IOException{
    
    
        UpdateRequest request = new UpdateRequest(indexName, id);
        request.doc(updateMap);

        return client.update(request, RequestOptions.DEFAULT);
    }

    private String generateCurrentData() {
    
    

        Long timeStamp = System.currentTimeMillis();  //获取当前时间戳
        SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String sd = sdf.format(new Date(Long.parseLong(String.valueOf(timeStamp))));      // 时间戳转换成时间
        return sd;
    }

    /**
     * 根据某字段的 k-v 更新索引中的文档
     * @param fieldName
     * @param value
     * @param indexName
     * @throws IOException
     */
    public void updateByQuery(String fieldName, String value, String ... indexName) throws IOException {
    
    
        UpdateByQueryRequest request = new UpdateByQueryRequest(indexName);
        //单次处理文档数量
        request.setBatchSize(100)
                .setQuery(new TermQueryBuilder(fieldName, value))
                .setTimeout(TimeValue.timeValueMinutes(2));
        client.updateByQuery(request, RequestOptions.DEFAULT);
    }

    /**
     * 添加文档 手动指定id
     * @param indexName
     * @param id
     * @param source
     * @return
     * @throws IOException
     */
    public IndexResponse addDoc(String indexName, String id, String source) throws IOException{
    
    
        IndexRequest request = new IndexRequest(indexName);
        if (null != id) {
    
    
            request.id(id);
        }
        request.source(source, XContentType.JSON);
        return client.index(request, RequestOptions.DEFAULT);
    }

    /**
     * 添加文档 使用自动id
     * @param indexName
     * @param source
     * @return
     * @throws IOException
     */
    public IndexResponse addDoc(String indexName, String source) throws IOException{
    
    
        return addDoc(indexName, null, source);
    }

    /**
     * 简单模糊匹配 默认分页为 0,10
     * @param field
     * @param key
     * @param page
     * @param size
     * @param indexNames
     * @return
     * @throws IOException
     */
    public SearchResponse search(String field, String key, int page, int size, String ... indexNames) throws IOException{
    
    
        SearchRequest request = new SearchRequest(indexNames);
        SearchSourceBuilder builder = new SearchSourceBuilder();

        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        if (StrUtil.isEmpty(key)){
    
    
            builder.query(QueryBuilders.matchAllQuery())
                    .from((page-1) * size)
                    .size(size);
        }else{
    
    
            BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
            boolQuery.should(QueryBuilders.matchQuery(field, key));
            boolQueryBuilder.must(boolQuery);
            builder.query(boolQueryBuilder)
                    .from((page-1) * size)
                    .size(size);
        }

        request.source(builder);
        return client.search(request, RequestOptions.DEFAULT);
    }

    /**
     * term 查询 精准匹配
     * @param field
     * @param key
     * @param page
     * @param size
     * @param indexNames
     * @return
     * @throws IOException
     */
    public SearchResponse termSearch(String field, String key, int page, int size, String ... indexNames) throws IOException{
    
    
        SearchRequest request = new SearchRequest(indexNames);
        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.query(QueryBuilders.termsQuery(field, key))
                .from(page)
                .size(size);
        request.source(builder);
        return client.search(request, RequestOptions.DEFAULT);
    }

    /**
     * term 查询 精准匹配,返回列表
     * @param field
     * @param key
     * @param page
     * @param size
     * @param indexNames
     * @return
     * @throws IOException
     */
    public List<Map<String, Object>> termSearchList(String field, String key, int page, int size, String ... indexNames){
    
    
        try {
    
    

            SearchRequest request = new SearchRequest(indexNames);
            SearchSourceBuilder builder = new SearchSourceBuilder();
            builder.query(QueryBuilders.termsQuery(field, key))
                    .from(page)
                    .size(size);
            request.source(builder);
            SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
            SearchHits hits = searchResponse.getHits();
            // 获取检索结果总数
            List<Map<String, Object>> resultList=new ArrayList<>();
            for (SearchHit hit : hits.getHits()) {
    
    
                Map<String, Object> sourceAsMap = hit.getSourceAsMap();
                resultList.add(sourceAsMap);
            }
            return resultList;
        }catch (Exception e){
    
    
            e.printStackTrace();
            throw new RuntimeException("查询失败");
        }

    }

    /**
     * term 查询 ,返回列表
     * @param indexNames
     * @return
     * @throws IOException
     */
    public List<Map<String, Object>> SearchList(SearchSourceBuilder builder, String ...indexNames){
    
    
        try {
    
    
            SearchRequest request = new SearchRequest(indexNames);
            request.source(builder);
            SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
            SearchHits hits = searchResponse.getHits();
            // 获取检索结果总数
            List<Map<String, Object>> resultList=new ArrayList<>();
            for (SearchHit hit : hits.getHits()) {
    
    
                Map<String, Object> sourceAsMap = hit.getSourceAsMap();
                sourceAsMap.put("index",hit.getIndex());
                resultList.add(sourceAsMap);
            }
            return resultList;
        }catch (Exception e){
    
    
            e.printStackTrace();
            throw new RuntimeException("查询失败");
        }

    }

    /**
     * 查询列表并高亮
     * @param builder
     * @param indexNames
     * @return
     */
    public List<Map<String, Object>> SearchListWithHigh(SearchSourceBuilder builder, String ...indexNames){
    
    
        try {
    
    
            SearchRequest request = new SearchRequest(indexNames);
            request.source(builder);
            SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
            SearchHits hits = searchResponse.getHits();
            // 获取检索结果总数
            List<Map<String, Object>> resultList=new ArrayList<>();
            for (SearchHit hit : hits.getHits()) {
    
    
                Map<String, Object> sourceAsMap = hit.getSourceAsMap();
                Map<String, HighlightField> highlightFieldMap = hit.getHighlightFields();
                // 获取高亮
                for (Map.Entry<String, HighlightField> entry : highlightFieldMap.entrySet()) {
    
    
                    Map<String, String> map = new HashMap<>();
                    Text[] texts = highlightFieldMap.get(entry.getKey()).getFragments();
                    String title = "";
                    for (Text text : texts) {
    
    
                        title += text;
                    }
                    map.put(entry.getKey(), title);
                    sourceAsMap.put(entry.getKey(), title);
                }
                sourceAsMap.put("index",hit.getIndex());
                resultList.add(sourceAsMap);
            }
            return resultList;
        }catch (Exception e){
    
    
            e.printStackTrace();
            throw new RuntimeException("查询失败");
        }

    }


    /**
     * 批量导入
     * @param indexName
     * @param isAutoId 使用自动id 还是使用传入对象的id
     * @param source
     * @return
     * @throws IOException
     */
    public BulkResponse importAll(String indexName, boolean isAutoId, String  source) throws IOException{
    
    
        if (0 == source.length()){
    
    
            //todo 抛出异常 导入数据为空
        }
        BulkRequest request = new BulkRequest();

        JSONArray array = JSON.parseArray(source);

        //todo 识别json数组
        if (isAutoId) {
    
    
            for (Object s : array) {
    
    
                request.add(new IndexRequest(indexName).source(s, XContentType.JSON));
            }
        } else {
    
    
            for (Object s : array) {
    
    
                request.add(new IndexRequest(indexName).id(JSONObject.parseObject(s.toString()).getString("id")).source(s, XContentType.JSON));
            }
        }
        return client.bulk(request, RequestOptions.DEFAULT);
    }

    /**
     * 获取分词结果
     * @param text
     * @return
     */
    public List<String> getAnalyze(String text){
    
    
        try{
    
    
            List<String> list = new ArrayList<>();
            Request request = new Request("GET", URLEncoder.encode("_analyze"));
            JSONObject entity = new JSONObject();
            entity.put("analyzer", "hanlp");
            entity.put("text", text);
            request.setJsonEntity(entity.toJSONString());
            Response response = client.getLowLevelClient().performRequest(request);
            JSONObject tokens = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
            JSONArray arrays = tokens.getJSONArray("tokens");
            for (int i = 0; i < arrays.size(); i++)
            {
    
    
                JSONObject obj = JSON.parseObject(arrays.getString(i));
                list.add(obj.getString("token"));
            }

            return list;
        }catch (Exception e){
    
    
            e.printStackTrace();
            throw new RuntimeException("获取分词结果失败");
        }
    }

    /**
     * 返回聚合结果
     * @param groupfield
     * @param indexNames
     * @return
     */
	public List<String> searchForGroupOne(String groupfield, String ... indexNames){
    
    
		try {
    
    
			SearchRequest request = new SearchRequest(indexNames);
			SearchSourceBuilder builder = new SearchSourceBuilder();
			TermsAggregationBuilder aggregation = AggregationBuilders.terms(groupfield).field(groupfield).size(100);
			builder.aggregation(aggregation);
			request.source(builder);
			SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
			//获取政策的聚合结果
			List<String> agglist=new ArrayList<>();
			//获取主分类-年份
			Terms terms = searchResponse.getAggregations().get(groupfield);
			if (terms != null) {
    
    
				for (Terms.Bucket bucket : terms.getBuckets()) {
    
    
					String the = (String) bucket.getKey();
					if (StrUtil.isNotBlank(the)) {
    
    
						agglist.add(the);
					}
				}
			}
			return agglist;
		}catch (Exception e){
    
    
			e.printStackTrace();
			throw new RuntimeException("查询失败");
		}
	}

	public List<String> searchForGroupOneAll(String groupfield, String ... indexNames){
    
    
		try {
    
    
			SearchRequest request = new SearchRequest(indexNames);
			SearchSourceBuilder builder = new SearchSourceBuilder();
			TermsAggregationBuilder aggregation = AggregationBuilders.terms(groupfield).field(groupfield);
			builder.aggregation(aggregation);
			request.source(builder);
			SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
			//获取政策的聚合结果
			List<String> agglist=new ArrayList<>();
			//获取主分类-年份
			Terms terms = searchResponse.getAggregations().get(groupfield);
			if (terms != null) {
    
    
				for (Terms.Bucket bucket : terms.getBuckets()) {
    
    
					String the = (String) bucket.getKey();
					if (StrUtil.isNotBlank(the)) {
    
    
						agglist.add(the);
					}
				}
			}
			return agglist;
		}catch (Exception e){
    
    
			e.printStackTrace();
			throw new RuntimeException("查询失败");
		}
	}
	public BulkByScrollResponse deleteByQuery(DeleteByQueryRequest request, RequestOptions aDefault)  {
    
    
		BulkByScrollResponse bulkByScrollResponse = null;
		try {
    
    
			bulkByScrollResponse = client.deleteByQuery(request, aDefault);
		}catch (Exception e){
    
    
			e.printStackTrace();
		}
		return bulkByScrollResponse;
	}
}

猜你喜欢

转载自blog.csdn.net/qq_44757034/article/details/127789644