搜索引擎:
索引组件:获取数据-->建立文档-->文档分析-->文档索引(倒排索引)
搜索组件:用户搜索接口-->建立查询(将用户键入的信息转换为可处理的查询对象)-->搜索查询-->展现结果
索引组件:Lucene
搜索组件:Solr, ElasticSearch
Lucene Core:
Apache LuceneTM is a high-performance, full-featured text search engine library written entirely in Java. It is a technology suitable for nearly any application that requires full-text search, especially cross-platform.
Solr:
SolrTM is a high performance search server built using Lucene Core, with XML/HTTP and JSON/Python/Ruby APIs, hit highlighting, faceted search, caching, replication, and a web admin interface.
ElasticSearch:
Elasticsearch is a distributed, RESTful search and analytics engine capable of solving a growing number of use cases. As the heart of the Elastic Stack, it centrally stores your data so you can discover the expected and uncover the unexpected.
Elastic Stack:
ElasticSearch
Logstash
Logstash is an open source, server-side data processing pipeline that ingests data from a multitude of sources simultaneously, transforms it, and then sends it to your favorite “stash.” (Ours is Elasticsearch, naturally.)
Beats:
Filebeat:Log Files
Metricbeat:Metrics
Packetbeat:Network Data
Winlogbeat:Windows Event Logs
Heartbeat:Uptime Monitoring
Kibana:
Kibana lets you visualize your Elasticsearch data and navigate the Elastic Stack, so you can do anything from learning why you're getting paged at 2:00 a.m. to understanding the impact rain might have on your quarterly numbers.
TF/IDF算法:
https://zh.wikipedia.org/wiki/Tf-idf
ES的核心组件:
物理组件:
集群:
状态:green, yellow, red
节点:
Shard:
Lucene的核心组件:
索引(index):数据库(database)
类型(type):表(table)
文档(Document):行(row)
映射(Mapping):
ElasticSearch 5的程序环境:
配置文件:
/etc/elasticsearch/elasticsearch.yml
/etc/elasticsearch/jvm.options
/etc/elasticsearch/log4j2.properties
Unit File:elasticsearch.service
程序文件:
/usr/share/elasticsearch/bin/elasticsearch
/usr/share/elasticsearch/bin/elasticsearch-keystore:
/usr/share/elasticsearch/bin/elasticsearch-plugin:管理插件程序
搜索服务:
9200/tcp
集群服务:
9300/tcp
els集群的工作逻辑:
多播、单播:9300/tcp
关键因素:clustername
所有节点选举一个主节点,负责管理整个集群的状态(green/yellow/red),以及各shards的分布方式;
插件:
集群配置:
elasticsearch.yml配置文件:
cluster.name: myels
node.name: node1
path.data: /data/els/data
path.logs: /data/els/logs
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.unicast.hosts: ["node1", "node2", "node3"]
discovery.zen.minimum_master_nodes: 2
RESTful API: CRUD(Create, Read, Update, Delete)
curl -X<VERB> '<PROTOCOL>://<HOST>:<PORT>/<PATH>?<QUERY_STRING>' -d '<BODY>'
<BODY>:json格式的请求主体;
<VERB>
GET,POST,PUT,DELETE
特殊PATH:/_cat, /_search, /_cluster
<PATH>
/index_name/type/Document_ID/
curl -XGET 'http://10.1.0.67:9200/_cluster/health?pretty=true'
curl -XGET 'http://10.1.0.67:9200/_cluster/stats?pretty=true'
curl -XGET 'http://10.1.0.67:9200/_cat/nodes?pretty'
curl -XGET 'http://10.1.0.67:9200/_cat/health?pretty'
创建文档:
curl -XPUT
特殊PATH:/_cat, /_search, /_cluster
文档:
{"key1": "value1", "key2": value, ...}
ELS:分布式、开源、RESTful、近乎实时
集群:一个或多个节点的集合;
节点:运行的单个els实例;
索引:切成多个独立的shard;(以Lucene的视角,每个shard即为一个独立而完整的索引)
primary shard:r/w
replica shard: r
查询:
ELS:很多API
_cluster, _cat, _search
curl -X GET '<SCHEME://<HOST>:<PORT>/[INDEX/TYPE/]_search?q=KEYWORD&sort=DOMAIN:[asc|desc]&from=#&size=#&_source=DOMAIN_LIST'
/_search:搜索所有的索引和类型;
/INDEX_NAME/_search:搜索指定的单个索引;
/INDEX1,INDEX2/_search:搜索指定的多个索引;
/s*/_search:搜索所有以s开头的索引;
/INDEX_NAME/TYPE_NAME/_search:搜索指定的单个索引的指定类型;
简单字符串的语法格式
http://lucene.apache.org/core/6_6_0/queryparser/org/apache/lucene/queryparser/classic/package-summary.html#package.description
bike, bake, age: [20,200]
查询类型:Query DSL,简单字符串;
文本匹配的查询条件:
(1) q=KEYWORD, 相当于q=_all:KEYWORD
(2) q=DOMAIN:KEYWORD
{
"name" : "Docker in Action",
"publisher" : "wrox",
"datatime" : "2015-12-01",
"author" : "Blair"
}
_all: "Docker in Action Wrox 2015-12-01 Blair"
修改默认查询域:df属性
查询修饰符:
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
自定义分析器:
analyzer=
默认操作符:OR/AND
default_operator, 默认值为OR
返回字段:
fields=
注:5.X不支持;
结果排序:
sort=DOMAIN:[asc|desc]
搜索超时:
timeout=
查询结果窗口:
from=,默认为0;
size=, 默认为10;
Lucene的查询语法:
q=
KEYWORD
DOMAIN:KEYWORD
+DOMAIN:KEYWORD -DOMAIN:KEYWORD
els支持从多类型的查询:
Full text queries
安装elasticsearch-head插件:
5.X:
(1) 设置elasticsearch.yml配置文件:
http.cors.enabled: true
http.cors.allow-origin: "*"
(2) 安装head:
$ git clone https://github.com/mobz/elasticsearch-head.git
$ cd elasticsearch-head
$ npm install
$ npm run start
$ npm run proxy
ELK:
E: elasticsearch
L: logstash,日志收集工具;
ELK Beats Platform:
PacketBeat:网络报文分析工具,统计收集报文信息;
Filebeat:是logstash forwarder的替换者,因此是一个日志收集工具;
Topbeat:用来收集系统基础数据,如cpu、内存、io等相关的统计信息;
Winlogbeat
Metricbeat
用户自定义beat:
前提:模拟互联网访问的请求命令:
while true; do curl -H "X-Forwarded-For:$[$RANDOM%223+1].$[$RANDOM%255].1.1" http://172.18.0.70/test$[$RANDOM%25+1].html; sleep 1; done
Logstash配置:
input {
...
}
filter{
...
}
output {
...
}
简单示例配置:
input {
stdin {}
}
output {
stdout {
codec => rubydebug
}
}
示例2:从文件输入数据,经grok过滤器插件过滤之后输出至标准输出:
input {
file {
path => ["/var/log/httpd/access_log"]
start_position => "beginning"
}
}
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
remove_field: "message"
}
}
output {
stdout {
codec => rubydebug
}
}
示例3:date filter插件示例:
filter {
grok {
match => {
"message" => "%{HTTPD_COMBINEDLOG}"
}
remove_field => "message"
}
date {
match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
remove_field => "timestamp"
}
}
插件:mutate
The mutate filter allows you to perform general mutations on fields. You can rename, remove, replace, and modify fields in your events.
示例4:mutate filter插件
filter {
grok {
match => {
"message" => "%{HTTPD_COMBINEDLOG}"
}
}
date {
match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
}
mutate {
rename => {
"agent" => "user_agent"
}
}
}
示例5:geoip插件
filter {
grok {
match => {
"message" => "%{HTTPD_COMBINEDLOG}"
}
}
date {
match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
}
mutate {
rename => {
"agent" => "user_agent"
}
}
geoip {
source => "clientip"
target => "geoip"
database => "/etc/logstash/maxmind/GeoLite2-City.mmdb"
}
}
示例3:使用Redis
(1) 从redis加载数据
input {
redis {
batch_count => 1
data_type => "list"
key => "logstash-list"
host => "192.168.0.2"
port => 6379
threads => 5
}
}
(2) 将数据存入redis
output {
redis {
data_type => "channel"
key => "logstash-%{+yyyy.MM.dd}"
}
}
示例4:将数据写入els cluster
output {
elasticsearch {
hosts => ["http://node1:9200/","http://node2:9200/","http://node3:9200/"]
user => "ec18487808b6908009d3"
password => "efcec6a1e0"
index => "logstash-%{+YYYY.MM.dd}"
document_type => "apache_logs"
}
}
示例5:综合示例,启用geoip
input {
beats {
port => 5044
}
}
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
remove_field => "message"
}
geoip {
source => "clientip"
target => "geoip"
database => "/etc/logstash/GeoLite2-City.mmdb"
}
}
output {
elasticsearch {
hosts => ["http://172.16.0.67:9200","http://172.16.0.68:9200","http://172.16.0.69:9200"]
index => "logstash-%{+YYYY.MM.dd}"
action => "index"
document_type => "apache_logs"
}
}
grok:
%{SYNTAX:SEMANTIC}
SYNTAX:预定义的模式名称;
SEMANTIC:给模式匹配到的文本所定义的键名;
1.2.3.4 GET /logo.jpg 203 0.12
%{IP:clientip} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
{ clientip: 1.2.3.4, method: GET, request: /logo.jpg, bytes: 203, duration: 0.12}
%{IPORHOST:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)" %{HOST:domain} %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent} "(%{WORD:x_forword}|-)" (%{URIHOST:upstream_host}|-) %{NUMBER:upstream_response} (%{WORD:upstream_cache_status}|-) %{QS:upstream_content_type} (%{BASE16FLOAT:upstream_response_time}) > (%{BASE16FLOAT:request_time})
"message" => "%{IPORHOST:clientip} \[%{HTTPDATE:time}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:http_status_code} %{NUMBER:bytes} \"(?<http_referer>\S+)\" \"(?<http_user_agent>\S+)\" \"(?<http_x_forwarded_for>\S+)\""
filter {
grok {
match => {
"message" => "%{IPORHOST:clientip} \[%{HTTPDATE:time}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:http_status_code} %{NUMBER:bytes} \"(?<http_referer>\S+)\" \"(?<http_user_agent>\S+)\" \"(?<http_x_forwarded_for>\S+)\""
}
remote_field: message
}
}
nginx.remote.ip
[nginx][remote][ip]
filter {
grok {
match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx
][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\
" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"
%{DATA:[nginx][access][agent]}\""] }
remove_field => "message"
}
date {
match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
remove_field => "[nginx][access][time]"
}
useragent {
source => "[nginx][access][agent]"
target => "[nginx][access][user_agent]"
remove_field => "[nginx][access][agent]"
}
geoip {
source => "[nginx][access][remote_ip]"
target => "geoip"
database => "/etc/logstash/GeoLite2-City.mmdb"
}
}
output {
elasticsearch {
hosts => ["node1:9200","node2:9200","node3:9200"]
index => "logstash-ngxaccesslog-%{+YYYY.MM.dd}"
}
}
注意:
1、输出的日志文件名必须以“logstash-”开头,方可将geoip.location的type自动设定为"geo_point";
2、target => "geoip"
除了使用grok filter plugin实现日志输出json化之外,还可以直接配置服务输出为json格式;
示例:使用grok结构化nginx访问日志
filter {
grok {
match => {
"message" => "%{HTTPD_COMBINEDLOG} \"%{DATA:realclient}\""
}
remove_field => "message"
}
date {
match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
remove_field => "timestamp"
}
}
示例:使用grok结构化tomcat访问日志
filter {
grok {
match => {
"message" => "%{HTTPD_COMMONLOG}"
}
remove_field => "message"
}
date {
match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
remove_field => "timestamp"
}
}
Nginx日志Json化:
log_format json '{"@timestamp":"$time_iso8601",'
'"@source":"$server_addr",'
'"@nginx_fields":{'
'"client":"$remote_addr",'
'"size":$body_bytes_sent,'
'"responsetime":"$request_time",'
'"upstreamtime":"$upstream_response_time",'
'"upstreamaddr":"$upstream_addr",'
'"request_method":"$request_method",'
'"domain":"$host",'
'"url":"$uri",'
'"http_user_agent":"$http_user_agent",'
'"status":$status,'
'"x_forwarded_for":"$http_x_forwarded_for"'
'}'
'}';
access_log logs/access.log json;
Conditionals
Sometimes you only want to filter or output an event under certain conditions. For that, you can use a conditional.
Conditionals in Logstash look and act the same way they do in programming languages. Conditionals support if, else if and else statements and can be nested.
The conditional syntax is:
if EXPRESSION {
...
} else if EXPRESSION {
...
} else {
...
}
What’s an expression? Comparison tests, boolean logic, and so on!
You can use the following comparison operators:
equality: ==, !=, <, >, <=, >=
regexp: =~, !~ (checks a pattern on the right against a string value on the left) inclusion: in, not in
The supported boolean operators are:
and, or, nand, xor
The supported unary operators are:
!
Expressions can be long and complex. Expressions can contain other expressions, you can negate expressions with !, and you can group them with parentheses (...).
filter {
if [type] == 'tomcat-accesslog' {
grok {}
}
if [type] == 'httpd-accesslog' {
grok {}
}
}
在filebeat上添加自定义字段,区分不同的日志示例:
filebeat.prospectors:
- input_type: log
paths:
- /var/log/httpd/access_log
fields:
logtype: access
- paths:
- /var/log/httpd/error_log
fields:
logtype: errors
在logstash中分别处理不同日志的配置示例:
input {
redis {
host => "172.18.0.70"
port => 6379
password => "dhy.com"
db => 0
key => "filebeat"
data_type => "list"
}
}
filter {
if [fields][logtype] == "access" {
grok {
match => { "message" => "%{HTTPD_COMBINEDLOG}" }
remove_field => ["message","beat"]
}
date {
match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
remove_field => "timestamp"
}
geoip {
source => "clientip"
target => "geoip"
database => "/etc/logstash/maxmind/GeoLite2-City.mmdb"
}
}
}
output {
if [fields][logtype] == "access" {
elasticsearch {
hosts => ["http://node01.dhy.com:9200/","http://node02.dhy.com:9200/"]
index => "logstash-%{+YYYY.MM.dd}"
document_type => "httpd_access_logs"
}
} else {
elasticsearch {
hosts => ["http://node01.dhy.com:9200/","http://node02.dhy.com:9200/"]
index => "logstash-%{+YYYY.MM.dd}"
document_type => "httpd_error_logs"
}
}
}