logstash和kafka整合首先要注意软件版本之间的兼容性;
logstash 2.3.1
kafka 2.11-0.8.2.2
第一步:启动zookeeper
cd /home/wql/app/zookeeper-3.4.10
./bin/zkServer.sh start
第二步:启动kafka
cd /home/wql/app/kafka/bin
nohup ./kafka-server-start.sh /home/wql/app/kafka/config/server.properties &
启动消费者:
./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic accesslog
第三步:启动logstash
定义:flow-kafka.conf
input {
file {
path => "/home/wql/app/nginx_logs/*"
discover_interval => 5
start_position => "beginning"
}
}
output {
kafka {
topic_id => "accesslog"
codec => plain {
format => "%{message}"
charset => "UTF-8"
}
bootstrap_servers => "localhost:9092"
}
}
启动程序:
cd /home/wql/app/elk2/logstash-2.3.1
./bin/logstash agent -f flow-kafka.conf