flum静态拦截器

现在要求:
把 A、B 机器中的 access.log、nginx.log、web.log 采集汇总到 C 机器上
然后统一收集到 hdfs 中。
但是在 hdfs 中要求的目录为:
/source/logs/access/20190101/**
/source/logs/nginx/20190101/**
/source/logs/web/20190101/**
需求原理解析

① 在服务器 A 和服务器 B 上 创建配置文件 exec_source_avro_sink.conf 
 
# Name the components on this agent
 a1.sources = r1 r2 r3 a1.sinks = k1 a1.channels = c1 
 
# Describe/configure the source 
a1.sources.r1.type = exec a1.sources.r1.command = tail -F /root/data/access.log a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = static ##  static 拦截器的功能就是往采集到的数据的 header 中插入自 ##  己定义的 key-value 对 a1.sources.r1.interceptors.i1.key = type a1.sources.r1.interceptors.i1.value = access 
 
a1.sources.r2.type = exec 
a1.sources.r2.command = tail -F /root/data/nginx.log
 a1.sources.r2.interceptors = i2 
 a1.sources.r2.interceptors.i2.type = static
  a1.sources.r2.interceptors.i2.key = type 
a1.sources.r2.interceptors.i2.value = nginx 
 
a1.sources.r3.type = exec 
a1.sources.r3.command = tail -F /root/data/web.log 
a1.sources.r3.interceptors = i3 
a1.sources.r3.interceptors.i3.type = static
 a1.sources.r3.interceptors.i3.key = type 
a1.sources.r3.interceptors.i3.value = web 
 
# Describe the sink
 a1.sinks.k1.type = avro 
 a1.sinks.k1.hostname = 192.168.200.101 
 a1.sinks.k1.port = 41414 
 
# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 20000 
a1.channels.c1.transactionCapacity = 10000 
 
# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sources.r2.channels = c1
 a1.sources.r3.channels = c1 
 a1.sinks.k1.channel = c1
② 在服务器 C 上创建配置文件 avro_source_hdfs_sink.conf  文件内容为 
 
#定义 agent 名, source、channel、sink 的名称 a1.sources = r1 a1.sinks = k1 a1.channels = c1 
 
#定义 source 
a1.sources.r1.type = avro
 a1.sources.r1.bind = mini2
  a1.sources.r1.port =41414 
 #添加时间拦截器 
 a1.sources.r1.interceptors = i1 
 a1.sources.r1.interceptors.i1.type =  org.apache.flume.interceptor.TimestampInterceptor$Builder 
 
#定义 channels 
a1.channels.c1.type = memory
 a1.channels.c1.capacity = 20000 
 a1.channels.c1.transactionCapacity = 10000 
 
#定义 sink 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path=hdfs://192.168.200.101:9000/source/logs/%{ty pe}/%Y%m%d a1.sinks.k1.hdfs.filePrefix =events 
a1.sinks.k1.hdfs.fileType = DataStream
 a1.sinks.k1.hdfs.writeFormat = Text #时间类型
  a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件不按条数生成 
  a1.sinks.k1.hdfs.rollCount = 0 #生成的文件按时间生成 
  a1.sinks.k1.hdfs.rollInterval = 30 #生成的文件按大小生成 
  a1.sinks.k1.hdfs.rollSize  = 10485760 #批量写入 hdfs 的个数
   a1.sinks.k1.hdfs.batchSize = 10000 flume 操作 hdfs 的线程数(包括新建,写入等)
    a1.sinks.k1.hdfs.threadsPoolSize=10 #操作 hdfs 超时时间
     a1.sinks.k1.hdfs.callTimeout=30000 
 
#组装 source、channel、sink 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

③ 配置完成之后,在服务器 A 和 B 上的/root/data 有数据文件 access.log、
nginx.log、web.log。先启动服务器 C 上的 flume,启动命令 在 flume 安装目录下执行 :

bin/flume-ng agent -c conf -f conf/avro_source_hdfs_sink.conf -name a1  -Dflume.root.logger=DEBUG,console 

然后在启动服务器上的 A 和 B,启动命令 在 flume 安装目录下执行 :

bin/flume-ng agent -c conf -f conf/exec_source_avro_sink.conf -name a1 Dflume.root.logger=DEBUG,console 


注意事项:上面有对应的文件 在设置种,要是自己没有创建文件夹会报错,所以需要自己创建对应的文件夹,而文件时自己会创建的,所以我们只需要创建文件夹就可以了~

开启第一台克隆三个会话页面分别设置死循环:
while true; do echo “access,access…” >> /root/logs1/access.log;sleep 0.5;done
while true; do echo “nginx,nginx…” >> /root/logs1/nginx.log;sleep 0.5;done
while true; do echo “web,web…” >> /root/logs1/web.log;sleep 0.5;done

在可视化web页面便可以看到三个分类文件了
http://node-1:50070 (如果没有配置host文件将node-1换成你自己ip)

猜你喜欢

转载自blog.csdn.net/weixin_44654375/article/details/87895186