Spring Data 框架集成
简介
Spring Data 是一个用于简化数据库、非关系型数据库、索引库访问,并支持云服务的开源框架。
- 其主要目标是使得对数据的访问变得方便快捷,并支持 map-reduce 框架和云计算数据服务。
- Spring Data 可以极大的简化
JPA(Elasticsearch)
的写法,可以在几乎不用写实现的情况下,实现对数据的访问和操作。 - 除了 CRUD 外,还包括如分页、排序等一些
常用的功能。 - Spring Data 的官网:
https://spring.io/projects/spring-data
Spring Data 常用的功能模块如下:
Spring Data Elasticsearch 介绍
Spring Data Elasticsearch
基于spring data API
简化 Elasticsearch 操作,将原始操作 Elasticsearch 的客户端 API 进行封装 。Spring Data 为 Elasticsearch 项目提供集成搜索引擎。Spring Data Elasticsearch POJO
的关键功能区域为中心的模型与Elastichsearch
交互文档和轻松地编写一个存储索引库数据访问层。- 官方网站:
https://spring.io/projects/spring-data-elasticsearch
一、框架集环境成
-
1. 创建MAVEN 文件
-
2. 修改 pom 文件,增加依赖关系
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.6.RELEASE</version> <relativePath/> </parent> <groupId>com.atguigu.es</groupId> <artifactId>springdata-elasticsearch</artifactId> <version>1.0</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-test</artifactId> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> </dependency> </dependencies> </project>
-
3. 增加配置文件
在 resources 目录中增加application.properties
文件# es 服务地址 elasticsearch.host=127.0.0.1 # es 服务端口 elasticsearch.port=9200 # 配置日志级别,开启 debug 日志 logging.level.com.atguigu.es=debug
-
4. 主程序
package com.atguigu.es; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SpringDataElasticSearchMainApplication { public static void main(String[] args) { SpringApplication.run(SpringDataElasticSearchMainApplication.class,args); } }
二、数据实体类
package com.atguigu.es;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Product {
private Long id;//商品唯一标识
private String title;//商品名称
private String category;//分类名称
private Double price;//商品价格
private String images;//图片地址
}
三. 创建模板类
ElasticsearchRestTemplate
是spring-data-elasticsearch
项目中的一个类,和其他 spring 项目中的 template类似。- 在新版的
spring-data-elasticsearch
中,ElasticsearchRestTemplate
代替了原来的ElasticsearchTemplate
。 - 原因是 ElasticsearchTemplate 基于 TransportClient,TransportClient 即将在 8.x 以后的版本中移除。所以,我们推荐使用 ElasticsearchRestTemplate。
- ElasticsearchRestTemplate 基 于 RestHighLevelClient 客户端的。需要自定义配置类,继承 AbstractElasticsearchConfiguration,并实现elasticsearchClient()抽象方法,创建 RestHighLevelClient 对象
package com.atguigu.es; import lombok.Data; import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfigura tion; @ConfigurationProperties(prefix = "elasticsearch") @Configuration @Data public class ElasticsearchConfig extends AbstractElasticsearchConfiguration { private String host ; private Integer port ; //重写父类方法 @Override public RestHighLevelClient elasticsearchClient() { RestClientBuilder builder = RestClient.builder(new HttpHost(host, port)); RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder); return restHighLevelClient; } }
四、DAO 数据访问对象
package com.atguigu.es;
import
org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface ProductDao extends ElasticsearchRepository<Product,Long> {
}
五、实体类映射操作
package com.atguigu.es;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Document(indexName = "shopping", shards = 3, replicas = 1)
public class Product {
//必须有 id,这里的 id 是全局唯一的标识,等同于 es 中的"_id"
@Id
private Long id;//商品唯一标识
/**
* type : 字段数据类型
* analyzer : 分词器类型
* index : 是否索引(默认:true)
* Keyword : 短语,不进行分词
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String title;//商品名称
//keyword类型 用于结构化的字段, 只能通过精确值搜索到。
@Field(type = FieldType.Keyword)
private String category;//分类名称
@Field(type = FieldType.Double)
private Double price;//商品价格
@Field(type = FieldType.Keyword, index = false)
private String images;//图片地址
}
六、索引阶段
package com.atguigu.es;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESIndexTest {
//注入 ElasticsearchRestTemplate
@Autowired
private ElasticsearchRestTemplate elasticsearchRestTemplate;
//创建索引并增加映射配置
@Test
public void createIndex(){
//创建索引,系统初始化会自动创建索引
System.out.println("创建索引");
}
@Test
public void deleteIndex(){
//创建索引,系统初始化会自动创建索引
boolean flg = elasticsearchRestTemplate.deleteIndex(Product.class);
System.out.println("删除索引 = " + flg);
}
}
七、文档操作
package com.atguigu.es;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESProductDaoTest {
@Autowired
private ProductDao productDao;
/**
* 新增
*/
@Test
public void save(){
Product product = new Product();
product.setId(2L);
product.setTitle("华为手机");
product.setCategory("手机");
product.setPrice(2999.0);
product.setImages("http://www.atguigu/hw.jpg");
productDao.save(product);
}
//修改
@Test
public void update(){
Product product = new Product();
product.setId(1L);
product.setTitle("小米 2 手机");
product.setCategory("手机");
product.setPrice(9999.0);
product.setImages("http://www.atguigu/xm.jpg");
productDao.save(product);
}
//根据 id 查询
@Test
public void findById(){
Product product = productDao.findById(1L).get();
System.out.println(product);
}
//查询所有
@Test
public void findAll(){
Iterable<Product> products = productDao.findAll();
for (Product product : products) {
System.out.println(product);
}
}
//删除
@Test
public void delete(){
Product product = new Product();
product.setId(1L);
productDao.delete(product);
}
//批量新增
@Test
public void saveAll(){
List<Product> productList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Product product = new Product();
product.setId(Long.valueOf(i));
product.setTitle("["+i+"]小米手机");
product.setCategory("手机");
product.setPrice(1999.0+i);
product.setImages("http://www.atguigu/xm.jpg");
productList.add(product);
}
productDao.saveAll(productList);
}
//分页查询
@Test
public void findByPageable(){
//设置排序(排序方式,正序还是倒序,排序的 id)
Sort sort = Sort.by(Sort.Direction.DESC,"id");
int currentPage=0;//当前页,第一页从 0 开始,1 表示第二页
int pageSize = 5;//每页显示多少条
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);
//分页查询
Page<Product> productPage = productDao.findAll(pageRequest);
for (Product Product : productPage.getContent()) {
System.out.println(Product);
}
}
}
八. 文档搜索
package com.atguigu.es;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.PageRequest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESSearchTest {
@Autowired
private ProductDao productDao;
/**
* term 查询
* search(termQueryBuilder) 调用搜索方法,参数查询构建器对象
*/
@Test
public void termQuery(){
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
Iterable<Product> products = productDao.search(termQueryBuilder);
for (Product product : products) {
System.out.println(product);
}
}
/**
* term 查询加分页
*/
@Test
public void termQueryByPage(){
int currentPage= 0 ;
int pageSize = 5;
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize);
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
Iterable<Product> products = productDao.search(termQueryBuilder,pageRequest);
for (Product product : products) {
System.out.println(product);
}
}
}
ES 的优化
硬件选择
Elasticsearch 的基础是 Lucene
,所有的索引和文档数据是存储在本地的磁盘中,具体的路径可在 ES 的配置文件../config/elasticsearch.yml
中配置,如下:
#----------------------------------- Paths------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
#path.data: /path/to/data
#
# Path to log files:
#
#path.logs: /path/to/logs
#
磁盘在现代服务器上通常都是瓶颈。Elasticsearch 重度使用磁盘,你的磁盘能处理的吞吐量越大,你的节点就越稳定。
这里有一些优化磁盘 I/O 的技巧:
- 使用 SSD。就像其他地方提过的, 他们比机械磁盘优秀多了。
- 使用 RAID 0。条带化 RAID 会提高磁盘 I/O,代价显然就是当一块硬盘故障时整个就故障了。不要使用镜像或者奇偶校验 RAID 因为副本已经提供了这个功能。
- 另外,使用多块硬盘,并允许 Elasticsearch 通过多个
path.data
目录配置把数据条带化分配到它们上面。 - 不要使用远程挂载的存储,比如 NFS 或者 SMB/CIFS。这个引入的延迟对性能来说完全是背道而驰的。
分片策略
1、合理设置分片数
分片和副本的设计为 ES 提供了支持分布式和故障转移的特性,但并不意味着分片和副本是可以无限分配的。而且索引的分片完成分配后由于索引的路由机制,我们是不能重新修改分片数的。
- 可能有人会说,我不知道这个索引将来会变得多大,并且过后我也不能更改索引的大小,所以为了保险起见,还是给它设为 1000 个分片吧。但是需要知道的是,一个分片并不是没有代价的。
- 需要了解:
- 一个分片的底层即为一个
Lucene 索引
,会消耗一定文件句柄、内存、以及 CPU 运转。 - 每一个搜索请求都需要命中索引中的每一个分片,如果每一个分片都处于不同的节点还好, 但如果多个分片都需要在同一个节点上竞争使用相同的资源就有些糟糕了。
- 用于计算相关度的词项统计信息是基于分片的。如果有许多分片,每一个都只有很少的数据会导致很低的相关度。
- 一个分片的底层即为一个
一般来说,我们遵循一些原则:
- 控制每个分片占用的硬盘容量不超过 ES 的最大 JVM 的堆空间设置(一般设置不超过 32G,参考下文的 JVM 设置原则),因此,如果索引的总容量在 500G 左右,那分片大小在 16 个左右即可;当然,最好同时考虑原则 2。
- 考虑一下 node 数量,一般一个节点有时候就是一台物理机,如果分片数过多,大大超过了节点数,很可能会导致一个节点上存在多个分片,一旦该节点故障,即使保持了 1 个以上的副本,同样有可能会导致数据丢失,集群无法恢复。所以, 一般都设置分片数不超过节点数的 3 倍。
- 主分片,副本和节点最大数之间数量,我们分配的时候可以参考以下关系:
节 点 数 < = 主 分 片 数 ∗ ( 副 本 数 + 1 ) 节点数<=主分片数*(副本数+1) 节点数<=主分片数∗(副本数+1)
2、推迟分片分配
对于节点瞬时中断的问题,默认情况,集群会等待一分钟来查看节点是否会重新加入,
- 如果这个节点在此期间重新加入,重新加入的节点会保持其现有的分片数据,不会触发新的分片分配。这样就可以减少 ES 在自动再平衡可用分片时所带来的极大开销。
- 通过修改参数
delayed_timeout
,可以延长再均衡的时间,可以全局设置也可以在索引级别进行修改://全局修改:PUT /_all/_settings { "settings": { "index.unassigned.node_left.delayed_timeout": "5m" } }
3、路由选择
当我们查询文档的时候,Elasticsearch
如何知道一个文档应该存放到哪个分片中呢?它其实是通过下面这个公式来计算出来:
s h a r d = h a s h ( r o u t i n g ) % n u m b e r _ o f _ p r i m a r y _ s h a r d s shard = hash(routing) \quad \% \quad number\_of\_primary\_shards shard=hash(routing)%number_of_primary_shards
routing 默认值是文档的 id,也可以采用自定义值,比如用户 id。
在实际创建时,我们可以选择带 routing 或者不带
- 不带 routing 查询
在查询的时候因为不知道要查询的数据具体在哪个分片上,所以整个过程分为 2 个步骤- 分发:请求到达协调节点后,协调节点将查询请求分发到每个分片上。
- 聚合: 协调节点搜集到每个分片上查询结果,在将查询的结果进行排序,之后给用户返回结果。
- 带 routing 查询
- 查询的时候,可以直接根据 routing 信息定位到某个分配查询,不需要查询所有的分配,经过协调节点排序。
- 向上面自定义的用户查询,如果 routing 设置为
userid
的话,就可以直接查询出数据来,效率提升很多。
4. 写入速度优化
ES 的默认配置,是综合了数据可靠性、写入速度、搜索实时性等因素。实际使用时,我们需要根据公司要求,进行偏向性的优化。
针对于搜索性能要求不高,但是对写入要求较高的场景,我们需要尽可能的选择恰当写优化策略。
综合来说,可以考虑以下几个方面来提升写索引的性能:
- 加大
Translog Flush
,目的是降低Iops、Writeblock
。 - 增加
Index Refresh
间隔,目的是减少Segment Merge
的次数。 - 调整
Bulk
线程池和队列。 - 优化节点间的任务分布。
- 优化 Lucene 层的索引建立,目的是降低 CPU 及 IO。
5、批量数据提交
ES 提供了 Bulk API 支持批量操作,当我们有大量的写任务时,可以使用 Bulk 来进
- 行批量写入。
通用的策略如下:Bulk 默认设置批量提交的数据量不能超过 100M。数据条数一般是根据文档的大小和服务器性能而定的,但是单次批处理的数据大小应从 5MB~15MB 逐渐增加,当性能没有提升时,把这个数据量作为最大值。通过滑动窗口确定最佳的单次批处理大小
6、 优化存储设备
- ES 是一种密集使用磁盘的应用,在段合并的时候会频繁操作磁盘,所以对磁盘要求较高,当磁盘速度提升之后,集群的整体性能会大幅度提高。
7、合理使用合并
Lucene
以段的形式存储数据。当有新的数据写入索引时,Lucene 就会自动创建一个新的段。
- 随着数据量的变化,段的数量会越来越多,消耗的多文件句柄数及 CPU 就越多,查询效率就会下降。才是就会进行 段合并。
- 由于 Lucene 段合并的计算量庞大,会消耗大量的 I/O,所以 ES 默认采用较保守的策略,让后台定期进行段合并
8、减少 Refresh 的次数
Lucene
在新增数据时,采用了延迟写入的策略,默认情况下索引的refresh_interval
为 1 秒。Lucene
将待写入的数据先写到内存中,超过 1 秒(默认)时就会触发一次Refresh
, 然后 Refresh 会把内存中的的数据刷新到操作系统的文件缓存系统中。- 如果我们对搜索的实效性要求不高,可以将 Refresh 周期延长,例如 30 秒。
- 这样还可以有效地减少段刷新次数,但这同时意味着需要消耗更多的 Heap 内存。
9、加大 Flush 设置
Flush 的主要目的是把文件缓存系统中的段持久化到硬盘,当 Translog 的数据量达到 512MB 或者 30 分钟时,会触发一次 Flush。
index.translog.flush
_threshold_size 参数的默认值是 512MB,我们进行修改。- 增加参数值意味着文件缓存系统中可能需要存储更多的数据,所以我们需要为操作系统的文件缓存系统留下足够的空间。
10、减少副本的数量
ES 为了保证集群的可用性,提供了 Replicas(副本)支持,然而每个副本也会执行分析、索引及可能的合并过程,所以 Replicas 的数量会严重影响写索引的效率。
- 当写索引时,需要把写入的数据都同步到副本节点,副本节点越多,写索引的效率就越慢。
- 如果我们需要大批量进行写入操作, 可以先禁止 Replica 复制 , 设 置
index.number_of_replicas: 0
关闭副本。在写入完成后,Replica 修改回正常的状态。
11、 内存设置
ES 默认安装后设置的内存是 1GB,对于任何一个现实业务来说,这个设置都太小了。如果是通过解压安装的 ES,则在 ES 安装文件中包含一个 jvm.option
文件,添加如下命令来设置 ES 的堆大小,Xms 表示堆的初始大小,Xmx 表示可分配的最大内存,都是 1GB。
- 确保 Xmx 和 Xms 的大小是相同的,其目的是为了能够在 Java 垃圾回收机制清理完堆区后不需要重新分隔计算堆区的大小而浪费资源,可以减轻伸缩堆大小带来的压力。
假设你有一个 64G 内存的机器,按照正常思维思考,你可能会认为把 64G 内存都给 ES 比较好,但现实是这样吗, 越大越好?虽然内存对 ES 来说是非常重要的,但是答案是否定的!
因为 ES 堆内存的分配需要满足以下两个原则:
-
不要超过物理内存的 50%:Lucene 的设计目的是把底层 OS 里的数据缓存到内存中。 Lucene 的段是分别存储到单个文件中的,这些文件都是不会变化的,所以很利于缓存,同时操作系统也会把这些段文件缓存起来,以便更快的访问。
如果我们设置的堆内存过大,Lucene 可用的内存将会减少,就会严重影响降低 Lucene 的全文本查询性能。 -
堆内存的大小最好不要超过 32GB:在 Java 中,所有对象都分配在堆上,然后有一个
Class Pointer
指针指向它的类元数据。 这个 class Pointer指针在32位系统中位32位。因为为了保证正确的索引,堆的大小尽量不要超过 32 G。
假设你有个机器有 128 GB 的内存,你可以创建两个节点,每个节点内存分配不超过 32 GB。 也就是说不超过 64 GB 内存给 ES 的堆内存,剩下的超过 64 GB 的内存给 Lucene。
其他配置参数列表
参数名 | 参数值 | 说明 |
---|---|---|
cluster.name | elasticsearch | 配置 ES 的集群名称,默认值是 ES,建议改成与所存数据相关的名称,ES 会自动发现在同一网段下的集群名称相同的节点 |
node.name | node-1 | 集群中的节点名,在同一个集群中不能重复。节点的名称一旦设置,就不能再改变了。当然,也可以设 置 成 服 务 器 的 主 机 名 称 , 例 如 node.name:${HOSTNAME}。 |
node.master | true | 指定该节点是否有资格被选举成为 Master 节点,默认是 True,如果被设置为 True,则只是有资格成为 Master 节点,具体能否成为 Master 节点,需要通 过选举产生。 |
node.data | true | 指定该节点是否存储索引数据,默认为 True。数据的增、删、改、查都是在 Data 节点完成的。 |
index.number_of_shards | 1 | 设置都索引分片个数,默认是 1 片。也可以在创建索引时设置该值,具体设置为多大都值要根据数据量的大小来定。如果数据量不大,则设置成 1 时效率最高 |
index.number_of_replicas | 1 | 设置默认的索引副本个数,默认为 1 个。副本数越多,集群的可用性越好,但是写索引时需要同步的数据越多。 |
transport.tcp.compress | true | 设置在节点间传输数据时是否压缩,默认为 False,不压缩 |
discovery.zen.minimum_master_nodes | 1 | 设置在选举 Master 节点时需要参与的最少的候选主节点数,默认为 1。如果使用默认值,则当网络不稳定时有可能会出现脑裂。合理的数值为 (master_eligible_nodes/2)+1 ,其 master_eligible_nodes 表示集群中的候选主节点数 |
discovery.zen.ping.timeout | 3s | 设置在集群中自动发现其他节点时 Ping 连接的超时时间,默认为 3 秒。在较差的网络环境下需要设置得大一点,防止因误判该节点的存活状态而导致分片的转移 |