编写不易,转载请注明( http://shihlei.iteye.com/blog/2339398)
一 前言
ES 做简单的条件查询,条件删除,在2.4版没有提供,script只提供的update的方案,自己简单封装了下。做面向对象的使用。
二 依赖
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.20</version> </dependency>
三 程序
连接客户端:
public class EsTcpClient { private static final String ES_HOST = "localhost"; private static final int ES_TCP_PORT = 9300; private static TransportClient client; /** * 获取TCP 客户端 * * @return */ public static synchronized TransportClient getClient() { if (client == null) { build(); } return client; } /** * 关闭客户端 */ public static void close(TransportClient client) { if (client != null) { client.close(); } } /** * 建立连接 * * @return */ private static void build() { try { //特别注意:如果cluster 起了名字,需要在连接时指定名字,否则验证客户端连接的不是默认集群elasticsearch,会忽略,则无法找到节点 Settings settings = Settings.settingsBuilder() .put("cluster.name", "mycluster").build(); client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ES_HOST), ES_TCP_PORT)); // .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300)); } catch (UnknownHostException e) { throw new RuntimeException(e); } } }
Template
public class EsTemplate<T extends EsTemplate.EsBean> { //TCP连接客户端: private TransportClient client = null; public EsTemplate(TransportClient client) { this.client = client; } /** * 新增 * * @param doc * @return */ public boolean insert(T doc) { String json = JSON.toJSONString(doc); IndexResponse response = client.prepareIndex(doc.esIndics(), doc.esType(), doc.esId()).setSource(json).get(); return response.isCreated(); } /** * 替换 * * @param doc * @return */ public boolean replace(T doc) { return update(doc); } /** * 更新 * * @param doc * @return */ public boolean update(T doc) { String json = JSON.toJSONString(doc); UpdateResponse response = client.prepareUpdate(doc.esIndics(), doc.esType(), doc.esId()) .setDoc(json) .get(); return !response.isCreated(); } /** * 删除 * * @param doc * @return */ public boolean delete(T doc) { DeleteResponse response = client.prepareDelete(doc.esIndics(), doc.esType(), doc.esId()).get(); return response.isFound(); } /** * 查询 * * @param indics 索引名 * @param type 类型 * @param id id * @param docClass 检索类型 * @return */ public T searchById(String indics, String type, String id, Class<T> docClass) { GetResponse response = client.prepareGet(indics, type, id).get(); if (response.isExists()) { String json = response.getSourceAsString(); return JSON.parseObject(json, docClass); } return null; } /** * 条件查询 * * @param criterias * @return */ public List<T> search(String indics, String type, Collection<Criteria> criterias, Class<T> docClass) { Function<SearchHit, T> resultConverter = searchHit -> { if (searchHit.isSourceEmpty()) { return null; } String json = searchHit.getSourceAsString(); return JSON.parseObject(json, docClass); }; return executeSearch(indics, type, criterias, true, resultConverter); } //批量部分------------------------------------------------------------------------------ /** * 批量插入更新 * * @param docs * @return */ public int batchInsert(Collection<T> docs) { if (docs == null || docs.isEmpty()) { return 0; } //构建批量插入 Consumer<BulkRequestBuilder> buildRequestFunction = bulkRequestBuilder -> { for (T doc : docs) { String json = JSON.toJSONString(doc); bulkRequestBuilder.add(client.prepareIndex(doc.esIndics(), doc.esType(), doc.esId()).setSource(json)); } }; executeBulk(buildRequestFunction); return docs.size(); } /** * 批量更新 * * @param docs * @return */ public int batchUpdate(Collection<T> docs) { if (docs == null || docs.isEmpty()) { return 0; } //构建批量更新 Consumer<BulkRequestBuilder> buildRequestFunction = bulkRequestBuilder -> { for (T doc : docs) { String json = JSON.toJSONString(doc); bulkRequestBuilder.add(client.prepareUpdate(doc.esIndics(), doc.esType(), doc.esId()) .setDoc(json)); } }; executeBulk(buildRequestFunction); return docs.size(); } /** * 批量删除 * * @param docs * @return */ public int batchDelete(Collection<T> docs) { if (docs == null || docs.isEmpty()) { return 0; } //构建批量更新 Consumer<BulkRequestBuilder> buildRequestFunction = bulkRequestBuilder -> { for (T doc : docs) { String json = JSON.toJSONString(doc); bulkRequestBuilder.add(client.prepareDelete(doc.esIndics(), doc.esType(), doc.esId())); } }; executeBulk(buildRequestFunction); return docs.size(); } /** * 条件更新 * * @param doc 文档 * @param criterias 查询条件 * @return 更新数量 */ public int updateByQuery(T doc, Collection<Criteria> criterias) { client.admin().indices().prepareRefresh().execute().actionGet(); final List<String> documentIds = executeSearch(doc.esIndics(), doc.esType(), criterias, false, searchHit -> searchHit.getId()); if (documentIds.isEmpty()) { return 0; } String json = JSON.toJSONString(doc); //构建批量更新 Consumer<BulkRequestBuilder> buildRequestFunction = bulkRequestBuilder -> { for (String id : documentIds) { UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate(doc.esIndics(), doc.esType(), id) .setDoc(json); bulkRequestBuilder.add(updateRequestBuilder); } }; executeBulk(buildRequestFunction); return documentIds.size(); } /** * 条件删除 * * @param criterias * @return */ public int deleteByQuery(String indics, String type, Collection<Criteria> criterias) { client.admin().indices().prepareRefresh().execute().actionGet(); final List<String> documentIds = executeSearch(indics, type, criterias, false, searchHit -> searchHit.getId()); if (documentIds.isEmpty()) { return 0; } //构建批量删除 Consumer<BulkRequestBuilder> buildRequestFunction = bulkRequestBuilder -> { for (String id : documentIds) { bulkRequestBuilder.add(client.prepareDelete(indics, type, id)); } }; executeBulk(buildRequestFunction); return documentIds.size(); } /** * 执行批量操作 * * @param buildRequestFunction 构建处理 */ private void executeBulk(Consumer<BulkRequestBuilder> buildRequestFunction) { BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); buildRequestFunction.accept(bulkRequestBuilder); BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet(); if (bulkResponse.hasFailures()) { throw new RuntimeException(bulkResponse.buildFailureMessage()); } } //执行查询 private <E> List<E> executeSearch(String indics, String type, Collection<Criteria> criterias, boolean needSource, Function<SearchHit, E> handleHitfunction) { List<E> results = new LinkedList<>(); //指定查询的库表 SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indics); searchRequestBuilder.setTypes(type); searchRequestBuilder.setSize(9999); if (!needSource) { searchRequestBuilder.addFields(); } if (criterias != null && !criterias.isEmpty()) { //构建查询条件必须嵌入filter中! BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); for (Criteria c : criterias) { boolQueryBuilder.filter(QueryBuilders.termQuery(c.getFieldName(), c.getFieldValue())); } searchRequestBuilder.setQuery(boolQueryBuilder); } //请求 SearchResponse searchResponse = searchRequestBuilder.get(); //响应 SearchHits hits = searchResponse.getHits(); //无查询结果 if (hits.totalHits() > 0) { SearchHit[] hitList = hits.getHits(); for (SearchHit searchHit : hitList) { E e = handleHitfunction.apply(searchHit); if (e != null) { results.add(e); } } } return results; } /** * 查询条件 */ public static class Criteria { private String fieldName; private Object fieldValue; public Criteria(String fieldName, Object fieldValue) { this.fieldName = fieldName; this.fieldValue = fieldValue; } public String getFieldName() { return fieldName; } public void setFieldName(String fieldName) { this.fieldName = fieldName; } public Object getFieldValue() { return fieldValue; } public void setFieldValue(Object fieldValue) { this.fieldValue = fieldValue; } } /** * ES操作Bean */ public static abstract class EsBean { @JSONField(serialize = false, deserialize = false) protected Collection<Criteria> criterias = new LinkedList<>(); public Collection<Criteria> getCriterias() { return criterias; } /** * 获取索引库名 * * @return 索引库名 */ public abstract String esIndics(); /** * 获取类型 * * @return 索引类型名 */ public abstract String esType(); /** * 获取doc id * * @return 文档id */ public abstract String esId(); /** * 添加操作条件 * * @param criteria 条件 */ public void addCriteria(Criteria criteria) { criterias.add(criteria); } } }
四 使用
public class EsTset { public static void main(String[] args) { TransportClient client = EsTcpClient.getClient(); EsTemplate<TestEsBean> testEsBeanEsTemplate = new EsTemplate<>(client); //插入 Collection<TestEsBean> testEsBeens = new LinkedList<>(); for(int i=0;i<10;i++){ testEsBeens.add(new TestEsBean(String.valueOf(i),"name"+i,i)); } testEsBeanEsTemplate.batchInsert(testEsBeens); //查询 Collection<TestEsBean> dbTestEsBeans = testEsBeanEsTemplate.search("testDb","testBean", Arrays.asList(new EsTemplate.Criteria("age",10)),TestEsBean.class); for(TestEsBean bean: dbTestEsBeans){ System.out.println(bean); } //删除 testEsBeanEsTemplate.deleteByQuery("testDb","testBean", Arrays.asList(new EsTemplate.Criteria("age",1))); } public static class TestEsBean extends EsTemplate.EsBean { private String id; private String name; private int age; public TestEsBean(String id, String name, int age) { this.id = id; this.name = name; this.age = age; } @Override public String toString() { return "TestEsBean{" + "id='" + id + '\'' + ", name='" + name + '\'' + ", age=" + age + '}'; } /** * 获取索引库名 * * @return 索引库名 */ @Override public String esIndics() { return "testDb"; } /** * 获取类型 * * @return 索引类型名 */ @Override public String esType() { return "testBean"; } /** * 获取doc id * * @return 文档id */ @Override public String esId() { return id; } } }