ELK通常从文本文件中采集数据,然后写入Elasticsearch。 除此以外,还可以与数据库交互,包括两种场景:
- 以数据库作为源,从其中采集内容。
- 以数据库作为目的,将采集的内容写入数据库。
1 从数据库中采集
1.1 环境
已经有一个mysql数据库(版本为Mysql 8.0.18)。
mysql的上有schema=dbtest,有一张表person数据如下:
1.2 配置logstash
- 先确保filebeat(数据库采集不需要filebeat)和logstash已经停止。
- 先将Mysql的客户端驱动文件,mysql-connector-java-8.0.18.jar上传到logstash机器上的/tmp目录下。
- 在config目录中,以复制logstash-sample.conf样例文件,得到文件logstash_mysql.conf。
- 编辑config/logstash_mysql.conf,内容如下
input {
jdbc {
jdbc_driver_library => "/tmp/mysql-connector-java-8.0.18.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.43.201/dbtest?serverTimezone=UTC"
jdbc_user => "root"
jdbc_password => "Pwd_1234"
schedule => "* * * * *" #每分钟执行
clean_run => true
jdbc_default_timezone => "Asia/Shanghai"
statement => "select * from person"
}
}
output {
elasticsearch {
hosts => ["http://192.168.43.201:9200"]
index => "person-%{+YYYY.MM.dd}"
#user => "elastic"
#password => "changeme"
}
}
schedule
* 5 * 1-3 * | 从一月到三月每天早上5点的每一分钟都会执行。 |
---|---|
0 * * * * | 会在每天每小时的第0分钟执行。 |
0 6 * * * America/Chicago | 每天早上6点(UTC/GMT -5)执行。 |
1.3 启动
- 启动logstash。
./logstash -f ../config/logstash_mysql.conf
启动需要多等一小会。
1.4 检查结果
当filebeat、logstash、elasticsearch、kibana都启动后。日志会自动采集,并存放到elasticsearch中。通过kibana可以查看结果:
注意:该index的health为yellow是因为,logstash向elasticsearch创建index时指定了number_of_replicas=1,既处理主数据外至少还需要一个副本。
而elasticsearch要求副本应分散在不同的节点node上,这样安全避免单节点故障而丢失数据。因本环境elasticsearch只有一个node,所有health为yellow。如果把该index的设置Setting中,“number_of_replicas” 改为 “0”。则health就会变为green。
4.4 搜索结果(文档)
在kibana里“Dev Tools”中通过RESTfull API 模糊搜索
GET /person-2020.06.25/_search
{
"sort": [
{
"@timestamp": {
"order": "asc"
}
}
]
}
搜索结果如下图,在message集中就表中数据内容。
注意:由于在logstash配置的是每一分钟从数据库采集一次,因此这里搜索结果有很多重复。
正确的做法应为:
- 数据库表中涉及记录变更的字段,采集的SQL语句的where条件使用记录变更的字段进行过滤,只采集发生变化的数据,避免重复。
- 或者将数据库表中的ID,对应到elasticsearch的文档ID,进行一一对应。
2 采集后写入数据库
这里以一个生产环境为例,通过filebeat实现文本采集,通过logstash将采集后的数据库写入DB。
2.1 采集源filebeat配置
以采集文本数据为例,filebeat配置如下:
#=========================== Filebeat prospectors =============================
filebeat.prospectors:
# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.
- type: log
enabled: true
fields:
servicename: termfrontbpu
logtype: MONITOR_STAT
paths:
- /app/appgess1/bin/log/*bpu.log
include_lines: [".*MARKER=MONITOR_STAT.*"]
encoding: gbk
(省略其他内容....)
output.logstash:
# The Logstash hosts
hosts: ["20.200.54.51:56565"]
2.2 logstash安装插件
由于logstash官方插件中没有output-jdbc即写入数据库的插件,所以需要先安装第三方插件。
2.2.1 联机在线安装logstash-output-jdbc插件
- 确保可以联机互联网。
- cd logstash-6.8.3 (logstash-6.8.3解压安装目录)
- ./bin/logstash-plugin install logstash-output-jdbc 会自动联网从https://rubygems.org下载并安装。
- 验证结果: ./bin/logstash-plugin list 可以看到列表中多了 logstash-output-jdbc 插件
2.2.2 联机在线安装logstash-output-jdbc插件
- 以一台已经完成插件的为基准,通过复制的方式安装。
- cd logstash-6.8.3 (logstash-6.8.3解压安装目录)
- vim Gemfile文件。 在最后增加一行。
gem "logstash-output-jdbc"
- 将vendor/bundle/jruby/2.5.0/specifications目录下的文件logstash-output-jdbc-5.4.0.gemspec拷贝
- 将bundle/jruby/2.5.0/gems目录下的整个目录logstash-output-jdbc-5.4.0拷贝
- 验证结果: ./bin/logstash-plugin list 可以看到列表中多了 logstash-output-jdbc 插件
2.3 logstash配置
数据库中表定义字段elapsedmisec类型为int型,其他字段均为varchar类型。
logstash.conf配置文件全文内容如下:
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
beats {
port => 56565
}
}
filter {
dissect {
mapping => {
"message" => "%{nothings}MONITOR_STAT, RecvReqDateTime=%{recvreqdatetime}, UpstreamSysCode=%{upstreamsyscode}, ExchCode=%{exchcode}, ExchCodeDesc=%{exchcodedesc}, SysTraceNo=%{systraceno}, SeqNo=%{seqno}, ElapsedMiSec=%{elapsedmisec}, RspCode=%{rspcode}, RspCodeDesc=%{rspcodedesc}, OtherInfo=%{otherinfo}" }
}
dissect {
convert_datatype => {
"elapsedmisec" => "int"}
}
}
output {
if [fields][logtype] == "MONITOR_STAT" {
jdbc {
driver_jar_path => "/app/appgess/elk_logcollect/logstash-6.8.3_monitor/lib/postgresql-42.2.5.jar"
driver_class => "org.postgresql.Driver"
connection_string => "jdbc:postgresql://20.200.35.17:5432/common"
username => "goldcomm"
password => "goldcomm"
statement => ["insert into traffic_statistics_details(hostname, servicename, recvreqdatetime, upstreamsyscode, exchcode, exchcodedesc, systraceno, seqno, elapsedmisec, rspcode, rspcodedesc, otherinfo) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", "[beat][hostname]", "[fields][servicename]", "recvreqdatetime", "upstreamsyscode", "exchcode", "exchcodedesc", "systraceno", "seqno", "elapsedmisec", "rspcode", "rspcodedesc", "otherinfo" ]
}
}
}