三种方案
- Kafka->logstash->elasticsearch->kibana(简单,只需启动一个代理程序)
- Kafka->kafka-connect-elasticsearch->elasticsearch->kibana(与confluent绑定紧,有些复杂)
- Kafka->elasticsearch-river-kafka-1.2.1-plugin->elasticsearch->kibana(代码很久没更新,后续支持比较差)
本文将采用方案二来完成这一伟大目标
参考
+ https://github.com/confluentinc/kafka-connect-elasticsearch
首先,是先要向kafka中写入日志,对于0.9+以上版本的kafka,我们可以采用如下配置方式。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<version>0.10.2.1</version>
</dependency>
对于早于该版本的kafka,可以直接使用kafka包中的KafkaLog4jAppender。其他日志方式类似。初始化logger的用法如下
static {
KafkaLog4jAppender kafkaAppender = new KafkaLog4jAppender();
kafkaAppender.setBrokerList(ConfigService.getAppConfig().
getProperty("log4j.appender.kafkalog.brokerList", ipListStr));
kafkaAppender.setTopic(logTopicName);
kafkaAppender.setCompressionType("gzip");
kafkaAppender.setSyncSend(false);
kafkaAppender.setLayout(new PatternLayout("%p@@log@@%d{yyyy-MM-dd@@HH:mm:ss}@@%m%n"));
kafkaAppender.activateOptions();
tripLogger.addAppender(kafkaAppender);
tripLogger.setLevel(Level.INFO);
}