[提前声明]
文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章
写作不易,转载请注明,谢谢!
代码案例地址: ?https://github.com/Mydreamandreality/sparkResearch
一般我们的数据都会由引擎处理完后写入logstash,再由logstash写入kafka生产者,spark程序消费kafka数据,进行处理,最终持久化,一会做几个简单的logstash案例,看下是如何处理的,
spark教程和kafka教程可以参考我之前的博客
安装
在Centos7中安装LogStash:
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.tar.gz
此处的版本应和Elasticsearch的版本对应- 解压即可
Elasticsearch的数据通过logstash导出为JSON文件
-
进入logstash的解压目录,/bin
-
安装插件:
./logstash-plugin install logstash-output-elasticsearch
-
安装成功后,新建 es-json.conf 文件
-
编辑该文件:
input{
elasticsearch{
host => ["127.0.0.1:9200"]
index => "Index"
type => "type"
size => 1000
scroll => "5m"
docinfo => false
}
}
output{
file{
path => "文件输出路径"
}
stdout{
codec=> json_lines # 这个是是否输出日志
}
}
- 执行: ./logstash -f es-json.conf
- 执行成功后查看output的文件即可
Sys_log数据写入logstash,logstash写入到kafka生产者
[此处不需要安装插件]
先创建topic
./kafka-topics.sh --create --zookeeper localhost:2181
--replication-factor 1 --partitions 1 --topic syslog
启动消费者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic syslog --from-beginning
在bin目录下编写logstash脚本
input {
syslog {
port => 514 # SysLog发送的端口
codec => cef
}
}
output{
kafka {
bootstrap_servers => '127.0.0.1:9092' # kafka的地址
topic_id => 'syslog' # 输出到kafka的主题
codec => plain
}
}
- 执行: ./logstash -f 创建的conf文件
- 查看消费者控制台输出即可
Syslog端口数据发送工具
扫描二维码关注公众号,回复:
9125418 查看本文章