1.安装logstash
执行以下脚本,下载安装包
注意版本,本人使用的是elastaic6.2.4,logstash的版本也是6.2.4wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.tar.gz
- 执行以下命令,解压安装包
tar -xzvf logstash-6.2.4.tar.gz
2.安装插件
因为要从mysql中取数据,输出到elasticsearch,故需要安装以下两款插件
logstash-input-jdbc
logstash-output-elasticsearch
- 进入logstash的bin文件夹下,执行以下两个命令,安装上述两款插件
./logstash-plugin install logstash-input-jdbc
./logstash-plugin install logstash-output-elasticsearch
安装完成后会有成功提示
3.安装mysql驱动
下载mysql-connector-java-5.1.6.jar,上传至logstash的bin文件夹下。
4.配置logstash conf文件
在logstash的bin文件夹下创建mysql-es.conf文件,内容如下:
input{ jdbc { jdbc_driver_library => "mysql-connector-java-5.1.6.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://192.168.1.15:3306/test" jdbc_user => "test" jdbc_password => "test" jdbc_paging_enabled => "true" jdbc_page_size => "1000" jdbc_default_timezone =>"Asia/Shanghai" schedule => "* * * * *" statement => "select * from test where gmt_modified > :sql_last_value order by id " use_column_value => true tracking_column => "updatetime" last_run_metadata_path => "./logstash_jdbc_last_run" } } output{ elasticsearch { hosts => "192.168.1.103:9200" index => "test" user => "elastic" password => "es_password" document_id => "%{id}" document_type => "order" } stdout { codec => json_lines } }
5.开始同步
- 进入logstash的bin目录下,执行以下命令,开始同步数据到elsasticsearch
./logstash -f mysql-es.conf
然后logstash就默默开始同步了。
6.如果同步不小心中断了,我们得切换到更新模式了
elasticsearch中如果该id不存在则创建,如果存在则更新
mysql-es.conf需要修改成如下样子:
input{
jdbc {
jdbc_driver_library => "mysql-connector-java-5.1.6.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.1.102:3306/caibaotc"
jdbc_user => "test"
jdbc_password => "test"
jdbc_paging_enabled => "true"
jdbc_page_size => "500"
jdbc_default_timezone =>"Asia/Shanghai"
schedule => "* * * * *"
statement => "select * from test where gmt_modified > :sql_last_value order by id"
use_column_value => true
tracking_column => "gmt_modified"
last_run_metadata_path => "./logstash_jdbc_last_run"
codec => plain{ charset => "UTF-8" }
}
}
output{
elasticsearch {
action => "update"
doc_as_upsert => true
hosts => ["192.168.1.11:9200","192.168.1.12:9200","192.168.1.13:9200"]
index => "test"
document_id => "%{id}"
document_type => "order"
codec => plain{ charset => "UTF-8" }
}
# stdout {
# codec => json_lines
# }
}
参考:
https://www.elastic.co/guide/en/logstash/6.3/output-plugins.html
https://www.elastic.co/guide/en/logstash/6.3/plugins-inputs-jdbc.html