Elasticsearch join(父子文档)+ Logstash 导入

一、在 Elasticsearch 中定义 join 类型字段(join_field)及其父子关系名(名称可自定义):父关系(parent_cus_no)、子关系(child_cus_no)。该 mapping 必须在导入任何文档之前创建,否则字段会被动态映射成其他类型且无法再修改。

# Create the index that Logstash writes to (hz_import_consumer, see the
# `index` setting of both elasticsearch outputs) together with its join-field
# mapping. The type name `_doc` matches the outputs' `document_type`.
#
# Run this once, BEFORE starting Logstash: if Logstash creates the index first,
# join_field is dynamically mapped from the first event it sees, and an existing
# field's type cannot be changed afterwards.
#
# Note: index names may not start with "_", so a placeholder such as `_index`
# is rejected by Elasticsearch.
#
# If the index already exists and join_field is not mapped yet, add only the
# mapping instead:
#   PUT localhost:9200/hz_import_consumer/_mapping/_doc   with body {"properties": {...}}
curl -H "Content-Type: application/json" -XPUT 'localhost:9200/hz_import_consumer' -d '
{
  "mappings": {
    "_doc": {
      "properties": {
        "join_field": {
          "type": "join",
          "relations": {
            "parent_cus_no": "child_cus_no"
          }
        }
      }
    }
  }
}'

二、定义 Logstash 配置:两个 jdbc 输入分别读取父文档(type 为 hic)和子文档(type 为 hod),并写入同一个索引

input {
  # Console input: events read here carry no `type`, so they only reach the
  # unconditional stdout output below (handy for quick manual testing).
  stdin {
  }

  # Parent rows (type "hic"), produced by hz_im_con.sql.
  jdbc {
    # What to run, and when (cron syntax: once every minute)
    statement_filepath => "/Users/chenfenli/Bin/elasticsearch/logstash-6.4.2/conf_mysql/hz_im_con.sql"
    schedule => "* * * * *"
    type => "hic"
    # MySQL server: host, port and database name
    jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
    # Login
    # NOTE(review): hard-coded root credentials; consider ${ENV_VAR} substitution or the Logstash keystore
    jdbc_user => "root"
    jdbc_password => "123456"
    # JDBC driver jar on disk and the driver class inside it
    jdbc_driver_library => "/Users/chenfenli/Bin/elasticsearch/logstash-6.4.2/conf_mysql/mysql-connector-java-5.1.41-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # Read the result set in pages of 50000 rows
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
  }

  # Child rows (type "hod"), produced by hz_out_det.sql.
  jdbc {
    # What to run, and when (cron syntax: once every minute)
    statement_filepath => "/Users/chenfenli/Bin/elasticsearch/logstash-6.4.2/conf_mysql/hz_out_det.sql"
    schedule => "* * * * *"
    type => "hod"
    # MySQL server: host, port and database name
    jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
    # Login
    # NOTE(review): hard-coded root credentials; consider ${ENV_VAR} substitution or the Logstash keystore
    jdbc_user => "root"
    jdbc_password => "123456"
    # JDBC driver jar on disk and the driver class inside it
    jdbc_driver_library => "/Users/chenfenli/Bin/elasticsearch/logstash-6.4.2/conf_mysql/mysql-connector-java-5.1.41-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # Read the result set in pages of 50000 rows
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
  }
}
filter {
  if [type] == "hic" {
    # Parent rows: join_field is just the parent relation name declared
    # in the index mapping.
    mutate {
      add_field => { "join_field" => "parent_cus_no" }
    }
  } else if [type] == "hod" {
    # Child rows: join_field is an object holding the child relation name
    # and the _id of the parent document ("hic_<cus_no>", the same id the
    # parent output assigns).
    mutate {
      add_field => {
        "[join_field][name]"   => "child_cus_no"
        "[join_field][parent]" => "hic_%{cus_no}"
      }
    }
  }
}
output {
  # Echo every event to the console, one JSON object per line.
  stdout {
    codec => json_lines
  }

  if [type] == "hic" {
    # Parent documents: the _id "hic_<cus_no>" is what child documents put
    # in [join_field][parent] and use as their routing value.
    elasticsearch {
      hosts         => "localhost:9200"
      index         => "hz_import_consumer"
      document_type => "_doc"
      document_id   => "hic_%{cus_no}"
    }
  } else if [type] == "hod" {
    # Child documents: routed by the parent's _id so they are stored on the
    # same shard as their parent, as the join field requires.
    elasticsearch {
      hosts         => "localhost:9200"
      index         => "hz_import_consumer"
      document_type => "_doc"
      document_id   => "hod_%{pkid}"
      routing       => "hic_%{cus_no}"
    }
  }
}

1、父文档:通过 mutate 动态添加 join 字段,其值为父关系名:

"join_field" => "parent_cus_no"    # mapping 中定义的父关系名

2、子文档:动态添加 join 字段的 name 和 parent 两个属性(name 必须是 mapping 中定义的子关系名 child_cus_no,parent 必须是父文档的 document_id):

add_field => {"[join_field][name]" => "child_cus_no"}        # mapping 中定义的子关系名
add_field => {"[join_field][parent]" => "hic_%{cus_no}"}     # 父文档的 document_id

3、路由:子文档必须与其父文档存放在同一个分片上。父文档默认按自身 _id 路由,因此子文档的 routing 必须设置为父文档的 document_id:

routing => "hic_%{cus_no}"    # 父文档document_id

猜你喜欢

转载自blog.csdn.net/oJueQiang123456/article/details/85882264