elasticsearch实现商品搜索
数据初始化
在项目启动时对未初始化的索引进行数据的初始化。
使用restHighLevelClient操作es,注意使用高版本es,restHighLevelClient对低版本es支持不好。
在项目启动时使用springboot的application做数据初始化
实例化客户端
//创建低级客户端 ip,端口,请求方式http或https
RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(hostName, Integer.parseInt(port), scheme));
//创建高级客户端
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder);
判断索引是否存在
//判断商品索引是否存在
boolean zeastProductExists;
try {
GetIndexRequest zeastProductExistsRequest = new GetIndexRequest();
zeastProductExistsRequest.indices(zeastProductIndex);
zeastProductExists = restHighLevelClient.indices().exists(zeastProductExistsRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
logger.error("ES索引判断异常!",e);
throw new RuntimeException("ES索引判断异常!");
}
索引不存在时进行索引初始化
创建索引
CreateIndexRequest createIndexRequest = new CreateIndexRequest(zeastProductIndex);
//创建索引
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
boolean createIndexResult = createIndexResponse.isAcknowledged();
if(!createIndexResult){
logger.error("ES商品索引初始化失败!");
return CommonResult.failed("ES商品索引初始化失败!");
}
boolean shardsCreateIndexResult = createIndexResponse.isShardsAcknowledged();
if(!shardsCreateIndexResult){
logger.error("ES商品索引初始化失败!");
return CommonResult.failed("ES商品索引初始化失败!");
}
索引创建完成后设置索引mapping
mapping设置
PutMappingRequest putMappingRequest = new PutMappingRequest(zeastProductIndex);
//设置type
putMappingRequest.type(zeastProductType);
Map<String, Object> jsonMap = new HashMap<>();
Map<String, Object> properties = new HashMap<>();
Map<String, Object> productId = new HashMap<>();
productId.put("type", "keyword");
properties.put("productId", productId);
Map<String, Object> productName = new HashMap<>();
productName.put("type", "keyword");
properties.put("productName", productName);
//keyword不分词 text分词
Map<String, Object> keyword = new HashMap<>();
keyword.put("type", "text");
properties.put("keyword", keyword);
Map<String, Object> mapping = new HashMap<>();
mapping.put("properties", properties);
jsonMap.put(zeastProductType, mapping);
putMappingRequest.source(jsonMap);
putMappingRequest.timeout(TimeValue.timeValueMinutes(2));
putMappingRequest.masterNodeTimeout(TimeValue.timeValueMinutes(1));
AcknowledgedResponse putMappingResponse = restHighLevelClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT);
boolean mappingResult = putMappingResponse.isAcknowledged();
if(!mappingResult){
logger.error("ES商品映射初始化失败!");
return CommonResult.failed("ES商品映射初始化失败!");
}
mapping设置完成后开始全量数据初始化
数据初始化
查询数据部分省略,初始化代码大概如下,数据组装多开了几个线程同时跑。
BulkRequest request = new BulkRequest();
//在任务执行完成后解析返回值,
for(Future<List<SearchProductDo>> future :futures){
try {
List<SearchProductDo> data = future.get();
if(null != data && data.size()>0){
for (SearchProductDo searchProductDo:data) {
request.add(new IndexRequest(zeastProductIndex).id(String.valueOf(searchProductDo.getProductId()))
.source(JSON.toJSONString(searchProductDo), XContentType.JSON));
}
}
} catch (InterruptedException e) {
logger.error("获取多线程结果异常:",e);
return CommonResult.failed("获取多线程结果异常:"+e.getMessage());
} catch (ExecutionException e) {
logger.error("获取多线程结果异常:",e);
return CommonResult.failed("获取多线程结果异常:"+e.getMessage());
}
}
BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
for (BulkItemResponse bulkItemResponse : bulkResponse) {
//bulkItemResponse.getOpType() INDEX,CREATE,UPDATE,DELETE
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
ReplicationResponse.ShardInfo shardInfo = itemResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
//处理成功分片数量少于总分片数量的情况
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason();
logger.error("es商品数据"+bulkItemResponse.getOpType()+"失败"+reason);
return CommonResult.failed("es商品数据"+bulkItemResponse.getOpType()+"失败"+reason);
}
}
}
logger.info("ES商品数据全量初始化成功!");
return CommonResult.success();
数据同步
在搜索相关数据发生变化时,将变化的数据同步到es中。
数据同步有几种方式
- 第一种是一种最为简单的方式,在将数据写到mysql时,同时将数据写到ES,实现数据的双写。
缺点:硬编码:有需要写入mysql的地方都需要添加写入ES的代码;业务强耦合;存在双写失败丢数据风险;性能较差:本来mysql的性能就不是很高,再加写一个ES,系统的性能必然会下降。 - 第二种通过mq做数据同步,在做数据库数据修改时发送消息
通知变更字段。缺点存在延时 - 第三种可以利用mysql的binlog来进行同步
具体步骤如下:
1) 读取mysql的binlog日志,获取指定表的日志信息;
2) 将读取的信息转为MQ;
3) 编写一个MQ消费程序;
4) 不断消费MQ,每消费完一条消息,将消息写入到ES中。
以上三种同步方案第三种最好但是相对复杂,有经济实力可以考虑阿里云的DTS做数据同步。
我们这里使用第二种方案。
发送数据变更消息
//1. 实例化一个producer
DefaultMQProducer producer = new DefaultMQProducer(producerGroup,getAclRPCHook());
//2. 设置namesrvAddr,集群环境多个nameserver用;分割
producer.setNamesrvAddr(namesrvAddr);
//3. 启动
producer.start();
// 4. 发送消息
//构建实例,第一个参数为topic,第二个参数为tags,第三个参数为消息体
Message message = new Message(topic,dataSyncMessageDo.getDataTag(),(JSON.toJSONString(dataSyncMessageDo)).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(message);
logger.info(result);
//关闭生产者
producer.shutdown();
if(SendStatus.SEND_OK == result.getSendStatus()){
return CommonResult.success();
}else {
logger.error("消息发送失败"+result.getSendStatus());
return CommonResult.failed("消息发送失败"+result.getSendStatus());
}
消息体dataSyncMessageD代码o如下
/***
* 变动索引
*/
private String syncIndex;
/***
* 变动数据主键
*/
private String syncDataId;
/**
*tag
*/
private String dataTag;
/***
* 变动数据 key为变更字段名 value为变更的值
*/
private Map<String,Object> syncData;
消费消息
扫描二维码关注公众号,回复:
9637363 查看本文章
// 实例化group .
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
// 设置namesrvAddr,集群环境多个nameserver用;分割
consumer.setNamesrvAddr(namesrvAddr);
// 设置监听的topic 监听全部为*
consumer.subscribe(topic, "*");
// 从brokers注册回调消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
boolean result = true;
for (MessageExt msg : msgs) {
try {
String messageBody = new String(msg.getBody());
String tags = msg.getProperties().get("TAGS");
//判断不同的tag判断数据是变更还是删除
if (tags.equals(dataSyncTag)) {
DataSyncMessageDo dataSyncMessageDo = JSON.parseObject(messageBody, DataSyncMessageDo.class);
CommonResult commonResult = elaticsearchManager.dataUpdate(dataSyncMessageDo);
logger.info("数据同步:"+commonResult);
if (!commonResult.isSuccess()){
logger.error("es数据更新失败,tags:"+dataSyncTag+",index:"+dataSyncMessageDo.getSyncIndex()+",id:"+dataSyncMessageDo.getSyncDataId());
result = false;
}
}
if (tags.equals(dataDelTag)) {
DataSyncMessageDo dataSyncMessageDo = JSON.parseObject(messageBody, DataSyncMessageDo.class);
logger.info("数据删除:"+dataSyncMessageDo);
CommonResult commonResult = elaticsearchManager.dataDelete(dataSyncMessageDo);
if (!commonResult.isSuccess()){
logger.error("es数据删除失败,tags:"+dataDelTag+",index:"+dataSyncMessageDo.getSyncIndex()+",id:"+dataSyncMessageDo.getSyncDataId());
result = false;
}
}
} catch (Exception e) {
e.printStackTrace();
// ②重试未超过三次就返回失败,超过3次 丢弃消息
if (msg.getReconsumeTimes() <= 3) {
result = false;
}
}
}
if(result){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}else {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
数据更新
/***
* 数据更新
* @param dataSyncMessageDo
* @return
*/
public CommonResult dataUpdate(DataSyncMessageDo dataSyncMessageDo){
RestHighLevelClient restHighLevelClient = createClient();
UpdateRequest request = new UpdateRequest(dataSyncMessageDo.getSyncIndex(), dataSyncMessageDo.getSyncDataId())
.doc(JSON.toJSONString(dataSyncMessageDo.getSyncData()), XContentType.JSON);
//更新数据不存在则插入新数据
request.upsert(JSON.toJSONString(dataSyncMessageDo.getSyncData()), XContentType.JSON);
request.setRefreshPolicy("true");
try {
UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);
ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
//处理成功分片数量少于总分片数量的情况
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason();
logger.error("es商品相似词数据更新失败"+reason);
return CommonResult.failed("es商品相似词数据更新失败"+reason);
}
}
restHighLevelClient.close();
} catch (IOException e) {
logger.error("数据更新异常:",e);
return CommonResult.failed("数据更新异常!");
}
return CommonResult.success();
}
数据删除
/***
* 数据删除
* @param dataSyncMessageDo
* @return
*/
public CommonResult dataDelete(DataSyncMessageDo dataSyncMessageDo){
RestHighLevelClient restHighLevelClient = createClient();
DeleteRequest request = new DeleteRequest(dataSyncMessageDo.getSyncIndex(),dataSyncMessageDo.getSyncDataId());
try {
DeleteResponse deleteResponse = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
//处理成功分片数量少于总分片数量的情况
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason();
logger.error("es商品相似词数据更新失败"+reason);
return CommonResult.failed("es商品相似词数据更新失败"+reason);
}
}
restHighLevelClient.close();
} catch (IOException e) {
logger.error("数据更新异常:",e);
return CommonResult.failed("数据更新异常!");
}
return CommonResult.success();
}```
## 商品搜索
数据同步开发完成后 开始搜索部分的开发
```java
/***
* 商品搜索
* @param request
* @return
*/
public CommonResult<CommonPage<SearchProductDo>> productSearch(ProductSearchRequest request){
RestHighLevelClient restHighLevelClient = createClient();
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//组合查询
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
if(null != request.getWords() && request.getWords().size()>0) {
BoolQueryBuilder wordsQueryBuilder = new BoolQueryBuilder();
for (String word : request.getWords()) {
//全文匹配检索
MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("keyword", "word");
//关闭分词后搜索词加*可以达到mysql模糊查询的效果 WildcardQueryBuilder模糊查询
//WildcardQueryBuilder matchQueryBuilder = new WildcardQueryBuilder("keyword", "*"+word+"*");
// .fuzziness(Fuzziness.AUTO)//对匹配查询启用模糊匹配
// .prefixLength(2)//在匹配查询中设置前缀长度选项 表明这个范围的字符需要精准匹配
// .maxExpansions(10); //设置最大扩展选项以控制查询的模糊过程
wordsQueryBuilder.should(matchQueryBuilder);
}
boolQueryBuilder.must(wordsQueryBuilder);
}
if(null != request.getLimitDate()) {
RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder("publishTime");
//eq相等 ne、neq不相等, gt大于, lt小于 gte、ge大于等于 lte、le 小于等于 not非 mod求模
rangeQueryBuilder.lte(request.getLimitDate());
boolQueryBuilder.must(rangeQueryBuilder);
}
TermQueryBuilder publishStatus = new TermQueryBuilder("publishStatus",1);
//eq相等 ne、neq不相等, gt大于, lt小于 gte、ge大于等于 lte、le 小于等于 not非 mod求模
boolQueryBuilder.must(publishStatus);
BoolQueryBuilder stockStatusBuilder = new BoolQueryBuilder();
TermQueryBuilder stockStatus1 = new TermQueryBuilder("stockStatus",0);
stockStatusBuilder.should(stockStatus1);
TermQueryBuilder stockStatus2 = new TermQueryBuilder("stockStatus",2);
stockStatusBuilder.should(stockStatus2);
boolQueryBuilder.must(stockStatusBuilder);
searchSourceBuilder.query(boolQueryBuilder);
if(1 == request.getOrderByClause()){
searchSourceBuilder.sort(new FieldSortBuilder("publishTime").order(SortOrder.DESC));
}else if(2 == request.getOrderByClause()){
searchSourceBuilder.sort(new FieldSortBuilder("price").order(SortOrder.ASC));
searchSourceBuilder.sort(new FieldSortBuilder("sort").order(SortOrder.DESC));
searchSourceBuilder.sort(new FieldSortBuilder("publishTime").order(SortOrder.DESC));
}else if(3 == request.getOrderByClause()){
searchSourceBuilder.sort(new FieldSortBuilder("price").order(SortOrder.DESC));
searchSourceBuilder.sort(new FieldSortBuilder("sort").order(SortOrder.DESC));
searchSourceBuilder.sort(new FieldSortBuilder("publishTime").order(SortOrder.DESC));
}else{
searchSourceBuilder.sort(new FieldSortBuilder("sort").order(SortOrder.DESC));
searchSourceBuilder.sort(new FieldSortBuilder("sale").order(SortOrder.DESC));
searchSourceBuilder.sort(new FieldSortBuilder("publishTime").order(SortOrder.DESC));
}
//超时时间
searchSourceBuilder.timeout(new TimeValue(20, TimeUnit.SECONDS));
//开始
searchSourceBuilder.from((request.getPageNum()-1)*request.getPageSize());
//每页记录数
searchSourceBuilder.size(request.getPageSize());
searchRequest.indices(zeastProductIndex);
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
RestStatus status = searchResponse.status();
if(status.getStatus() != 200){
logger.error("商品搜索失败!");
return CommonResult.failed("商品搜索失败!");
}
SearchHit[] searchHits = searchResponse.getHits().getHits();
long total = searchResponse.getHits().getTotalHits().value;
List<SearchProductDo> searchProductDos = new ArrayList<>();
for (SearchHit hit : searchHits) {
String sourceAsString = hit.getSourceAsString();
SearchProductDo searchProductDo = JSON.parseObject(sourceAsString, SearchProductDo.class);
searchProductDos.add(searchProductDo);
}
restHighLevelClient.close();
CommonPage commonPage = CommonPage.restPage(searchProductDos);
commonPage.setPageNum(request.getPageNum());
commonPage.setPageSize(request.getPageSize());
int totalPage = (int) (total%request.getPageSize() == 0?total/request.getPageSize():total/request.getPageSize()+1);
commonPage.setTotalPage(totalPage);
commonPage.setTotal(total);
return CommonResult.success(commonPage);
} catch (IOException e) {
logger.error("商品搜索异常:",e);
return CommonResult.failed("商品搜索异常!");
}
}
网上找到的各类型查询的图 做参考