Elasticsearch 的部分操作(单条插入、批量插入,以及根据条件删除记录)对应的 Java 代码示例如下:
/**
 * Bulk-indexes every record of {@code dataList} into the given Elasticsearch index.
 *
 * <p>Each map is wrapped in its own {@code IndexRequest} and handed to the
 * {@code bulkProcessor}; the processor is flushed once after all records have been added.
 * A {@code null} list is logged and ignored instead of failing with a
 * {@code NullPointerException}. An empty list is still logged and flushed.
 *
 * @param dataList  records to index, one field-name-to-value map per document; may be {@code null}
 * @param indexName name of the target index
 */
public void writeDataToES(List<Map<String, Object>> dataList, String indexName) {
    if (dataList == null) {
        LOG.info("--the datalist is null, nothing will be added to the es");
        return;
    }
    LOG.info("--the datalist will be added to the ws, and it's size=" + dataList.size());

    // Counts down to zero so the final record can be logged without tracking two indexes.
    int remaining = dataList.size();
    for (Map<String, Object> rowMap : dataList) {
        IndexRequest sourceRequest = new IndexRequest(indexName);
        sourceRequest.source(rowMap);
        bulkProcessor.add(sourceRequest);
        remaining--;
        if (remaining == 0) {
            // Only the last record is logged, to keep the output small for big batches.
            LOG.info("###success to add the last one source map in batch, it's mapInfo=" + rowMap);
        }
    }
    // Flush explicitly once the whole list has been handed to the processor.
    bulkProcessor.flush();
}
/**
 * Indexes a single record into Elasticsearch with one direct {@code client.index} call.
 *
 * @param data      field-name-to-value map that forms the document source
 * @param indexName name of the target index
 * @throws IOException propagated from {@code client.index}
 */
public void writeToES(Map<String, Object> data, String indexName) throws IOException {
    LOG.info("--the datalist will be added to the ws, it's mapInfo=" + data);
    // The request goes straight through the client; the bulk processor is not involved here.
    final IndexRequest request = new IndexRequest(indexName).source(data);
    client.index(request, RequestOptions.DEFAULT);
}
/**
 * Deletes the documents of {@code indexName} that match a term query on
 * {@code termField} with value {@code termValue}.
 *
 * @param indexName index to delete from
 * @param termField name of the field the term query is applied to
 * @param termValue value the field must match
 * @return the number of documents deleted, or {@code -1} if the request failed with an
 *         {@code IOException} (the exception is logged, not rethrown)
 */
public long deleteByTermQuery(String indexName, String termField, String termValue) {
    DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
    // On version conflicts, keep going instead of aborting the whole request.
    request.setConflicts("proceed");
    // Query condition: first argument is the field name, second is the value to match.
    request.setQuery(new TermQueryBuilder(termField, termValue));
    // Upper bound on the number of documents processed by this request.
    // NOTE(review): newer client versions appear to offer setMaxDocs as the replacement
    // for setSize - confirm against the client version in use.
    request.setSize(2999);
    // Number of documents handled per scroll batch.
    request.setBatchSize(50);
    // Number of slices used to run the request.
    request.setSlices(2);
    // How long the scroll search context is kept alive.
    request.setScroll(TimeValue.timeValueMinutes(10));
    // Request timeout.
    request.setTimeout(TimeValue.timeValueMinutes(2));
    // Refresh the index once the request has finished.
    request.setRefresh(true);
    try {
        BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT);
        // Report the deleted counter; the updated counter is not the meaningful one for deletes.
        return response.getStatus().getDeleted();
    } catch (IOException e) {
        LOG.error("!!!deleteByTermQuery error", e);
    }
    return -1;
}