先吐槽几句,最近本博主一直在做数据平台的事,越发觉得做数据平台难,尤其数据量很大的情况下,然而一旦问题解决,又马上觉得峰回路转,蛮有成就感。
下面就介绍一下,在已经存有大量数据的ES索引中(博主处理的大概在1亿7千万条),向一个type中添加一个新字段并赋给一个值。
说明,cimissgcdb是index,agmedays是type。
首先来查看一下原始的mapping:
GET /cimissgcdb/agmedays/_mapping
查看一下该表数据量多大,心里有个底:
POST cimissgcdb/agmedays/_search/ { "query": { "match_all": {} } }
然后添加一个新字段,并设置字段的数据类型(ES的数据类型比如Integer、String、Double、Date等),博主这里新增了一个date类型的时间字段TimeFormat,并进行了format:
PUT cimissgcdb/_mapping/agmedays { "properties": { "TimeFormat": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" } } }
接下来给新字段赋值,有2种方式:
1. 使用painless
ES 5.0版本后推出painless。何为painless,推荐参考这2篇博文:
由于博主的ES用的还是低版本2.3.2的,所以第一种方法用不了,那只能使用第二种方式了。
2. 自己通过代码重写,由于博主的新字段值需要通过已经存在字段的值来计算,所以,自己动手写java api来实现。
public static void updateHourByScroll(String Type) throws IOException, ExecutionException, InterruptedException { System.out.println("scroll 模式启动!"); Date begin = new Date(); SearchResponse scrollResponse = client.prepareSearch(Index).setTypes(TYPE) .setSearchType(SearchType.SCAN).setSize(5000).setScroll(TimeValue.timeValueMinutes(1)) .execute().actionGet(); long count = scrollResponse.getHits().getTotalHits();//第一次不返回数据 for(int i=0,sum=0; sum<count; i++) { scrollResponse = client.prepareSearchScroll(scrollResponse.getScrollId()) .setScroll(TimeValue.timeValueMinutes(8)) .execute().actionGet(); sum += scrollResponse.getHits().hits().length; SearchHits searchHits = scrollResponse.getHits(); List<UpdateRequest> list = new ArrayList<UpdateRequest>(); for (SearchHit hit : searchHits) { String id = hit.getId(); Map<String, Object> source = hit.getSource(); if (source.containsKey("TimeFormat")) { //这个很重要,如果中间过程失败了,在执行时,起到过滤作用,提高效率。 System.out.println("TimeFormat已经存在!"); }else{ Integer year = Integer.valueOf(source.get("Year").toString()); Integer month = Integer.valueOf(source.get("Mon").toString()); Integer day = Integer.valueOf(source.get("Day").toString()); Integer hour = 0; if(source.containsKey(""Hour"")){ //处理Hour不存在的情况 hour = Integer.valueOf(source.get("Hour").toString()); }else{ hour = 0; } String time = getyear_month_day_hour(year, month, day, hour); //这个方法自定义,用来生成新字段TimeFormat的值,按需修改即可。 System.out.println(time); UpdateRequest uRequest = new UpdateRequest() .index(Index) .type(Type) .id(id) .doc(jsonBuilder().startObject().field("TimeFormat", time).endObject()); list.add(uRequest); //client.update(uRequest).get(); //注释上一行,就是单个提交,大数据量效率很低,用一个list来使用bulk,批量提高效率 } } // 批量执行 BulkRequestBuilder bulkRequest = client.prepareBulk(); for (UpdateRequest uprequest : list) { bulkRequest.add(uprequest); } BulkResponse bulkResponse = bulkRequest.execute().actionGet(); if (bulkResponse.hasFailures()) { System.out.println("批量错误!"); } System.out.println("总量" + count + " 已经查到" + sum); } Date end = new Date(); System.out.println("耗时: "+(end.getTime()-begin.getTime())); }
上面方法就对1亿7千万条数据重新查询然后写了一遍,用到Scroll游标查询。
对于游标查询和分页查看文章:Elasticsearch分页查询From&Size VS scroll
这样就完成了对一个已经存在的ES表新增字段并赋值,但是切记,这样的操作只是对表已经存在的历史记录的改变,对于新来的数据,要修改ES写入新字段的过程。