/** * @MethodName: bulkCreateIndexByJSONArray * @Description: 使用bulk批量处理JSONArray创建索引导入文档,每BatchSize条文档执行一次bulk * @Param: * * @param client TransportClient对象实例 * @param jsonArray JSONArray格式的文档数据 * @param index 索引名 * @param type 索引类型 * @return: void * @Author: Young * @Date: 2018/4/11 */ public void bulkCreateIndexByJSONArray(TransportClient client, JSONArray jsonArray, String index, String type) { //插入索引量 int i = 0; //bulk单次批操作量 final int BatchSize = 1000; BulkRequestBuilder bulkRequest = client.prepareBulk(); //遍历JSONArray,数据量庞大时,for循环比foreach循环效率更高一些 for (i = 0; i < jsonArray.size(); i++) { //setSource为上传的文本文档 bulkRequest.add(client.prepareIndex(index,type).setSource(jsonArray.getJSONObject(i).toString())); //每100条数据执行一次bulk批量操作 if (0 == i % BatchSize) { bulkRequest.execute().actionGet(); System.out.println("当前已经插入:" + (i+1) + " 条索引..."); } } bulkRequest.execute().actionGet(); System.out.println("批量插入完成,共插入" + (i+1) + " 条索引"); } /** * @MethodName: deleteIndex * @Description: 通过id删除索引,删除索引名为idex,类型为type,id为id的文档 * @Param: * @param client TransportClient对象实例 * @param index 索引名 * @param type 索引类型 * @param id 索引id * @return: void * @Author: Young * @Date: 2018/4/11 */ public void deleteIndexByID(TransportClient client,String index,String type,String id){ DeleteResponse response = client.prepareDelete(index,type,id) .execute() .actionGet(); if(response.isFragment()){ System.out.println("索引: " + index + "删除成功!"); }else { System.out.println("删除失败!"); } } /** * @MethodName: deleteIndex * @Description: 删除index的整个索引库,操作不可逆,慎用 * @Param: * @param client TransportClient对象实例 * @param index 索引名 * @return: void * @Author: Young * @Date: 2018/4/11 */ public void deleteIndex(TransportClient client,String index){ //判断索引是否存在 if (isIndexExits(client,index)){ //索引存在,删除索引库 DeleteIndexResponse response = client.admin() .indices() .prepareDelete(index) .execute().actionGet(); if (response.isAcknowledged()){ System.out.println("索引库: " + index + "删除成功"); } else { System.out.println("删除失败!"); } } else { System.out.println("抱歉,索引不存在!"); } } /** * @MethodName: isIndexExits * @Description: 判断索引index是否存在 * @Param: * @param client TransportClient对象实例 * @param index 索引名 * @return: boolean * @Author: Young * @Date: 2018/4/11 */ public boolean isIndexExits(TransportClient client, String index){ //判断索引是否存在 IndicesExistsRequest inExistsRequest = new IndicesExistsRequest(index); IndicesExistsResponse inExistsResponse = client.admin().indices() .exists(inExistsRequest).actionGet(); if (inExistsResponse.isExists()){ return true; } else { return false; } } /** * @MethodName: countIndex * @Description: 查询某一索引的文档总数 * @Param: * * @param client TransportClient对象实例 * @param index 索引名 * @return: long 索引的文档数,索引不存在则返回-1 * @Author: Young * @Date: 2018/4/11 */ public long countIndex(TransportClient client,String index){ if (isIndexExits(client,index)){ long count = client.prepareSearch(index) //查询该索引的所有文档 .setQuery(QueryBuilders.matchAllQuery()) //索引类型 //.setTypes(type) .get() .getHits() .getTotalHits(); return count; } else { System.out.println("抱歉,该索引不存在"); return -1; } } /** * @MethodName: elasticSearch * @Description: 查询,使用之前自行设置参数和查找方式、分词、内容参数等 * @Param: * 用户可自行添加id等参数 * @param index 索引名 * @param type 索引类型 * @return: void * @Author: Young * @Date: 2018/4/12 */ public void elasticSearch(String index,String type) throws UnknownHostException { //初始化客户端,连接集群 TransportClient client = initESClient(); //matchQuery搜索的时候,首先会解析查询的字符串,进行分词,然后查询 //如 "name":"Tom James",matchQuery会搜索name中含有Tom或James或Tom James的文档 //而termQuery,只会精确查询输入的查询内容,并不会解析查询内容进行分词 /** * term精确匹配 */ /*QueryBuilder queryBuilder = QueryBuilders.disMaxQuery() .add(QueryBuilders.termQuery("name","Tom James")) .add(QueryBuilders.termQuery("age","0 12"));*/ //Bool查询 QueryBuilder queryBuilder = QueryBuilders.boolQuery() .must(QueryBuilders.matchQuery("name","Tom")) .must(QueryBuilders.matchQuery("age","20")); SearchResponse response = client.prepareSearch(index) .setTypes(type) //搜索类型 .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) .setQuery(queryBuilder) //过滤,如过滤日期 //.setPostFilter(QueryBuilders.rangeQuery("datatime").gte("2018-1-1").lte("2018-4-12").format("yyy-MM-dd")) .setFrom(0).setSize(10).setExplain(true)//分页 //执行查询 .execute() .actionGet(); //查找到的结果数 long count = response.getHits().getTotalHits(); System.out.println("共查找到: " + count + "个条目"); SearchHit[] hits = response.getHits().getHits(); for (SearchHit hit: hits) { System.out.println(hit.getSource()); } } /** * @MethodName: search * @Description: 条件查询 * @Param: * * @param client TransportClient对象实例 * @param index 索引名 * @return: void * @Author: Young * @Date: 2018/4/11 */ public void search(TransportClient client,String index){ SearchResponse searchResponse = client.prepareSearch(index) //.setTypes(type) //查询所有 .setQuery(QueryBuilders.matchAllQuery()) //根据young分词查询name字段。默认or //.setQuery(QueryBuilders.matchQuery("name","young").operator(Operator.AND)) //指定查询的字段 //.setQuery(QueryBuilders.multiMatchQuery("tom", "name", "age")) //根据条件查询,支持通配符大于等于0小于等于19 //.setQuery(QueryBuilders.queryString("name:to* AND age:[0 TO 19]")) //查询时不分词 // .setQuery(QueryBuilders.termQuery("name", "tom")) .setSearchType(SearchType.QUERY_THEN_FETCH) //分页 .setFrom(0) //每个primary分片返回的文档数 .setSize(10) //根据age排序 .addSort("age",SortOrder.DESC) .get(); SearchHits hits = searchResponse.getHits(); long total = hits.getTotalHits(); System.out.println(total); SearchHit[] searchHits = hits.hits(); for(SearchHit s : searchHits) { System.out.println(s.getSourceAsString()); } } /** * @MethodName: moreIndexSearchs * @Description: 多索引,多类型查询 * @Param: * * @param client TransportClient对象实例 * @param index1 索引1 * @param index2 索引2 * @return: void * @Author: Young * @Date: 2018/4/11 */ public void moreIndexSearchs(TransportClient client,String index1,String index2) { SearchResponse searchResponse = client.prepareSearch(index1,index2) //.setTypes("stu","tea") .setQuery(QueryBuilders.matchAllQuery()) .setSearchType(SearchType.QUERY_THEN_FETCH) .get(); SearchHits hits = searchResponse.getHits(); long totalHits = hits.getTotalHits(); System.out.println(totalHits); SearchHit[] hits2 = hits.getHits(); for(SearchHit h : hits2) { System.out.println(h.getSourceAsString()); } } /** * @MethodName: initBulkProcessor * @Description: 初始化bulk的批量导入处理器 * @Param: * @param client TransportClient对象实例 * @param esThreadNum 导入ES的并发数量 * @param batchSize 批量导入大小 * @return: org.elasticsearch.action.bulk.BulkProcessor * @Author: Young * @Date: 2018/4/11 */ public BulkProcessor initBulkProcessor(TransportClient client,int esThreadNum,int batchSize){ // 初始化Bulk处理器 BulkProcessor bulkProcessor = BulkProcessor.builder( client, new BulkProcessor.Listener() { long begin; long cost; int count = 0; //bulk开始之前执行 public void beforeBulk(long executionId, BulkRequest bulkRequest) { System.out.println("****尝试添加" + bulkRequest.numberOfActions()+ "条索引****"); //logger.info("****尝试添加[{}]条索引****", bulkRequest.numberOfActions()); begin = System.currentTimeMillis(); } public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) { cost = (System.currentTimeMillis() - begin) / 1000; count += bulkRequest.numberOfActions(); System.out.println("****添加: " + count +"条索引成功,耗时"+ cost +"s****"); //logger.info("****添加[{}]条索引成功****", count); //logger.info("bulk success. size:[{}] cost:[{}s]", count, cost); } //bulk出错时执行 public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) { System.out.println("****添加: " + count +"条索引失败,重新尝试****"); //logger.info("****添加[{}]条索引失败,重新尝试****", count); //logger.error("bulk update has failures, will retry:" + throwable); } }) // 批量导入个数(batchSize次请求执行一次bulk) .setBulkActions(batchSize) // 满100MB进行导入 .setBulkSize(new ByteSizeValue(100, ByteSizeUnit.MB)) //并发数,0表示不并发 .setConcurrentRequests(esThreadNum) // 冲刷间隔5s,每5s刷新一次 .setFlushInterval(TimeValue.timeValueSeconds(5)) // 重试3次,间隔1s(设置退避) .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3)) .build(); return bulkProcessor; } /** * @MethodName: readFile2JSONArray * @Description: 读取mysql导出的JSON文件,转化为JSON数组 * @Param: * * @param path 文件路径 * @return: com.alibaba.fastjson.JSONArray * @Author: Young * @Date: 2018/4/12 */ public JSONArray readFile2JSONArray(String path) throws IOException { //初始化 TransportClient client = initESClient(); //IO流读取文件,避免了内存泄漏,效率不高 File file = new File(path); String line = new String(); String tmp = new String(); LineIterator it = FileUtils.lineIterator(file, "UTF-8"); JSONArray jsonArray = new JSONArray(); //空两行 it.nextLine(); it.nextLine(); //按行读取 while (it.hasNext() && !"]".equals(tmp = it.nextLine())) { line += tmp; if (line.endsWith("},")){ String json = line.substring(0,line.length() - 1); System.out.println(json); //每读完一个JSON数据,存入数组 jsonArray.add(JSONObject.parse(json)); //放空字符串,准备读取下一个JSON数据 line = new String(); } } LineIterator.closeQuietly(it); return jsonArray; }
ES的Java API实现增删改查
猜你喜欢
转载自blog.csdn.net/kakaluoteyy/article/details/79932454
今日推荐
周排行