Elasticsearch及ELK使用(四):从数据库采集及写入数据库

ELK通常从文本文件中采集数据,然后写入Elasticsearch。 除此以外,还可以与数据库交互,包括两种场景:

  • 以数据库作为源,从其中采集内容。
  • 以数据库作为目的,将采集的内容写入数据库。

1 从数据库中采集

1.1 环境

已经有一个mysql数据库(版本为Mysql 8.0.18)。

mysql的上有schema=dbtest,有一张表person数据如下:
在这里插入图片描述

1.2 配置logstash

  1. 先确保filebeat(数据库采集不需要filebeat)和logstash已经停止。
  2. 先将Mysql的客户端驱动文件,mysql-connector-java-8.0.18.jar上传到logstash机器上的/tmp目录下。
  3. 在config目录中,以复制logstash-sample.conf样例文件,得到文件logstash_mysql.conf。
  4. 编辑config/logstash_mysql.conf,内容如下
input {
    jdbc {
        jdbc_driver_library => "/tmp/mysql-connector-java-8.0.18.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://192.168.43.201/dbtest?serverTimezone=UTC"
        jdbc_user => "root"
        jdbc_password => "Pwd_1234"
        schedule => "* * * * *"      #每分钟执行
        clean_run => true
        jdbc_default_timezone => "Asia/Shanghai"
        statement => "select * from person"
   }
}

output {
  elasticsearch {
    hosts => ["http://192.168.43.201:9200"]
    index => "person-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }
}  

schedule

* 5 * 1-3 * 从一月到三月每天早上5点的每一分钟都会执行。
0 * * * * 会在每天每小时的第0分钟执行。
0 6 * * * America/Chicago 每天早上6点(UTC/GMT -5)执行。

1.3 启动

  1. 启动logstash。
./logstash -f ../config/logstash_mysql.conf

启动需要多等一小会。

1.4 检查结果

当filebeat、logstash、elasticsearch、kibana都启动后。日志会自动采集,并存放到elasticsearch中。通过kibana可以查看结果:
在这里插入图片描述
注意:该index的health为yellow是因为,logstash向elasticsearch创建index时指定了number_of_replicas=1,既处理主数据外至少还需要一个副本。

而elasticsearch要求副本应分散在不同的节点node上,这样安全避免单节点故障而丢失数据。因本环境elasticsearch只有一个node,所有health为yellow。如果把该index的设置Setting中,“number_of_replicas” 改为 “0”。则health就会变为green。

4.4 搜索结果(文档)

在kibana里“Dev Tools”中通过RESTfull API 模糊搜索

GET /person-2020.06.25/_search
{
  "sort": [
    {
      "@timestamp": {
        "order": "asc"
      }
    }
  ]
}

搜索结果如下图,在message集中就表中数据内容。
在这里插入图片描述

注意:由于在logstash配置的是每一分钟从数据库采集一次,因此这里搜索结果有很多重复。

正确的做法应为:

  • 数据库表中涉及记录变更的字段,采集的SQL语句的where条件使用记录变更的字段进行过滤,只采集发生变化的数据,避免重复。
  • 或者将数据库表中的ID,对应到elasticsearch的文档ID,进行一一对应。

2 采集后写入数据库

这里以一个生产环境为例,通过filebeat实现文本采集,通过logstash将采集后的数据库写入DB。

2.1 采集源filebeat配置

以采集文本数据为例,filebeat配置如下:

#=========================== Filebeat prospectors =============================

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- type: log
  enabled: true
  fields:
    servicename: termfrontbpu
    logtype: MONITOR_STAT
  paths:
   - /app/appgess1/bin/log/*bpu.log    
  include_lines: [".*MARKER=MONITOR_STAT.*"]
  encoding: gbk

(省略其他内容....)

output.logstash:
  # The Logstash hosts
  hosts: ["20.200.54.51:56565"]

2.2 logstash安装插件

由于logstash官方插件中没有output-jdbc即写入数据库的插件,所以需要先安装第三方插件。

2.2.1 联机在线安装logstash-output-jdbc插件

  1. 确保可以联机互联网。
  2. cd logstash-6.8.3 (logstash-6.8.3解压安装目录)
  3. ./bin/logstash-plugin install logstash-output-jdbc 会自动联网从https://rubygems.org下载并安装。
  4. 验证结果: ./bin/logstash-plugin list 可以看到列表中多了 logstash-output-jdbc 插件

2.2.2 联机在线安装logstash-output-jdbc插件

  1. 以一台已经完成插件的为基准,通过复制的方式安装。
  2. cd logstash-6.8.3 (logstash-6.8.3解压安装目录)
  3. vim Gemfile文件。 在最后增加一行。
gem "logstash-output-jdbc"
  1. 将vendor/bundle/jruby/2.5.0/specifications目录下的文件logstash-output-jdbc-5.4.0.gemspec拷贝
  2. 将bundle/jruby/2.5.0/gems目录下的整个目录logstash-output-jdbc-5.4.0拷贝
  3. 验证结果: ./bin/logstash-plugin list 可以看到列表中多了 logstash-output-jdbc 插件

2.3 logstash配置

数据库中表定义字段elapsedmisec类型为int型,其他字段均为varchar类型。
logstash.conf配置文件全文内容如下:

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
    
    
  beats {
    
    
    port => 56565
  }
}

filter {
    
    
  dissect {
    
    
    mapping => {
    
     "message" => "%{nothings}MONITOR_STAT, RecvReqDateTime=%{recvreqdatetime}, UpstreamSysCode=%{upstreamsyscode}, ExchCode=%{exchcode}, ExchCodeDesc=%{exchcodedesc}, SysTraceNo=%{systraceno}, SeqNo=%{seqno}, ElapsedMiSec=%{elapsedmisec}, RspCode=%{rspcode}, RspCodeDesc=%{rspcodedesc}, OtherInfo=%{otherinfo}" }
  }
  dissect {
    
    
    convert_datatype => {
    
    "elapsedmisec" => "int"}
  }
}

output {
    
    
    if [fields][logtype] == "MONITOR_STAT" {
    
    
      jdbc {
    
    
        driver_jar_path => "/app/appgess/elk_logcollect/logstash-6.8.3_monitor/lib/postgresql-42.2.5.jar"
        driver_class => "org.postgresql.Driver"
        connection_string => "jdbc:postgresql://20.200.35.17:5432/common"
        username => "goldcomm"
        password => "goldcomm"
        statement => ["insert into traffic_statistics_details(hostname, servicename, recvreqdatetime, upstreamsyscode, exchcode, exchcodedesc, systraceno, seqno, elapsedmisec, rspcode, rspcodedesc, otherinfo) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", "[beat][hostname]", "[fields][servicename]", "recvreqdatetime", "upstreamsyscode", "exchcode", "exchcodedesc", "systraceno", "seqno", "elapsedmisec", "rspcode", "rspcodedesc", "otherinfo" ]
      }
    } 
}

猜你喜欢

转载自blog.csdn.net/zyplanke/article/details/111876948