一、Flume介绍
1.1 概述
- fcloudera开源提供的一个开源的日志采集工具;
- 可以从各个地方采集我们的数据
- socket网络数据包,
- 文件夹,
- 某一个指定的文件里面,
- kafka消息队列里面采集。
- 可以将采集来的数据,发送到其他地方,比如日之外文件,hdfs数据存储,kafka消息队列。
一些概念:
Event: 一个数据单元,消息头和消息体组成。(Events可以是日志记录、 avro 对象等。)
Flow: Event从源点到达目的点的迁移的抽象。
Agent: 一个独立的Flume进程,一个agent就是一个JVM,包含组件Source、 Channel、 Sink。
1.2 核心三组件
source:连接数据源,从数据源获取数据
channel:管道的作用,连接source与sink 主要起到数据的缓冲以以及连接的作用
sink:数据下沉的目的地,采集的数据要发送到哪里去都是sink说了算
这三个组件运行起来叫做一个flume的实例 叫做agen
1.2.1 Source
Source是数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到事件(event) 里,然后将事件推入Channel中。Flume提供了很多内置的Source, 支持 Avro, log4j, syslog 和 http。
常用Source:
Netcat Source 监控一个网络的套接字端口(ip和端口),来获取数据
Spool Source监听一个指定的目录,即只要应用程序想这个指定的目录中添加新的文件,
EXEC Source 监听一条shell命令的执行结果:tail -F hello.txt
Avro Source 监听上一级Agent的数据流
1.2.2 Channel
用于Source收集数据的缓存,Channel存放数据的单位是Event
常用的Channel:
Memary Channel: 数据存放在一个内存的缓存队列中
1.2.3 Sink
用于将Channel中的event数据传送到目的地
常用的Sink:
Logger Sink :是将数据发送到控制台
HDFS Sink :将数据发送到HDFS
Avro Sink :实现多级Agent之间的数据连接
一些参数设置
二、Flume 安装
上传解压配置Javahome
tar -zxvf flume-ng-1.6.0-cdh5.14.0.tar.gz -C /export/servers/
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf
cp flume-env.sh.template flume-env.sh
vim flume-env.sh
export JAVA_HOME=/export/servers/jdk1.8.0_141
三、Flume采集实例
3.1 从文件夹采集数据到HDFS集群
Spool Directory Source ——> Memory Channel ——>HDFS Sink
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf
mkdir -p /export/servers/dirfile
vim spooldir.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
##注意:不能往监控目中重复丢同名文件
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/dirfile
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://node01:8020/spooldir/files/%y-%m-%d/%H%M/
#写入hdfs的文件名前缀
a1.sinks.k1.hdfs.filePrefix = events-
#是否启用时间上的”舍弃”
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
#sink间隔多长将临时文件滚动成最终目标文件,单位:秒,默认是30秒
a1.sinks.k1.hdfs.rollInterval = 3
#当临时文件达到该大小(单位:bytes)时,滚动成目标文件
a1.sinks.k1.hdfs.rollSize = 20
#默认值:10,当events数据达到该数量时候,将临时文件滚动成目标文件
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Channel参数解释:
-
capacity:默认该通道中最大的可以存储的event数量
-
trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量
-
keep-alive:event添加到通道中或者移出的允许时间
启动
bin/flume-ng agent -c ./conf -f ./conf/spooldir.conf -n a1 -Dflume.root.logger=INFO,console
3.2 高可用Flum-NG配置案例failover
名称 |
HOST |
角色 |
Agent1 |
node01 |
Web Server |
Collector1 |
node02 |
AgentMstr1 |
Collector2 |
node03 |
AgentMstr2 |
将node03机器上面的flume安装包以及文件生产的两个目录拷贝到node01机器上面去
node03
cd /export/servers
scp -r apache-flume-1.6.0-cdh5.14.0-bin/ node01:$PWD
scp -r shells/ taillogs/ node01:$PWD
node01
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf
vim agent.conf
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
#
##set gruop
agent1.sinkgroups = g1
#
##set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
#
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /export/servers/taillogs/access_log
#
agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp
#
## set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 52020
#
## set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 52020
#
##set sink group
agent1.sinkgroups.g1.sinks = k1 k2
#
##set failover
agent1.sinkgroups.g1.processor.type = failover
# 谁的数值大就用谁的
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
# 惩罚时间,如果k1发生故障,10秒不接收数据
agent1.sinkgroups.g1.processor.maxpenalty = 10000
node02与node03配置flumecollection
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf
vim collector.conf
node02
#set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#
##set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
## other node,nna to nns
a1.sources.r1.type = avro
a1.sources.r1.bind = node02
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = node02
a1.sources.r1.channels = c1
#
##set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path= hdfs://node01:8020/flume/failover/
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
#
node03
#set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#
##set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
## other node,nna to nns
a1.sources.r1.type = avro
a1.sources.r1.bind = node03
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = node03
a1.sources.r1.channels = c1
#
##set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path= hdfs://node01:8020/flume/failover/
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
启动顺序
谁接收谁先启动
node03 -- > node02 --> node01
node03和node02
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin
bin/flume-ng agent -n a1 -c conf -f conf/collector.conf -Dflume.root.logger=DEBUG,console
node01
cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin
bin/flume-ng agent -n agent1 -c conf -f conf/agent.conf -Dflume.root.logger=DEBUG,console
node01机器启动文件产生脚本
cd /export/servers/shells
sh tail-file.sh
四、扩展
flume实现收集mysql的数据
-
两个jar包:
-
flume-ng-sql-source-1.3.7.jar
-
MySql的JDBC驱动包放在Flume库目录;
-
- 创建相关的目录文件;
-
配置Flume。