Our company's API proxy platform needed an upgrade: it had to record call logs, and the returned results had to be saved as well (potentially full payloads), so MongoDB was chosen as the store.
While configuring Logstash I initially put both outputs in a single pipeline, but discovered by accident that if either ES or MongoDB went down, Logstash just blocked and waited. So I switched to two separate pipelines, so that the two sinks cannot affect each other.
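For context, the abandoned single-pipeline layout looked roughly like this (a sketch, not the original file). With both outputs in one pipeline, an unreachable output back-pressures the whole pipeline, so the healthy sink stalls too:

output {
  # one stalled output here blocks delivery to the other as well
  elasticsearch { hosts => ["127.0.0.1:9200"] index => "kafka_real" }
  mongodb { uri => "mongodb://127.0.0.1:27017" database => "test" collection => "trace_log" }
}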
The final dual-pipeline configuration is as follows:
es.conf
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"  # note: the Kafka broker address, not the ZooKeeper address
    group_id => "logstash"                 # custom consumer group id
    topics => ["ttp"]                      # Kafka topic name
    consumer_threads => 5
    decorate_events => true                # adds Kafka metadata (topic, partition, offset) under [@metadata][kafka]
    codec => "json"
  }
}
filter {
  mutate {
    # msg arrives as a JSON string; first copy it into a scratch field
    add_field => { "@msg" => "%{msg}" }
  }
  json {
    # then parse that field into top-level event fields
    source => "@msg"
    # drop the fields we no longer need (this line is optional)
    remove_field => [ "@msg", "msg", "apiNickName", "result" ]
  }
}
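To make the filter concrete, suppose a Kafka message like the one below arrives (a hypothetical payload; traceId and costMs are invented for illustration):

{"apiNickName":"userQuery","result":"...","msg":"{\"traceId\":\"abc123\",\"costMs\":42}"}

After the mutate and json steps, traceId and costMs become top-level fields of the event, while @msg, msg, apiNickName and result are removed before indexing.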
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "kafka_real"
  }
  stdout { codec => rubydebug }  # echo events to the console for debugging
}
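Once this pipeline is running, a quick sanity check (assuming the default ES port) is to count the documents in the index:

curl "127.0.0.1:9200/kafka_real/_count?pretty"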
mongo.conf
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"  # note: the Kafka broker address, not the ZooKeeper address
    group_id => "logstash1"                # a different group id from es.conf, so each pipeline receives every message
    topics => ["ttp"]                      # Kafka topic name
    consumer_threads => 5
    decorate_events => true
    codec => "json"
  }
}
}
filter {
  mutate {
    # same trick as in es.conf: copy the msg JSON string into a scratch field
    add_field => { "@msg1" => "%{msg}" }
  }
  json {
    source => "@msg1"
    # unlike es.conf, result is kept here, since the full response body is what MongoDB stores
    remove_field => [ "@msg1", "msg", "apiNickName" ]
  }
}
output {
  mongodb {
    codec => line { format => "%{message}" }
    uri => "mongodb://127.0.0.1:27017"
    database => "test"
    collection => "trace_log"
  }
  stdout { codec => rubydebug }
}
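The mongodb output is a community plugin and is not bundled with Logstash by default; if it is missing, install it first:

bin/logstash-plugin install logstash-output-mongodb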
pipelines.yml
- pipeline.id: my-pipeline_1
  path.config: "../myconf/es.conf"
  pipeline.workers: 3
- pipeline.id: my-other-pipeline
  path.config: "../myconf/mongo.conf"
  queue.type: persisted   # disk-backed queue, so events survive a MongoDB outage or a Logstash restart
Startup
Start Logstash without the -f flag, so that it reads pipelines.yml:

bin/logstash
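Once it is up, the monitoring API (default port 9600) can confirm that both pipelines are running:

curl "127.0.0.1:9600/_node/pipelines?pretty"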