Filebeat (每个微服务启动一个)--->Kafka集群--->Logstash(one)-->Elasticsearch集群
一、数据流从文件端到Kafka 集群端,通过Filebeat
1.下载 Filebeat
#cd /opt/filebeat-6.3.2-linux-x86_64
filebeat.inputs:
- type: log
enabled: true
paths:
# - /var/log/*.log
- /home/kexin/out.log #监听的日志文件
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 3
output.kafka:
enabled: true
hosts: ["10.10.100.215:9092","10.10.100.216:9092","10.10.100.217:9092"] # kafka集群配置
topic: "caolihua01" #kafka 订阅的主题
2.启动Filebeat
# ./filebeat -c filebeat.yml 启动Filebeat
向监听的文件out.log 写入数据
#echo "别瞎搞3" >> out.log
3. 通过kafka-eagle查看写入kafka的消息
查看消息:
二、数据流从Kafka 集群到Elasticsearch端 ,通过logstah
logstash配置和启动
#cd /opt/logstash-6.3.2/bin
# ./logstash -f ../config/logstash.conf
input {
kafka {
bootstrap_servers => "10.10.100.215:9092,10.10.100.216:9092,10.10.100.217:9092"
topics => "caolihua01"
}
}
filter {
ruby {
message => "event.timestamp.time.localtime"
}
}
output {
elasticsearch {
hosts => ["10.10.100.215:9200","10.10.100.216:9200","10.10.100.217:9200"]
index => "kx_es_index"
}
}
三、写入数据测试
# echo "别瞎搞啊001" >> out.log
该数据流转过程:
out.log-->filebeat--->kafka-->logstash--->es ,通过kibana查询
POST /kx_es_index/_search
{
"sort": [
{
"@timestamp": {
"order": "desc"
}
}
]
}
查看结果:可以看到写入ES
成功写入ES后,
查看kafka 队列:
SELECT "partition","offset","msg" FROM "caolihua01" WHERE "partition" IN (0,1,2,3) limit 10
该消息被消费掉了,所以kafka是查询不到的。