Overview
Suppose a Logstash pipeline is configured with two conf files, i.e. two input sources. Without any extra handling, every Filter and Output fires for events from both inputs, as shown in the figure below:
This is clearly not what we want. We would like Logstash to process the sources as follows instead, keeping them separate and handling each independently:
To achieve this, we use the type parameter to distinguish events in the Filter and Output stages.
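The reason this is needed: when Logstash is pointed at a config directory, it concatenates every conf file into a single pipeline, so all inputs feed one shared event stream. A conceptual sketch of the merged pipeline (the fields shown are illustrative, matching the examples that follow):

```
# Conceptual view: Logstash concatenates all conf files in the config
# directory into ONE pipeline, so every event from every input passes
# through every filter and output unless guarded by a conditional
# such as `if [type] == "..."`.
input  { jdbc { type => "article" ... } }   # from conf file 1
input  { jdbc { type => "blogs"   ... } }   # from conf file 2
filter { ... }    # runs on events from BOTH inputs
output { ... }    # receives events from BOTH inputs
```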
Configuring type in the input
- input - 1
input {
  stdin {
  }
  jdbc {
    # Add type here; the name is user-defined
    type => "article"
    jdbc_driver_library => "/usr/share/logstash/ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@192.168.1.100:1521:orcl"
    jdbc_user => "test"
    jdbc_password => "123456"
    schedule => "* * * * *"
    statement_filepath => "/usr/share/logstash/sql/article.sql"
    use_column_value => false
    jdbc_fetch_size => 1000000
    last_run_metadata_path => "visitinfo.txt"
  }
}
- input - 2
input {
  stdin {
  }
  jdbc {
    # Add type here; the name is user-defined
    type => "blogs"
    jdbc_driver_library => "/usr/share/logstash/ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@192.168.1.100:1521:orcl"
    jdbc_user => "test"
    jdbc_password => "123456"
    schedule => "* * * * *"
    statement_filepath => "/usr/share/logstash/sql/blogs.sql"
    use_column_value => false
    jdbc_fetch_size => 1000000
    last_run_metadata_path => "blogs.txt"
  }
}
Checking type in the filter
- Filter - 1
filter {
  if [type] == "article" {
    mutate {
      rename => {
        "title" => "articleTitle"
      }
    }
  }
}
- Filter - 2
filter {
  if [type] == "blogs" {
    mutate {
      rename => {
        "title" => "blogTitle"
      }
    }
  }
}
Checking type in the output
- Output - 1
output {
  if [type] == "article" {
    elasticsearch {
      hosts => ["192.168.1.201:9200"]
      # Index name
      index => "es_article"
      document_id => "%{id}"
    }
    stdout {
      codec => json_lines
    }
  }
}
- Output - 2
output {
  if [type] == "blogs" {
    elasticsearch {
      hosts => ["192.168.1.201:9200"]
      # Index name
      index => "es_blog"
      document_id => "%{id}"
    }
    stdout {
      codec => json_lines
    }
  }
}
Summary
With the configuration above, the pipelines are kept properly separated. You can try it yourself: if you remove the type parameter and the conditionals that check it, the resulting Elasticsearch data becomes mixed, i.e. es_blog ends up containing data that belongs in the es_article index, which is clearly wrong.
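Type-based conditionals are one way to achieve this separation. If full isolation is the goal, Logstash 6.0+ can also run each conf file as its own pipeline via config/pipelines.yml, so the two sources never share an event stream at all. A minimal sketch, assuming the conf files live under a hypothetical /usr/share/logstash/conf/ directory:

```
# config/pipelines.yml - each conf file becomes its own isolated pipeline,
# so no type conditionals are needed (paths below are assumptions)
- pipeline.id: article
  path.config: "/usr/share/logstash/conf/article.conf"
- pipeline.id: blogs
  path.config: "/usr/share/logstash/conf/blogs.conf"
```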