0,目标
通过flume采集日志写入HDFS,采集通道如下图所示,由两个flume组成一个从日志文件到HDFS的通道:
上面的第二个flume是多余的,下图所示的通道和上图所示效果一样且更简洁,上图是为了学习,搞的更复杂一点。
PS: FileChannel原理:
一,安装
1,下载
http://archive.apache.org/dist/flume/
2,解压即安装
3,删除一个jar包,这个包和hadoop中的包冲突
rm /module/flume/lib/guava-11.0.2.jar
删除之后要配置Hadoop_Home的环境变量
4,配置文件
将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件
mv flume-env.sh.template flume-env.sh
vi flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_212
二,项目经验之Flume组件选型
- 1)Source
Taildir Source相比Exec Source、Spooling Directory Source的优势 :
TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。不会丢数据,但是有可能会导致数据重复。
Exec Source可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
Spooling Directory Source监控目录,支持断点续传。
batchSize设置最佳实际:Event 1K左右时,500-1000合适(默认为100)
- 2)Channel
采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。
注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中。
三,编写配置文件读取日志
vim file-flume-kafka.conf
#为各组件命名
a1.sources = r1
a1.channels = c1
#描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /export/applog/log/app.*
a1.sources.r1.positionFile = /export/soft/apache-flume-1.9.0-bin/taildir_position.json
#a1.sources.r1.interceptors = i1
#a1.sources.r1.interceptors.i1.type = com.flume.interceptor.ETLInterceptor$Builder
#描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node1:9092,node2:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
#绑定source和channel以及sink和channel的关系
a1.sources.r1.channels = c1
四,启动flume
bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
这样启动,shell工具关闭后,进程会被杀死,采用nohup启动:
nohup bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
五,脚本启/停flume
#! /bin/bash
case $1 in
"start"){
for i in node1 node2
do
echo " --------启动 $i 采集flume-------"
ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &"
done
};;
"stop"){
for i in node1 node2
do
echo " --------停止 $i 采集flume-------"
ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
done
};;
esac
-Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1
表示把日志存入/opt/module/flume/log1.txt
2>&1,2表示错误日志,2>&1代表的意思是错误日志也像前面的flume日志一样存放到log1.txt中
"ps -ef | grep file-flume-kafka | grep -v grep |awk ‘{print $2}’ | xargs -n1 kill -9 "
注意:1,flume的进程名是Application,但其他进程也可能是这个名字,所以使用进程名不准确,最好使用配置文件名称file-flume-kafka
2, ‘{print $2}’,要加转义符,原因是不加的话表示取命令行传进来的参数,加了之后才表示取切割后的第二个值,直接在命令行执行时不需要,在脚本里面就需要这样
3,xargs -n1表示取前面命令的结果,然后传参给kill -9
项目经验
1,零点漂移(阿里大数据之路)
2,小文件、压缩
3,不完整的日志过滤
4,按小时、分、秒滚动生成文件
如果数量特别大,短时间内会生成大量数据,则可以按小时滚动生成日志文件,可以使用hdfs.round/hdfs.roundValue/hdfs.roundUnit