目录
- 分布式搜索-Elasticsearch
- 一、Elasticsearch-核心术语
- 二、Elasticsearch-安装(Centos7、java环境)
- 1、下载地址:[elasticsearch下载地址](https://www.elastic.co/cn/downloads/past-releases#elasticsearch)
- 2、解压压缩包并创建目录
- 3、修改配置文件
- 4、启动es
- 4.3、再次启动es
- 三、基于索引的基本操作
- 四、基于文档的基本操作
- 五、文档的乐观锁
- 六、分词器
- 七、DSL搜索详解
- 7.1、查询
- 7.2、判断某个字段是否存在
- 7.3、查询所有的文档
- 7.4、分页查找
- 7.5、term精确搜索
- 7.6、terms多个词语匹配检索
- 7.7、match分词搜索
- 7.8、match_phrase短语匹配
- 7.9、主键ids搜索
- 7.10、multi_match(多字段查询)
- 7.11、布尔查询
- 7.12、过滤器
- 7.13、排序
- 7.14、前缀查询(prefix)
- 7.15、模糊查询(fuzzy)
- 7.16、占位符查询(wildcard)
- 7.17、高亮查询
- 八、深度分页
- 九、批量操作
- 十、Elasticsearch集群
- 十一、Elasticsearch整合SpringBoot
- 十二、SpringBoot操作Elasticsearch
- 12.1、创建索引
- 12.2、删除索引
- 12.3、新增索引和文档数据
- 12.4、修改文档数据
- 12.5、查询文档数据
- 12.6、删除文档数据
- 12.7、分页查询
- 12.8、高亮查询
- 12.9、排序查询
- 十三、Logstash
分布式搜索-Elasticsearch
- Elasticsearch是基于Lucene的,想对于Solr更快,支持TB基本的大数据查询,实时性更高
- es官网:elasticsearch官网
一、Elasticsearch-核心术语
1、核心概念
术语名 | 英文名 | 解释 |
---|---|---|
搜索引擎 | elasticsearch | 类似于数据库 |
索引 | index | 类似于数据库表 |
类型 | type | 相当于表的逻辑类型(es7.0以后已经不再使用,默认为_doc) |
文档 | document | 相当于行,记录 |
字段 | fields | 相当于列,字段 |
2、集群相关
- 分片(shard):把索引库拆分为多份,分别放在不同的节点上,比如有3个节点,3个节点的所有数据内容加在一起是一个完整的索引库。分别保存到三个节点,水平扩展,提高吞吐量。
- 备份(replica):每个shard的备份
3、简称
- shard = primary shard(主分片)
- replica = replica shard(备份节点)
二、Elasticsearch-安装(Centos7、java环境)
1、下载地址:elasticsearch下载地址
2、解压压缩包并创建目录
# 解压到/usr/local/目录下
tar -zxvf elasticsearch-7.4.2-linux-x86_64.tar.gz -C /usr/local/
# 进入解压的目录
cd /usr/local/elasticsearch-7.4.2
# 添加两个文件夹 用于以后做数据和日志存放目录
mkdir data
mkdir logs
es目录介绍
- bin:可执行文件在里面,运行es的命令就在这里面,包含了一些脚本文件等
- config:配置文件目录
- JDK:java环境
- lib:依赖的jar,类库
- logs:日志文件
- modules:es相关的模块
- plugins:可以自已开发的插件
- data:这个目录没有,自己新建一下,后面要用 ->
mkdir data
,这个作为索引目录
3、修改配置文件
3.1、修改es核心配置文件
- 配置文件在es根目录下config文件夹里的elasticsearch.yml
# 打开核心配置文件
vim elasticsearch.yml
# 集群名称,单节点也是可以定义
cluster.name: my-application
# 节点名称
node.name: es-node0
# 数据存放路径
path.data: /usr/local/elasticsearch-7.0.0/data
# 日志存放路径
path.logs: /usr/local/elasticsearch-7.0.0/logs
# 绑定可以操作的地址 0.0.0.0表示所有的ip地址都可以访问操作
network.host: 0.0.0.0
# 用于访问的端口
http.port: 9200
# 添加跨域配置
http.cors.enabled: true
# 可以跨域的地址
http.cors.allow-origin: "*"
# 设置一系列符合主节点条件的节点的主机名或 IP 地址来引导启动集群
# 这里的es-node0是在上面配置过了
cluster.initial_master_nodes: ["es-node0"]
3.2、修改JVM配置文件
- JVM配置是es根目录下config文件夹里的jvm.options
# 打开核心配置文件
vim jvm.options
# 虚拟机或服务器配置不是很好时把jvm的初始化和最大堆空间修改小一点
# 生成环境可以根据机器配置适量增大或减少
-Xms128m
-Xmx128m
4、启动es
4.1、添加系统用户并授权
- es默认是禁止root用户启动的,所以需要添加一个用户
# 添加一个名叫zzm系统用户
useradd zzm
# 进行授权
chown -R zzm:zzm /usr/local/elasticsearch-7.0.0
# 切换到zzm用户
su zzm
4.2、启动es
# 进入bin目录
cd bin
# 启动es
./elasticsearch
启动时如果发生如下错误:
ERROR: [3]bootstrap checks failed
[1]: max file descriptors [4096] for elasticsearch process is too low, increase to at least [65535]
[2]: max number of threds [3756] for user [esuser] is too low, increase to at least[4096]
[3]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
可能的原因:
- 第一个问题是:es能打开的文件数量太少了 要提升
- 第二个问题是:zzm能使用的最大线程数太少了,也需要提升
- 第三个问题是:虚拟内存的最大值太低了
- 这些问题主要是系统配置上的问题,需要修改系统配置文件
4.2.1、修改配置文件
# 切换到root用户
su root
# 修改系统安全限制配置文件
vim /etc/security/limits.conf
- 拉到文件最下方,添加如下内容:
# soft代表警告的设定,可以超过这个设定值,但是超过后会有警告
# hard代表严格的设定,不允许超过这个设定的值
# nofile是每个进程可以打开的文件数的限制
# nproc是操作系统级别对每个用户创建的进程数的限制
* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096
- 保存并退出,再修改系统控制配置
# 修改系统控制配置
vim /etc/sysctl.conf
- 在文件最下方添加配置
vm.max_map_count=262145
- 保存并退出,然后刷新配置文件
sysctl -p
4.3、再次启动es
- 切换回zzm用户
su zzm
- 切换到es的bin目录并启动
# 前台启动的方式
./elasticsearch
# 后台启动的方式
./elasticsearch -d
端口号意义
- 9200:Http协议,用于外部通讯
- 9300:Tcp协议,ES集群之间是通过9300通讯
三、基于索引的基本操作
主要类型
- 字符串类型:string,text,keyword
- 整数类型:integer,long,short,byte
- 浮点类型:double,float,half_float,scaled_float
- 逻辑类型:boolean
- 日期类型:date
- 范围类型:range
- 二进制类型:binary
- 数组类型:array
- 对象类型:object
- 嵌套类型:nested
- 地理坐标类型:geo_point
- 地理地图:geo_shape
- IP类型:ip
- 范围类型:completion
- 令牌计数类型:token_count
3.1、索引的一些基本操作
- 查看集群健康:
GET http://localhost:9200/_cluster/health
- 创建索引:
PUT http://localhost:9200/index_test
- 查看索引:
GET http://localhost:9200/_cat/indices?v
- 删除索引:
DELETE http://localhost:9200/index_test
3.2、索引的mappings映射
- 索引分词概念:index默认为true,设置为false的话,name这个字段就不会被索引
3.2.1、创建索引的同时创建mappings
PUT /index_str
{
"mappings": {
"properties": {
"name": {
"type": "text",
"index": true
},
"username": {
"type": "text",
"index": false
}
}
}
}
3.2.2、查看分词效果
GET /index_mapping/_analyze
{
"field": "name",
"text": "zzm is good"
}
3.2.3、尝试修改
POST /index_str/mapping
{
"properties": {
"name": {
"type": "long"
}
}
}
3.2.3、为已存在的索引创建或创建mappings
POST /index_str/_mapping
{
"properties": {
"id": {
"type": "long"
},
"age": {
"type": "integer"
}
}
}
四、基于文档的基本操作
4.1、文档的添加
POST /my_doc/_doc/1
{
"id": 1001,
"name": "张三",
"age": 23,
"create_time": "2019-12-24"
}
/my_doc/_doc/1 -> /{索引名}/_doc/{索引ID} (是指索引在es中的id,而不是这条记录的id,比如记录的id从数据库来)
4.2、文档的删除
DELETE /my_doc/_doc/1
文档删除不是立即删除,文档还是保存在磁盘上,索引增长越来越多,才会把那些曾经标识过删除的,进行清理,从磁盘上移除
4.3、文档的修改
- 局部
POST /my_doc/_doc/1/_update
{
"doc": {
"name": "张三"
}
}
- 全量替换
PUT /my_doc/_doc/1
{
"id": 1001,
"name": "李四",
"age": 18,
"create_time": "2020-04-24"
}
每次修改后,version会改变
4.4、文档的查询
4.4.1、常规查询
GET /index_demo/_doc/1
GET /index_demo/_doc/_search
查询结果
{
"_index": "my_doc",
"_type": "_doc",
"_id": "2",
"_score": 1.0,
"_version": 9,
"_source": {
"id" 1002,
"name": "zhangsan",
.....
}
}
元数据
- _index:文档数据所属哪个索引,理解为数据库的某张表即可
- _type:文档数据属于哪个类型,新版本使用_doc
- _id:文档数据的唯一表示,类似于数据库中某张表的主键,可以自动生成或者手动指定
- _score:查询相关度,是否契合用户匹配,分数越高用户的体验越高
- _version:版本号
- _source:文档数据,json格式
4.4.2、定制结果集
GET /index_demo/_doc/1?_source=id,name
GET /index_demo/_doc/_search=id,name
4.4.3、判断文档是否存在
HEAD /index_demo/_doc/1
五、文档的乐观锁
当查询、插入、更新一条数据的时候都能看到有一个_version属性,这个属性标识当前数据的版本号,老版本中通过这个属性实现乐观锁,只需要在请求最后加上版本号就可以了
-
老版本
使用post请求访问 127.0.0.1:9200/索引名/_doc/id/_update?version={数值} -
新版本
使用post请求访问 127.0.0.1:9200/索引名/_doc/id/_update?if_seq_no={数值}&if_primary_term={数值}
_seq_no:文档版本号,作用同_version(文档所有位置内部的编号,效率更加高效,管理起来更方便)
_primary_term:文档所在位置
六、分词器
6.1、什么是分词器
- 把文本转换为一个个的单词,分词称之为analysis。es默认只对英文语句做分词,中文不支持,每个中文字都会被拆分为独立的个体。
6.2、es内置分词器
- standard:默认分词,单词会被拆分,大小会转换为小写。
- simple:按照非字母分词。大写转为小写。
- whitespace:按照空格分词。忽略大小写。
- stop:去除无意义单词,比如the/a/an/is…
- keyword:不做分词。把整个文本作为一个单独的关键词。
6.3、ik中文分词器
下载地址:ik中文分词器
安装:
- 1.从github上下载压缩包
- 2.进入es根目录下的plugins文件夹
- 3.创建名为ik的文件夹
- 4.将压缩包里的内容解压到ik文件夹下
- 5.重启es
6.4、ik中的两种分词器
-
ik_max_word:会将文本做最细粒度的拆分,比如会将“中华人民共和国国歌”拆分为“中华人民共和国,中华人民,中华,华人,人民共和国,人民,人,民,共和国,共和,和,国国,国歌”,会穷尽各种可能的组合,适合 Term Query;
-
ik_smart:会做最粗粒度的拆分,比如会将“中华人民共和国国歌”拆分为“中华人民共和国,国歌”,适合 Phrase 查询。
6.4、自定义中文分词库
中文词汇复杂繁多,ik分词器也不能保证所有情况下都能有效的分词,例如:“骚年在认真的学习”,就会被拆分成“骚,年在,认真的,学习”,很明显“年在”并不是一个词,同样的一些专属名词也不能很有效的分词,比如:“印象笔记”,“掘金网“等等,这时候我们就需要为分词器,添加自定义词汇。
6.4.1、修改ik分词器配置文件、添加自定义词典
- 进入es根目录,修改ik分词器的词典配置
vim plugins/ik/config/IKAnalyzer.cfg.xml
- 在ext_dict属性中添加自定义的词典,需要添加多个字典用分号隔开“;”
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典 -->
<entry key="ext_dict">custom.dic</entry>
<!--用户可以在这里配置自己的扩展停止词字典-->
<entry key="ext_stopwords"></entry>
<!--用户可以在这里配置远程扩展字典 -->
<!--<entry key="remote_ext_dict">location</entry> -->
<!--用户可以在这里配置远程扩展停止词字典-->
<!--<entry key="remote_ext_stopwords">http://xxx.com/xxx.dic</entry> -->
</properties>
6.4.2、在这个目录下新建一个文件名叫custom.dic,并添加如下内容
# 下列内容,可根据自身需求而添加
慕课网
有道词典
百度网盘
记得重启es
七、DSL搜索详解
7.1、查询
POST /shop/_doc/_search
{
"query": {
"match": {
<!-- 查询desc属性里包含掘金网的数据 -->
"desc": "掘金网"
}
}
}
7.2、判断某个字段是否存在
POST /shop/_doc/_search
{
"query": {
"exists": {
"field": "desc"
}
}
}
7.3、查询所有的文档
GET /shop/_doc/_search
POST /shop/_doc/_search
{
"query": {
# 查询所有
"match_all": {}
} ,
# 设定要查询的属性,注意这里使用的是中括号
"_source": ["id","name","age"]
}
7.4、分页查找
POST /shop/_doc/_search
{
"query": {
# 查询所有
"match_all": {}
},
# 从第10条开始
"from":10,
# 查询的条数
"size":10
}
7.5、term精确搜索
- 将搜索的词作为一个关键词搜索,而不会对其进行分词后再搜索
POST /shop/_doc/_search
{
"query": {
"term": {
"desc": "中国大学"
}
}
}
7.6、terms多个词语匹配检索
POST /shop/_doc/_search
{
"query": {
"term": {
"desc": ["中国大学","万门大学"]
}
}
}
7.7、match分词搜索
- 会对搜索的词进行分词,再查询,term则不会
POST /shop/_doc/_search
{
"query": {
"match": {
"desc": "中国大学",
# 设置or或者and
"operator": "or",
"minimum_should_match": "60*"
}
}
}
- operater:为or的时候(默认为or),只要命中一个拆分的词汇时就可以当做匹配,设置为and的时候,必须所有的分词都匹配到。
- minimum_should_match: 最低匹配精度,至少有[分词后的词语个数]x百分百,得出一个数据值取整。
举个例子
:当前属性设置为70,若一个用户查询检索内容分词后有10个词语,那么匹配度按照 10x70%=7,则desc中至少需要有7个词语匹配,就展示;若分词后有8个,则 8x70%=5.6,则desc中至少需要有5个词语匹配,就展示。同时也支持设置具体的数字,表示匹配单词的个数
7.8、match_phrase短语匹配
- match:分词后只要有匹配就返回
- match_phrase:分词结果必须在text字段分词中都包含,而且顺序必须相同,而且必须都是连续的。(搜索比较严格)、
POST /shop/_doc/_search
{
"query": {
"match_phrase": {
"desc": {
"query": "你好 再见",
"slop": 2
}
}
}
}
slop:允许词语间跳过的数量
7.9、主键ids搜索
POST /shop/_doc/_search
{
"query": {
"ids": {
"type": "_doc",
"values": ["1001", "1012", "1013"]
}
}
}
7.10、multi_match(多字段查询)
- 满足使用match在多个字段中进行查询的需求
POST /shop/_doc/_search
{
"query": {
"multi_match": {
"query": "大学",
# 在desc和name属性中匹配“大学”
"fields": ["desc", "name"]
}
}
}
- boots:权重,为某个字段设置权重,权重越高,文档相关性得分就越高,通常来说搜索商品名称要比商品简介的权重更高。
POST /shop/_doc/_search
{
"query": {
"multi_match": {
"query": "大学",
# 在desc和name属性中匹配“大学”
"fields": ["desc", "name^10"]
}
}
}
7.11、布尔查询
- must:查询必须匹配搜索条件,例如and
- should:查询匹配满足1个以上条件,例如or
- must_not:不匹配搜索条件,一个都不满足
POST /shop/_doc/_search
{
"query": {
"bool": {
# 这里是多重查询的判断条件,可以使用
# must:查询必须匹配搜索条件,譬如 and
# should:查询匹配满足1个以上条件,譬如 or
# must_not:不匹配搜索条件,一个都不要满足
"must": [
{
"multi_match": {
"query": "大学",
"fields": ["desc", "name"]
}
}, {
"term": {
"sex": 1
# 添加权重
"boost": 18
}
}, {
"term": {
"birthday": "1999-01-14"
}
}
]
# 这里继续添加不同的多重查询判断条件也是可以的
}
}
}
7.12、过滤器
对搜索出来的结果进行数据过滤。不会到es库里去搜索,不会去计算文档的相关度分数,所以过滤的性能会比较高,过滤器可以和全文搜索结合在一起使用。post_filter元素是一个顶层元素,只会对搜索结果进行过滤。不会计算数据的匹配相关性分数,不会根据分数去排序,query则相反,会计算分数。
- query:根据用户搜索条件检索匹配记录
- post_filter:用于查询后,对结果数据的筛选
- gte:大于等于
- lte:小于等于
- gt:大于
- lt:小于
POST /shop/_doc/_search
{
"query": {
"match": {
# 查询desc属性里包含掘大学的数据
"desc": "大学"
}
},
# 过滤器
"post_filter": {
"range": {
# 过滤的字段
"money": {
# 取大于60并且小于1000的数据
"gt": 60,
"lt": 1000
}
}
}
}
7.13、排序
- es的排序同sql,可以desc也可以asc,也支持组合排序。
POST /shop/_doc/_search
{
"query": {
"match": {
# 查询desc属性里包含大学的数据
"desc": "大学"
}
},
# 排序 test类型的属性是没法排序的
"sort": [
{
# desc降序排序,asc升序排序
"age": "desc"
}, {
"money": "desc"
}
]
}
- 由于文本会被分词,所以往往要去做排序会报错,通常我们可以为这个字段增加额外的一个附属属性,类型为keyword,用于做排序。
POST /shop2/_mapping
{
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text",
"analyzer": "ik_max_word",
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
7.14、前缀查询(prefix)
POST /shop/_doc/_search
{
"query": {
# 查询name属性以大学开始的数据
"prefix": {
"name": "大学"
}
}
}
7.15、模糊查询(fuzzy)
POST /shop/_doc/_search
{
"query": {
# 模糊查询
"fuzzy": {
"desc": "大学"
}
}
}
- 多字段查询
POST /shop/_doc/_search
{
"query": {
# 模糊查询
"multi_match": {
"fields": [ "desc", "name"],
"query": "大学",
"fuzziness": "AUTO"
}
}
}
7.16、占位符查询(wildcard)
- *表示可以有多个字符
- ? 表示只有一个字符
POST /shop/_doc/_search
{
"query": {
"wildcard": {
"name": "*大学?"
}
}
}
7.17、高亮查询
POST /shop/_doc/_search
{
"query": {
"match": {
# 查询desc属性里包含大学的数据
"desc": "大学"
}
},
# 高亮关键词
"highlight": {
# 在关键词前面 添加span标签,默认为em标签
"pre_tags": ["<span>"],
# 在关键词后面 添加span标签,默认为em标签
"post_tags": ["</span>"],
<# 需要高亮的属性
"fields": {
"desc": {}
}
}
}
八、深度分页
8.1分页查询
POST /shop/_doc/_search
{
"query": {
"match_all": {},
"from": 0,
"size": 10
}
}
8.2、深度分页
- 深度分页其实就是搜索的深浅度,比如第1页,第2页,第10页,第20页,是比较浅的,第10000页,第20000页也就是很深了。
POST /shop/_doc/_search
{
"query": {
"match_all": {},
"from": 9999,
"size": 10
}
}
8.3、提升搜索量
- 通过设置
index.max_result_window
来突破10000数据
GET /shop/_settings
PUT /shop/_settings
{
"index.max_result_window": "20000"
}
8.4、scroll滚动搜索
滚动搜索可以先查询出一些数据,然后再紧接着依次往下查询。在第一次查询的时候会有一个滚动id,相当于一个锚标记,随后再次滚动搜索会需要上一次搜索的锚标记
,根据这个进行下一次的搜索请求。每次搜索都是基于一个历史的数据快照,查询数据的期间,如果有数据变更,那么和搜索是没有关系的,搜索的内容还是快照中的数据。
- scroll=1m,相当于是一个session会话时间,搜索保持的上下文时间为1分钟。
8.4.1、发起滚动查询
POST /shop/_doc/_search?scroll=1m
{
"query": {
"match_all": { }
},
"sort" : ["_doc"],
# 每次滚动查询出来的次数
"size": 1000
}
8.4.2、进行后续的滚动查询
POST /_search/scroll
{
"scroll": "1m",
"scroll_id": "your last scroll_id"
}
九、批量操作
9.1、批量查询(_mget)
POST /shop/_doc/_mget
{
# 查询的字段
"ids": [
"1001",
"1002",
"1003"
]
}
9.2、批量操作(bulk)
9.2.1、基本语法
- bulk操作和以往的普通请求格式有区别。
不要格式化json,bulk的语法是一行算作一个命令,同时也不是完全正确的json格式,个别的工具可能有json校验,会报错,这个需要注意
{ action: { metadata }}
{ request body }
{ action: { metadata }}
{ request body }
- { action: { metadata }}代表批量操作的类型,可以是新增、删除或修改
- \n是每行结尾必须填写的一个规范,每一行包括最后一行都要写,用于es的解析
- { request body }是请求body,增加和修改操作需要,删除操作则不需要
9.2.2、批量操作的类型
action 必须是以下选项之一
- create:如果文档不存在,那么就创建它。存在会报错。发生异常报错不会影响其他操作。
- index:创建一个新文档或者替换一个现有的文档。
- update:部分更新一个文档。
- delete:删除一个文档。
metadata 中需要指定要操作的文档的_index 、 _type 和 _id,_index 、 _type也可以在url中指定
9.2.3、批量新增
- create批量新增:如果文档不存在,那么就创建它。存在会报错。发生异常报错不会影响其他操作
- 不同索引的批量新增:向shop1、shop2、shop3三个索引,分别增加1条数据
使用post请求访问 ip:9200/_bulk
POST /_bulk
{"create": {"_index": "shop1", "_type": "_doc", "_id": "2001"}}
{"id": "2001", "name": "name2001"}
{"create": {"_index": "shop2", "_type": "_doc", "_id": "2002"}}
{"id": "2002", "name": "name2002"}
{"create": {"_index": "shop3", "_type": "_doc", "_id": "2003"}}
{"id": "2003", "name": "name2003"}
- 相同索引的批量新增
POST /shop/_doc/_bulk
{"create": {"_id": "2001"}}
{"id": "2001", "name": "name2001"}
{"create": {"_id": "2002"}}
{"id": "2002", "name": "name2002"}
{"create": {"_id": "2003"}}
{"id": "2003", "name": "name2003"}
已有文档id会被覆盖,不存在的id则新增
9.2.4、批量跟新部分文档数据
POST /shop/_doc/_bulk
{"update": {"_id": "2001"}}
{"doc":{ "id": "3004"}}
{"update": {"_id": "2007"}}
{"doc":{ "name": "nameupdate"}}
9.2.5、批量删除
POST /shop/_doc/_bulk
{"delete": {"_id": "2004"}}
{"delete": {"_id": "2007"}}
9.2.6、混合批量各种操作
POST /shop/_doc/_bulk
{"create": {"_id": "8001"}}
{"id": "8001", "name": "name8001"}
{"update": {"_id": "2001"}}
{"doc":{ "id": "20010"}}
{"delete": {"_id": "2003"}}
{"delete": {"_id": "2005"}}
十、Elasticsearch集群
学习了过单机的ES后,我们可以把注意力转移到高可用上,一般我们可以把es搭建成集群,2台以上就能成为es集群了。集群不仅可以实现高可用,也能实现海量数据存储的横向扩展。
10.1、es分片机制
下面我们举例说明一下Elasticsearch在集群情况下的默认分片机制。
现在我们有3台服务器,ip分别是192.168.1.100、192.168.1.101、192.168.1.102,然后新建了一个索引叫shop,他有5个分片(分别叫:主1,主2,主3,主4,主5),每个分片又有一个备份分片(分别叫:备1,备2,备3,备4,备5)。
es会先分配主分片,按照主1-主5的顺序依次分配到3台服务器上,100服务器上分配主1、主4,101服务器上分配主2、主5,102服务器上分配主3,情况大致如下:
ip地址 | 分片名 | 分片名 |
---|---|---|
192.168.1.100 | 主1 | 主4 |
192.168.1.101 | 主2 | 主5 |
192.168.1.102 | 主3 |
然后分配备份分片,会接着上次分配到的服务器,继续按照备1-备5的顺序依次分配到3台服务器上,有所不同的是如果备份分片和主分片将要被分配到同一个服务器上时(比如:3个服务器,6个分片的时候,主1和备1就会被分配到同一个服务器上),es会跳过这个服务器,再继续分配,情况大致如下:
ip地址 | 分片名 | 分片名 | 分片名 | 分片名 |
---|---|---|---|---|
192.168.1.100 | 主1 | 主4 | 备2 | 备5 |
192.168.1.101 | 主2 | 主5 | 备3 | |
192.168.1.102 | 主3 | 备1 | 备4 |
副本分片是主分片的备份,主挂了,备份还是可以访问。同一个分片的主与副本是不会放在同一个服务器里的,因为一旦宕机,这个分片就没了,如果现在100服务器宕机了,备1和备4就会接替主1和主4继续提供访问,我们系统还是保持了完整的服务提供。
10.2、搭建Elasticsearch集群
装备3台服务器装好es,我这里ip分别是192.168.1.100、192.168.1.101、192.168.1.102
10.2.1、清空单机部署时的数据
- 将es的数据清空,根据核心配置文件里面的配置找到数据存放的文件夹,我这里进入es根目录下的data文件夹,将里面的所有数据清空
rm -rf *
10.2.2、修改核心配置文件(elasticsearch.yml)
- 三台机器的配置,除了node.name其他参数都是一样的
# 集群名称,同一集群的节点名称要保持一致
cluster.name: es-zzm
# 节点名称,同一集群下每个节点名称应该保持不一样
node.name: es-node0
# 表示当前节点是主节点,true表示当前节点可用作为master,false表示当前节点永远不可以作为master
node.master: true
# 表示当前节点是数据节点
node.data: true
# 发现集群节点,就是配置所有节点的ip地址
discovery.seed_hosts: ["192.168.1.100","192.168.1.101","192.168.1.102"]
# 初始化master的节点,使用节点名
cluster.initial_master_nodes: ["es-node0"]
- 配置完之后可以通过命令删除掉所有的注释,看的更简洁
more elasticsearch.yml | grep ^[^#]
10.2.3、三台同时启动es
- 切换用户
su zzm
- 进入es根目录下的bin文件夹,然后执行启动命令
./elasticsearch
10.3、集群脑裂问题
如果发生网络中断或者服务器宕机,那么集群会有可能被划分为两个部分,各自有自己的master来管理,那么这就是脑裂。
master主节点要经过多个master节点共同选举后才能成为新的主节点。就跟班级里选班长一样,并不是你1个人能决定的,需要班里半数以上的人决定。解决实现原理:半数以上的节点同意选举,节点方可成为新的master。discovery.zen.minimum_master_nodes=(N/2)+1
N为集群的中master节点的数量,也就是那些 node.master=true 设置的那些服务器节点总数
在最新版7.x中,minimum_master_node这个参数已经被移除了,这一块内容完全由es自身去管理,这样就避免了脑裂的问题,选举也会非常快
十一、Elasticsearch整合SpringBoot
11.1、添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<!-- 这个版本 只支持到ES6.4.3 ,如果需要更高的版本,需要把这个包里面的elasticsearch排除掉,额外再引用一个更新版本的elasticsearch-->
<version>2.2.2.RELEASE</version>
</dependency>
11.2、添加配置文件
spring:
data:
elasticsearch:
cluster-name:
# 这里端口号是9300,默认就是9300,通过java连接要使用9300
cluster-nodes: 192.168.1.100:9300,192.168.1.101:9300,192.168.1.102:9300
11.3、解决netty引起的issue问题
- 创建一个es配置类ESConfig
@Configuration
public class ESCofig{
@postConstruct
void init(){
System.setProperty("es.set.netty.runtime.available.processors","false");
}
}
十二、SpringBoot操作Elasticsearch
- 在需要使用的类中依赖注入es的模板类
@Autowired
private ElasticsearchTemplate esTemplate;
12.1、创建索引
- 创建实体类,并增加es配置
@Document(indexName = "stu",type = "_doc")
@Data
public class Stu(
@Id
private Long stuId;
// Field里面的属性和es的是一样的,根据需要自行配置
@Field(store = true)
private String name;
@Field(store = true)
private Integer age;
}
- 创建索引的方法
@Test
public void createIndexStu(){
Stu stu = new Stu();
// 默认5个分片,各有1个副本
IndexQuery indexQuery= new IndexQueryBuilder().withObject(stu).build();
esTemplate.index(indexQuery);
}
12.2、删除索引
@Test
public void deleteIndexStu(){
esTemplate.deleteIndex(Stu.class);
}
12.3、新增索引和文档数据
- 如果索引存在直接添加数据,如果索引不存在,就创建索引和添加文档数据
@Test
public void createIndexStu() {
Stu stu = new Stu();
stu.setStuId(1002L);
stu.setName("张三");
stu.setAge(22);
stu.setMoney(999.6f);
stu.setSign("I am is a man");
stu.setDescription("I wish i am a man");
// 默认5个分片,各有1个副本,stu里面有值 会直接创建一条文档
IndexQuery indexQuery = new IndexQueryBuilder()
.withObject(stu)
.build();
String index = esTemplate.index(indexQuery);
System.out.println(index);
}
12.4、修改文档数据
public void updateStu(){
Map<String,Object> map= new HashMap<>();
map.put("name","张三");
IndexRequest indexRequest = new IndexRequest();
indexRequest.source(map);
// 默认5个分片,各有1个副本,stu里面有值 会直接创建一条文档
UpdateQuery updateQuery= new UpdateQueryBuilder()
.withClass(Stu.class)
.withId("1001")
.withIndexRequest(indexRequest)
.build();
esTemplate.update(updateQuery);
}
12.5、查询文档数据
@Test
public void getStu(){
GetQuery query = new GetQuery();
query.setId("1001");
Stu stu = esTemplate.queryForObject(query,Stu.class);
}
12.6、删除文档数据
@Test
public void deleteStu(){
esTemplate.delete(Stu.class,"1001");
}
12.7、分页查询
@Test
public void searchStuDoc(){
// 查询第一页的,每页十条,页数的索引是从0开始的
Pageable pageable = PageRequest.of(0,10);
SearchQuery query = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.matchQuery("description","Ben jock"))
.withPageable(pageable)
.build();
AggregatedPage<Stu> pagedStu = esTemplate.queryForPage(query,Stu.class);
System.out.println("检索的总页数" + pageStu.getTotalPages());
// 获取查询到的数据
List<Stu> stuList = pageStu.getContent();
stuList.stream().forEach(stu -> {
System.out.println(stu.toString());
});
}
12.8、高亮查询
@Test
public void highlightStuDoc(){
String preTag = "<font color='red'>";
String postTag = "</font>";
Pageable pageable = PageRequest.of(0, 5);
SearchQuery query = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.matchQuery("description", "a man"))
// 设置高亮
.withHighlightFields(new HighlightBuilder.Field("description")
.preTags(preTag)
.postTags(postTag))
// 分页
.withPageable(pageable)
.build();
AggregatedPage<Stu> pageStu = esTemplate.queryForPage(query, Stu.class, new SearchResultMapper() {
// 映射处理,对返回的数据进行处理,默认情况下返回数据是没有高亮的,必须从专门的高亮的封装中把值取出
@Override
public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pageable pageable) {
List<Stu> stuListHighlight = new ArrayList<>();
// 获取返回的所有数据信息
SearchHits hits = response.getHits();
hits.forEach(hit -> {
// 获取对应的高亮对象
HighlightField highlightField = hit.getHighlightFields().get("description");
// 获取高亮字段值
String description = highlightField.getFragments()[0].toString();
// 获取其他的属性
Object stuId = (Object) hit.getSourceAsMap().get("stuId");
String name = (String) hit.getSourceAsMap().get("name");
Integer age = (Integer) hit.getSourceAsMap().get("age");
Object money = (Object) hit.getSourceAsMap().get("money");
String sign = (String) hit.getSourceAsMap().get("sign");
Stu stu = new Stu();
stu.setStuId(Long.valueOf (stuId.toString()));
stu.setName(name);
stu.setAge(age);
stu.setMoney(Float.valueOf(money.toString()));
stu.setSign(sign);
stu.setDescription(description);
stuListHighlight.add(stu);
});
if (stuListHighlight.size() > 0){
return new AggregatedPageImpl<>(((List<T>)stuListHighlight));
}else{
return null;
}
}
});
System.out.println("检索的总页数" + pageStu.getTotalPages());
// 获取查询到的数据
List<Stu> stuList = pageStu.getContent();
stuList.stream().forEach(stu -> {
System.out.println(stu.toString());
});
}
12.9、排序查询
@Test
public void sortStuDoc(){
// 查询第一页的,每页十条,页数的索引是从0开始
Pageable pageable = PageRequest.of(0, 5);
// 定义排序的字段
SortBuilder sortBuilder = new FieldSortBuilder("money")
.order(SortOrder.DESC);
SearchQuery query = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.matchQuery("description", "a man"))
.withPageable(pageable)
// 配置排序的字段、可以配置多少个
.withSort(sortBuilder)
.build();
AggregatedPage<Stu> pageStu = esTemplate.queryForPage(query, Stu.class);
System.out.println("检索的总页数" + pageStu.getTotalPages());
// 获取查询到的数据
List<Stu> stuList = pageStu.getContent();
stuList.stream().forEach(stu -> {
System.out.println(stu.toString());
});
}
十三、Logstash
Logstash是elastic技术栈中的一个技术。它是一个数据采集引擎,可以从数据库采集数据到es中。我们可以通过设置自增id主键或者时间来控制数据的自动同步,这个id或者时间就是用于给logstash进行识别的
- id:假设现在有1000条数据,Logstatsh识别后会进行一次同步,同步完会记录这个id为1000,以后数据库新增数据,那么id会一直累加,Logstatsh会有定时任务,发现有id大于1000了,则增量加入到es中
- 时间:同理,一开始同步1000条数据,每条数据都有一个字段,为time,初次同步完毕后,记录这个time,下次同步的时候进行时间比对,如果有超过这个时间的,那么就可以做同步,这里可以同步新增数据,或者修改元数据,因为同一条数据的时间更改会被识别,而id则不会。
13.1、安装Logstash
13.1.1、下载Logstash并配置好环境
- 下载地址:logstash下载地址
使用Logstatsh的版本号与elasticsearch版本号需要保持一致
- 插件 logstash-input-jdbc本插件用于同步,es6.x起自带,这个是集成在了 logstash中的。所以直接配置同步数据库的配置文件即可
- 创建索引,同步数据到es中,前提得要有索引,这个需要手动先去创建,名字随意。
- JDK,记得安装JDK,java -version检查一下,如果没有安装,需要安装一下
- mysql,还需要准备好mysql的驱动,根据mysql安装的版本自行下载
13.1.2、解压Logstash
tar -zxvf logstash-6.4.3.tar.gz -C /usr/local/
13.1.3、添加配置
# 进入logstash根目录
cd D:/logstash-6.4.3/bin
# 创建文件夹用于存放同步数据的相关配置
mkdir sync
cd sync
# 将mysql的驱动拷贝过来
cp D:/logstash-6.4.3/mysql-connector-java-8.0.11.jar .
# 创建配置文件(配置信息在后面)
vim logstash-db-sync.conf
- logstash-db-sync.conf配置如下:
input {
jdbc {
# mysql 数据库链接,dianping为数据库名
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/eat_shop?useSSL=false&serverTimezone=UTC&characterEncoding=utf-8"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "1999114zzm"
# 设置 timezone
jdbc_default_timezone => "Asia/Shanghai"
# 驱动
jdbc_driver_library => "D:/logstash-6.4.3/bin/sync/mysql-connector-java-8.0.11.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# 开启分页
jdbc_paging_enabled => "true"
# 分页每页数量
jdbc_page_size => "50000"
# 执行的sql文件路径 + 名称
statement_filepath => "D:/logstash-6.4.3/bin/sync/food-items.sql"
# 设置监听间隔 个字段含义(由左至右)分、时、天、月、年,全部为*默认含义
schedule => "* * * * *"
# 索引类型
type => "_doc"
# 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
use_column_value => true
# 记录上一次追踪的结果值
last_run_metadata_path => "D:/logstash-6.4.3/bin/sync/track_time"
# 如果 use_column_value 为true,配置本参数,追踪的 column 名,可以是
tracking_column => "updated_time"
# tracking_column 对应字段的类型
tracking_column_type => "timestamp"
# 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有
clean_run => false
# 数据库字段名称大写转小写
lowercase_column_names => false
}
}
output {
elasticsearch {
# ES的IP地址及端口,如果是集群 这里就是一个数组
hosts => ["127.0.0.1:9200"]
#索引名称
index => "food-items"
#自增ID需要关联的数据库中有一个id字段,对应索引的id号
document_id => "%{itemId}"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
13.1.4、编写sql
SELECT
i.id as itemId,
i.item_name as itemName,
i.sell_counts as sellCounts,
ii.url as imgUrl,
tempSpec.price_discount as price,
i.updated_time as updated_time
FROM
items i
LEFT JOIN
items_img ii
ON
i.id = ii.item_id
LEFT JOIN
(
SELECT
item_id, MIN(price_discount) as price_discount
From
items_spec
GROUP BY
item_id
) tempSpec
ON
tempSpec.item_id = i.id
WHERE
ii.is_main = 1
AND
i.updated_time >= :sql_last_value
13.1.5、启动Logstash
- 如果根据时间来同步数据,就需要把数据对应的字段的时间修改到当前时间之后,不然无法同步,如果数据物理删除ES是无法同步的,所以最好使用逻辑删除
./logstash -f D:/logstash-6.4.3/bin/sync/logstash-db-sync.conf
13.2、自定义模板配置中文分词
目前的数据同步,mappings映射会自动创建,但是分词不会,还是会使用默认的,而我们需要中文分词,这个时候就需要自定义模板功能来设置分词了
13.2.1、配置分词模板
- 创建D:/logstash-6.4.3/bin/sync/logstash-ik.json文件
{
"order": 1,
"version": 1,
"index_patterns": ["*"],
"settings": {
"index": { "refresh_interval": "5s" }
},
"mappings": {
"_default_": {
"dynamic_templates": [ {
"message_field": {
"path_match": "message",
"match_mapping_type": "string",
"mapping": {
"type": "text",
"norms": false
}
}
}, {
"string_fields": {
"match": "*",
"match_mapping_type": "string",
"mapping": {
"type": "text",
"norms": false,
"analyzer": "ik_max_word",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}],
"properties": {
"@timestamp": {
"type": "date"
},
"@version": {
"type": "keyword"
},
"geoip": {
"dynamic": true,
"properties": {
"ip": {
"type": "ip"
},
"location": {
"type": "geo_point"
},
"latitude": {
"type": "half_float"
},
"longitude": {
"type": "half_float"
}
}
}
}
}
},
"aliases": {}
}
13.2.2、修改同步配置文件(logstash-db-sync.conf)
elasticsearch {
# ES的IP地址及端口,如果是集群 这里就是一个数组
hosts => ["127.0.0.1:9200"]
#索引名称
index => "food-items-ik"
#自增ID需要关联的数据库中有一个id字段,对应索引的id号
document_id => "%{itemId}"
# 定义模板名称
template_name => "myik"
# 模板所在位置
template => "D:/logstash-6.4.3/bin/sync/logstash-ik.json"
# 重写模板
template_overwrite => true
# 默认为true,false关闭logstash自动管理模板功能,如果自定义模板,则设置为false
manage_template => false
}
13.2.3、重新运行Logstash进行同步
./logstash -f D:/logstash-6.4.3/bin/sync/logstash-db-sync.conf
13.2.4、如果运行后,没有中分分词,做如下操作
- 将默认的中分分词模板删除
http://localhost:9200/_template/logstash?pretty
- 查看自定义模板是否上传成功(存在)
http://localhost:9200/_template/myik
- 查看默认模板是否存在(不存在)
http://localhost:9200/_template/logstash
- 上列三步执行成功后,删除es的索引,并创建新的索引,重启logstash
./logstash -f D:/logstash-6.4.3/bin/sync/logstash-db-sync.conf
13.3、logstash-db-sync.conf完整配置
input {
jdbc {
# mysql 数据库链接,dianping为数据库名
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/eat_shop?useSSL=false&serverTimezone=UTC&characterEncoding=utf-8"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "1999114zzm"
# 设置 timezone
jdbc_default_timezone => "Asia/Shanghai"
# 驱动
jdbc_driver_library => "D:/logstash-6.4.3/bin/sync/mysql-connector-java-8.0.11.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# 开启分页
jdbc_paging_enabled => "true"
# 分页每页数量
jdbc_page_size => "50000"
# 执行的sql文件路径 + 名称
statement_filepath => "D:/logstash-6.4.3/bin/sync/food-items.sql"
# 设置监听间隔 个字段含义(由左至右)分、时、天、月、年,全部为*默认含义
schedule => "* * * * *"
# 索引类型
type => "_doc"
# 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
use_column_value => true
# 记录上一次追踪的结果值
last_run_metadata_path => "D:/logstash-6.4.3/bin/sync/track_time"
# 如果 use_column_value 为true,配置本参数,追踪的 column 名,可以是
tracking_column => "updated_time"
# tracking_column 对应字段的类型
tracking_column_type => "timestamp"
# 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有
clean_run => false
# 数据库字段名称大写转小写
lowercase_column_names => false
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["127.0.0.1:9200"]
#索引名称
index => "food-items-ik"
#自增ID需要关联的数据库中有一个id字段,对应索引的id号
document_id => "%{itemId}"
# 定义模板名称
template_name => "myik"
# 模板所在位置
template => "D:/logstash-6.4.3/bin/sync/logstash-ik.json"
# 重写模板
template_overwrite => true
# 默认为true,false关闭logstash自动管理模板功能,如果自定义模板,则设置为false
manage_template => false
}
stdout {
# JSON格式输出
codec => json_lines
}
}