ELK系列-使用flume日志收集
我们的部分日志收集是使用flume在各个应用服务器上收集日志,然后到腾讯云的消息队列ckafka,在使用logstash消费ckafka的日志消息输入到elasticserch中。
flume工具介绍
- flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase、kafka等)的能力 。
- flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。
- flume的可靠性 ,当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Besteffort(数据发送到接收方后,不会进行确认)。
flume安装使用
- 服务器上传flume软件
apache-flume-1.9.0-bin.tar.gz - 解压flume
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /usr/local/ - 修改用户权限
chown -R yunwei:yunwei ./apache-flume-1.9.0-bin - 创建元数据目录文件
touch /home/yunweizhrt/3wcm_fmm_visit.json - 修改内存配置
进去flume的bin目录
vi bin/flume-ng
JAVA_OPTS="-Xmx120m" - 修改配置文件
要求收集目录/home/yunweizhrt/tomcat-logs/3w-cn/dwz-web-jump/fmm_visit下面的日志到腾讯云ckafka中
配置脚本如下:
[yunweizhrt@VM_40_2_centos conf]$ vim 3wcm_fmm_visit
agent.sources = s1
agent.channels = c1
agent.sinks = r1
agent.sources.s1.channels = c1
agent.sinks.r1.channel = c1
agent.sources.s1.type = TAILDIR
agent.sources.s1.positionFile = /home/yunweizhrt/3wcm_fmm_visit.json
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/home/yunweizhrt/tomcat-logs/3w-cn/dwz-web-jump/.*log
agent.sources.s1.fileHeader = true
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.r1.brokerList = 10.38.40.7:9092
agent.sinks.r1.topic = flume_fmm_visit_3wcn
clog.sinks.sink_log1.flumeBatchSize = 2000
clog.sinks.sink_log1.kafka.producer.acks = 1
#下面是自己开发的插件可以去掉
agent.sources.s1.interceptors = i1
agent.sources.s1.interceptors.i1.type = com.zhrt.flume.interceptor.IpExtractInterceptor$CounterInterceptorBuilder
agent.sources.s1.interceptors.i1.regex = stat_.+
agent.sources.s1.interceptors.i1.value = hourly
agent.sources.s1.interceptors.i1.default = dail
[yunweizhrt@VM_40_2_centos conf]$
-
启动flume进程
nohup ./bin/flume-ng agent -n agent -c conf -f conf/3wcm_fmm_visit & -
验证是否启动成功
查询是否存在此进程
-
验证数据是否收集
使用logstash消费腾讯云ckafka
-
配置信息如下:
代码实现了添加一个字段types,根据types字段进行区分
[root@VM_40_24_centos conf.d]# pwd
/usr/local/logstash-7.0.1/config/conf.d
[root@VM_40_24_centos conf.d]# more logstash-jump-3wcn-fmm.conf
input{
kafka{
bootstrap_servers => "10.18.40.7:9092"
group_id => "flume_fmm_visit_3wcn"
topics => "flume_fmm_visit_3wcn"
consumer_threads => 1
decorate_events => true
auto_offset_reset => "latest"
type => "java_3wcn_fmm_visit"
}
}
filter {
if [type] == "java_3wcn_fmm_visit" {
mutate {
add_field => { "types" => "%{type}"}
}
json {
source => "message"
}
date {
match => ["visittime", "yyyy-MM-dd HH:mm:ss"]
}
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => "message"
remove_field => "content"
remove_field => "kafka"
remove_field => "tags"
remove_field => ["timestamp"]
}
}
}
output {
if [types] == "java_3wcn_fmm_visit" {
elasticsearch {
hosts => ["10.10.10.16:9200"]
index => "logstash_jump_3wcn_fmm_%{+YYYY_MM_dd}"
}
}
}
[root@VM_40_24_centos conf.d]#
-
启动logstash
nohup ./bin/logstash -f ./config/conf.d/ & -
验证收集情况