版权声明:本文为博主原创文章,未经博主允许不得转载。否则切鸡鸡~~ https://blog.csdn.net/kang5789/article/details/77774299
完整代码下载:点击打开链接
1.Es :操作es的具体类
package org.ufo;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
public class Es {
// 连接
private static TransportClient client;
// 索引库名称,一般都是一个库,只是里边的type不同,比如user/goods
private static final String index = "testindex";
public Es() {
// 通过 setting对象来指定集群配置信息
Settings settings = Settings.builder()//
.put("client.transport.sniff", true)// 自动嗅探发现集群节点
.put("client.transport.ignore_cluster_name", true)// 忽略集群名称
.put("xpack.security.user", "elastic:changeme")// 安全认证
.build();
client = new PreBuiltXPackTransportClient(settings);
try {
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.33.111"), 9300));
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.33.112"), 9300));
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
/**
*
* @Description: 关闭连接
* @author kang
* @date 2017年5月11日
*/
public void close() {
client.close();
}
/**
*
* @Description: 验证链接是否正常
* @author kang
* @date 2017年5月11日
*/
public boolean validate() {
return client.connectedNodes().size() == 0 ? false : true;
}
/**
*
* @Description:添加文档
* @author kang
* @date 2017年1月3日
*/
public void addDoc(String type, Object id, Object object) {
client.prepareIndex(index, type, id.toString()).setSource(JSON.toJSONString(object)).get();
}
/**
*
* @Description:更新文档
* @author kang
* @date 2017年1月3日
*/
public void updateDoc(String type, Object id, Object object) {
client.prepareUpdate(index, type, id.toString()).setDoc(JSON.toJSONString(object)).get();
}
/**
*
* @Description:删除文档
* @author kang
* @date 2017年1月3日
*/
public void delDoc(String type, Object id) {
client.prepareDelete(index, type, id.toString()).get();
}
/**
*
* @Description: 分页高亮查询
* @author kang
* @date 2017年1月11日
*/
public Page getDocHighLight(String keywords, String type, Set<String> fields, int currentPage, int pageSize, boolean isHighlight) throws Exception {
// 搜索数据
SearchResponse response = client.prepareSearch(index).setTypes(type)//
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)//
.setQuery(QueryBuilders.multiMatchQuery(keywords, fields.toArray(new String[fields.size()]))// 查询所有字段
.analyzer("ik_max_word"))// 分词器
.highlighter(new HighlightBuilder().preTags("<span style=\"color:red\">").postTags("</span>").field("*"))// 高亮标签
.setFrom((currentPage - 1) * pageSize).setSize(pageSize)// 分页
.setExplain(true)// 评分排序
.execute().actionGet();
// 获取查询结果集
SearchHits searchHits = response.getHits();
List<Object> result = Lists.newArrayList();
// 反射填充高亮
for (SearchHit hit : searchHits) {
Map<String, Object> source = hit.getSource();
if (isHighlight) {
// 获取对应的高亮域
Map<String, HighlightField> highlight = hit.getHighlightFields();
for (String field : fields) {
// 从设定的高亮域中取得指定域
HighlightField titleField = highlight.get(field);
if (titleField == null) continue;
// 取得定义的高亮标签
String texts = StringUtils.join(titleField.fragments());
source.put(field, texts);
}
}
result.add(JSON.toJSON(source));
}
return new Page(currentPage, pageSize, (int) searchHits.totalHits(), result);
}
/**
*
* @Description: 重构索引(更新词库之后)
* @author kang
* @date 2017年5月16日
*/
public void reindex() {
SearchResponse scrollResp = client.prepareSearch(index)//
.setScroll(new TimeValue(60000))//
.setQuery(QueryBuilders.matchAllQuery())//
.setSize(100).get(); // max of 100 hits will be returned for
// Scroll until no hits are returned
do {
for (SearchHit hit : scrollResp.getHits().getHits()) {
client.prepareIndex(index, hit.getType(), hit.getId()).setSource(hit.getSourceAsString()).execute().actionGet();
}
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
} while (scrollResp.getHits().getHits().length != 0);
}
}
2.EsFactory:Es的对象工厂
package org.ufo;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
//链接工厂,池子的一些方法由我们实现
public class EsFactory extends BasePooledObjectFactory<Es> {
// 创建对象
@Override
public Es create() throws Exception {
System.err.println("new es !!!!");
return new Es();
}
// 包装对象
@Override
public PooledObject<Es> wrap(Es arg0) {
return new DefaultPooledObject<Es>(arg0);
}
// 销毁对象关闭链接
@Override
public void destroyObject(PooledObject<Es> p) throws Exception {
p.getObject().close();
System.err.println("destory es");
super.destroyObject(p);
}
// 校验对象是否正常
@Override
public boolean validateObject(PooledObject<Es> p) {
System.err.println("validate es");
return p.getObject().validate();
}
}
3.EsUtil:实际使用类
package org.ufo;
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
// es工具类
public class EsUtil {
// 初始化一个池子实例
private static GenericObjectPool<Es> pool;
static {
// 池子配置文件
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(10);// 整个池最大值
config.setMaxIdle(10);// 最大空闲
config.setMinIdle(2);// 最小空闲
config.setMaxWaitMillis(-1);// 最大等待时间,-1表示一直等
config.setBlockWhenExhausted(true);// 当对象池没有空闲对象时,新的获取对象的请求是否阻塞。true阻塞。默认值是true
config.setTestOnBorrow(false);// 在从对象池获取对象时是否检测对象有效,true是;默认值是false
config.setTestOnReturn(false);// 在向对象池中归还对象时是否检测对象有效,true是,默认值是false
config.setTestWhileIdle(true);// 在检测空闲对象线程检测到对象不需要移除时,是否检测对象的有效性。true是,默认值是false
config.setMinEvictableIdleTimeMillis(10 * 60000L); // 可发呆的时间,10mins
config.setTestWhileIdle(true); // 发呆过长移除的时候是否test一下先
pool = new GenericObjectPool<>(new EsFactory(), config);
}
/**
*
* @Description:添加文档
* @author kang
* @date 2017年1月3日
*/
public static void addDoc(String type, Object id, Object object) {
Es es = null;
try {
es = pool.borrowObject();
es.addDoc(type, id, object);
} catch (Exception e) {
e.printStackTrace();
} finally {
pool.returnObject(es);
}
}
/**
*
* @Description:更新文档
* @author kang
* @date 2017年1月3日
*/
public static void updateDoc(String type, Object id, Object object) {
Es es = null;
try {
es = pool.borrowObject();
es.updateDoc(type, id, object);
} catch (Exception e) {
e.printStackTrace();
} finally {
pool.returnObject(es);
}
}
/**
*
* @Description:删除文档
* @author kang
* @date 2017年1月3日
*/
public static void delDoc(String type, Object id) {
Es es = null;
try {
es = pool.borrowObject();
es.delDoc(type, id);
} catch (Exception e) {
e.printStackTrace();
} finally {
pool.returnObject(es);
}
}
/**
*
* @Description: 分页高亮查询
* @author kang
* @date 2017年1月11日
*/
public static Page getDocHighLight(String keywords, String type, Set<String> fields, int currentPage, int pageSize, boolean isHighlight) {
Es es = null;
try {
es = pool.borrowObject();
return es.getDocHighLight(keywords, type, fields, currentPage, pageSize, isHighlight);
} catch (Exception e) {
e.printStackTrace();
} finally {
pool.returnObject(es);
}
return null;
}
/**
*
* @Description:重构索引
* @author kang
* @date 2017年1月3日
*/
public static void reindex() {
Es es = null;
try {
es = pool.borrowObject();
es.reindex();
} catch (Exception e) {
e.printStackTrace();
} finally {
pool.returnObject(es);
}
}
}
4.分页辅助类
package org.ufo;
import java.util.List;
public class Page {
// 指定的或是页面参数
private int currentPage; // 当前页
private int pageSize; // 每页显示多少条
// 查询数据库
private int recordCount; // 总记录数
private List<?> recordList; // 本页的数据列表
// 计算
private int pageCount; // 总页数
private int beginPageIndex; // 页码列表的开始索引(包含)
private int endPageIndex; // 页码列表的结束索引(包含)
/**
* 只接受前4个必要的属性,会自动的计算出其他3个属生的值
*
* @param currentPage
* @param pageSize
* @param recordCount
* @param recordList
*/
public Page(int currentPage, int pageSize, int recordCount, List<?> recordList) {
this.currentPage = currentPage;
this.pageSize = pageSize;
this.recordCount = recordCount;
this.recordList = recordList;
// 计算总页码
pageCount = (recordCount + pageSize - 1) / pageSize;
// 计算 beginPageIndex 和 endPageIndex
// >> 总页数不多于10页,则全部显示
if (pageCount <= 10) {
beginPageIndex = 1;
endPageIndex = pageCount;
}
// >> 总页数多于10页,则显示当前页附近的共10个页码
else {
// 当前页附近的共10个页码(前4个 + 当前页 + 后5个)
beginPageIndex = currentPage - 4;
endPageIndex = currentPage + 5;
// 当前面的页码不足4个时,则显示前10个页码
if (beginPageIndex < 1) {
beginPageIndex = 1;
endPageIndex = 10;
}
// 当后面的页码不足5个时,则显示后10个页码
if (endPageIndex > pageCount) {
endPageIndex = pageCount;
beginPageIndex = pageCount - 10 + 1;
}
}
}
public List<?> getRecordList() {
return recordList;
}
public void setRecordList(List<?> recordList) {
this.recordList = recordList;
}
public int getCurrentPage() {
return currentPage;
}
public void setCurrentPage(int currentPage) {
this.currentPage = currentPage;
}
public int getPageCount() {
return pageCount;
}
public void setPageCount(int pageCount) {
this.pageCount = pageCount;
}
public int getPageSize() {
return pageSize;
}
public void setPageSize(int pageSize) {
this.pageSize = pageSize;
}
public int getRecordCount() {
return recordCount;
}
public void setRecordCount(int recordCount) {
this.recordCount = recordCount;
}
public int getBeginPageIndex() {
return beginPageIndex;
}
public void setBeginPageIndex(int beginPageIndex) {
this.beginPageIndex = beginPageIndex;
}
public int getEndPageIndex() {
return endPageIndex;
}
public void setEndPageIndex(int endPageIndex) {
this.endPageIndex = endPageIndex;
}
}
5.测试
package org.ufo.test;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.ufo.EsUtil;
import org.ufo.Page;
import com.google.common.collect.Sets;
public class MyTest {
@Test
public void testgetone() throws Exception {
HashSet<String> set = Sets.newHashSet();
set.add("name");
set.add("telephone");
Page page = EsUtil.getDocHighLight("张", "user", set, 1, 10, true);
System.out.println(page.getRecordList());
}
// 并发测试
@Test
public void testget() throws Exception {
HashSet<String> set = Sets.newHashSet();
set.add("name");
ExecutorService pool = Executors.newFixedThreadPool(8);
for (int i = 0; i < 500; i++) {
pool.execute(new Runnable() {
@Override
public void run() {
Page page = EsUtil.getDocHighLight("张", "user", set, 1, 10, true);
System.err.println("搜索成功条数为:" + page.getRecordCount());
}
});
}
pool.shutdown();
while (!pool.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
System.out.println("执行超时!!!!!!");
pool.shutdownNow();
}
}
@Test
public void testreindex() throws Exception {
EsUtil.reindex();
}
}