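Flume Data Processing Flow
A Flume event is a unit of data flow made up of a payload and a set of string attributes (headers). A Flume agent is a JVM process that receives data from external sources, processes it, and then delivers (sinks) it to another target component (HBase, HDFS, Hive, ...).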
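To make the event definition concrete, here is a minimal Python model of an event as a byte payload plus string headers. It is only an illustration: Flume's real type is the Java interface org.apache.flume.Event, and the Event class and line_to_event helper below are hypothetical names chosen for this sketch.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class Event:
    """Illustrative model of a Flume event: a byte payload plus string headers."""
    body: bytes
    headers: Dict[str, str] = field(default_factory=dict)


def line_to_event(line: str) -> Event:
    """Hypothetical helper: turn one text line into one event, roughly the way
    a line-oriented source such as netcat does (the trailing newline is dropped)."""
    return Event(body=line.rstrip("\n").encode("utf-8"))


e = line_to_event("hello flume\n")
print(e.body, e.headers)  # b'hello flume' {}
```

Each event gets its own headers dictionary (default_factory), so adding a header to one event never leaks into another.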
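The figure below shows the detailed processing flow:
- The source receives data from an external source (the source acts as the server side). Common source types:
  - netcat
  - spooldir
  - exec
  - sequence
- The source hands the data to the channel processor.
- The channel processor passes the events through the interceptor chain, which applies the various business-specific interceptors; when processing is finished, the events are returned to the channel processor.
- Each event is passed to the channel selector.
- The channel selector returns the list of channels the event should be written to.
- All events are written to each required channel. Only one transaction is opened per channel, and all of that channel's events are part of that transaction.
- Backup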
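The steps above can be sketched as a small Python simulation, assuming a toy in-memory Channel with begin/put/commit/rollback, and interceptors and selectors modeled as plain functions. None of these names are Flume's API (the real classes are Java, such as ChannelProcessor and ChannelSelector); the sketch only mirrors the order of operations: interceptor chain, channel selection, then one transaction per channel.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


@dataclass
class Event:
    body: bytes
    headers: Dict[str, str] = field(default_factory=dict)


class Channel:
    """Toy channel: puts are staged in a transaction and become visible only on commit."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.queue: List[Event] = []
        self._staged: Optional[List[Event]] = None

    def begin(self) -> None:
        self._staged = []

    def put(self, event: Event) -> None:
        if self._staged is None:
            raise RuntimeError("put() called outside a transaction")
        self._staged.append(event)

    def commit(self) -> None:
        self.queue.extend(self._staged or [])
        self._staged = None

    def rollback(self) -> None:
        self._staged = None  # discard everything put in this transaction


Interceptor = Callable[[Event], Optional[Event]]  # returning None drops the event
Selector = Callable[[Event], List[Channel]]


def process_batch(events: List[Event], interceptors: List[Interceptor],
                  selector: Selector) -> None:
    # Interceptor chain: each interceptor sees the output of the previous one.
    kept: List[Event] = []
    for event in events:
        current: Optional[Event] = event
        for interceptor in interceptors:
            current = interceptor(current)
            if current is None:
                break
        if current is not None:
            kept.append(current)

    # Channel selector: group the surviving events by target channel.
    targets: Dict[str, Channel] = {}
    batches: Dict[str, List[Event]] = {}
    for event in kept:
        for channel in selector(event):
            targets[channel.name] = channel
            batches.setdefault(channel.name, []).append(event)

    # One transaction per channel that covers all of that channel's events.
    for name, batch in batches.items():
        channel = targets[name]
        channel.begin()
        try:
            for event in batch:
                channel.put(event)
            channel.commit()
        except Exception:
            channel.rollback()
            raise
```

Passing a selector such as lambda e: [c1, c2], which returns every channel, mimics Flume's default replicating selector; a multiplexing selector would instead choose channels based on a header value.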
The figure below shows the sink processing flow.
Different sinks
- HDFS sink
/soft/flume/conf/hdfs_k.conf
Start the agent: flume-ng agent -f hdfs_k.conf -n a1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/centosmin0/flume/%y-%m-%d/%H/%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
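# Round down the timestamp used in the path: with the values below, a new directory is created every 10 minutes.
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Use the agent's local time for the path escapes (netcat events carry no timestamp header).
a1.sinks.k1.hdfs.useLocalTimeStamp=true
# When to roll to a new file: after 10 seconds, 10 bytes, or 3 events, whichever comes first.
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.hdfs.rollSize=10
a1.sinks.k1.hdfs.rollCount=3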
a1.channels.c1.type=memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
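The round and roll settings are easiest to see with a worked example. The Python sketch below resolves the hdfs.path template for a given time and evaluates the three roll triggers, assuming that rounding works within local wall-clock time (the minute is rounded down to a multiple of roundValue and the seconds are cleared) and that a value of 0 disables a roll trigger. Only the %y %m %d %H %M %S escapes are handled, because they happen to mean the same thing in Python's strftime; round_down, resolve_path and should_roll are hypothetical helpers, not Flume code.

```python
from datetime import datetime


def round_down(ts: datetime, value: int, unit: str) -> datetime:
    """Round a local timestamp down within its minute/hour/day, clearing smaller fields."""
    if unit == "second":
        return ts.replace(second=ts.second // value * value, microsecond=0)
    if unit == "minute":
        return ts.replace(minute=ts.minute // value * value, second=0, microsecond=0)
    if unit == "hour":
        return ts.replace(hour=ts.hour // value * value, minute=0, second=0, microsecond=0)
    raise ValueError(f"unsupported roundUnit: {unit}")


def resolve_path(template: str, ts: datetime, round_on: bool = True,
                 round_value: int = 10, round_unit: str = "minute") -> str:
    # %y %m %d %H %M %S mean the same in Flume's path escapes and in strftime.
    if round_on:
        ts = round_down(ts, round_value, round_unit)
    return ts.strftime(template)


def should_roll(elapsed_s: float, bytes_written: int, events_written: int,
                roll_interval: int = 10, roll_size: int = 10, roll_count: int = 3) -> bool:
    """True if any enabled trigger has fired; 0 disables a trigger."""
    return ((roll_interval > 0 and elapsed_s >= roll_interval)
            or (roll_size > 0 and bytes_written >= roll_size)
            or (roll_count > 0 and events_written >= roll_count))


path = "/user/centosmin0/flume/%y-%m-%d/%H/%M/%S"
print(resolve_path(path, datetime(2024, 5, 17, 14, 37, 52)))
# /user/centosmin0/flume/24-05-17/14/30/00
```

Because of the rounding, every event between 14:30:00 and 14:39:59 lands in the same .../14/30/00 directory. With rollSize=10 a file is closed once roughly 10 bytes have been written, so almost every netcat line starts a new file; values this small only make sense for a demo.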
- HBase sink
/soft/flume/conf/hbase_k.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888
a1.sinks.k1.type = hbase
a1.sinks.k1.table = ns1:t9
a1.sinks.k1.columnFamily = f1
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.channels.c1.type=memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
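RegexHbaseEventSerializer splits each event body with a regular expression and writes each capture group to a column of the configured column family. The Python sketch below imitates that mapping, assuming the serializer's defaults are the regex (.*) and the single column name payload (as far as I know these are set through serializer.regex and serializer.colNames), so with the config above each netcat line would land in f1:payload. Row-key generation is not modeled, and regex_to_cells is a hypothetical helper, not part of Flume.

```python
import re
from typing import List, Sequence, Tuple

Cell = Tuple[str, str, str]  # (column family, qualifier, value)


def regex_to_cells(body: bytes, column_family: str, regex: str = "(.*)",
                   col_names: Sequence[str] = ("payload",)) -> List[Cell]:
    """Map regex capture groups to columns; return [] if the body does not fit."""
    match = re.fullmatch(regex, body.decode("utf-8"))
    # Skip events that do not match, or whose group count differs from the column list.
    if match is None or len(match.groups()) != len(col_names):
        return []
    return [(column_family, name, value) for name, value in zip(col_names, match.groups())]


print(regex_to_cells(b"hello", "f1"))  # [('f1', 'payload', 'hello')]
print(regex_to_cells(b"tom,12", "f1", r"(\w+),(\d+)", ("name", "age")))
# [('f1', 'name', 'tom'), ('f1', 'age', '12')]
```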
Chaining agents (multi-hop) with an Avro source and an Avro sink
- Create the configuration file
/soft/flume/conf/avro_hop.conf
#a1
a1.sources = r1
a1.sinks= k1
a1.channels = c1
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=localhost
a1.sinks.k1.port=9999
a1.channels.c1.type=memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#a2
a2.sources = r2
a2.sinks= k2
a2.channels = c2
a2.sources.r2.type=avro
a2.sources.r2.bind=localhost
a2.sources.r2.port=9999
a2.sinks.k2.type = logger
a2.channels.c2.type=memory
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
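The hop only works if a1's avro sink points at the host and port where a2's avro source listens (localhost:9999 here). One quick way to catch a mismatch is to parse the properties file and pair them up, as in this Python sketch. It assumes simple key=value lines and compares host strings literally (it does not treat localhost, 127.0.0.1 and 0.0.0.0 as equivalent); parse_flume_properties and avro_hops are hypothetical helpers.

```python
from typing import Dict, List, Optional, Tuple

Addr = Tuple[Optional[str], Optional[str]]  # (host, port) as written in the file


def parse_flume_properties(text: str) -> Dict[str, str]:
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def avro_hops(props: Dict[str, str]) -> List[Tuple[str, str]]:
    """Pair each avro sink with the avro source listening on the same host:port."""
    sources: Dict[Addr, str] = {}
    sinks: Dict[str, Addr] = {}
    for key, value in props.items():
        parts = key.split(".")
        # Component type keys look like <agent>.<sources|sinks>.<name>.type
        if len(parts) == 4 and parts[3] == "type" and value == "avro":
            prefix = ".".join(parts[:3])
            if parts[1] == "sources":
                sources[(props.get(prefix + ".bind"), props.get(prefix + ".port"))] = prefix
            elif parts[1] == "sinks":
                sinks[prefix] = (props.get(prefix + ".hostname"), props.get(prefix + ".port"))
    return [(sink, sources[addr]) for sink, addr in sinks.items() if addr in sources]


hop_conf = """
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=localhost
a1.sinks.k1.port=9999
a2.sources.r2.type=avro
a2.sources.r2.bind=localhost
a2.sources.r2.port=9999
"""
print(avro_hops(parse_flume_properties(hop_conf)))  # [('a1.sinks.k1', 'a2.sources.r2')]
```

An empty result means some avro sink has no matching avro source in the same file.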
- Start a2 (acts as the server)
$>flume-ng agent -f /soft/flume/conf/avro_hop.conf -n a2 -Dflume.root.logger=INFO,console
- Start a1 (acts as the client)
$>flume-ng agent -f /soft/flume/conf/avro_hop.conf -n a1
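- Connect to a1's netcat source and type some lines: nc localhost 8888 (port 9999 is a2's Avro source, which expects Avro RPC rather than plain text)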
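Instead of nc, test lines can also be sent to a1's netcat source from a script. The Python sketch below assumes the source's default behavior of replying with a line such as OK for every received line (the ack-every-event setting) and the localhost:8888 address from the a1 config; send_lines is a hypothetical helper.

```python
import socket
from typing import List


def send_lines(lines: List[str], host: str = "localhost", port: int = 8888,
               timeout: float = 5.0) -> List[str]:
    """Send each line to a netcat-style source and collect the reply line for each."""
    replies: List[str] = []
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with sock.makefile("rwb") as stream:
            for line in lines:
                stream.write(line.encode("utf-8") + b"\n")  # one line = one event
                stream.flush()
                # Assumes the source acknowledges every event with one reply line.
                replies.append(stream.readline().decode("utf-8").rstrip("\r\n"))
    return replies
```

With both agents running, send_lines(["hello", "flume"]) should return one acknowledgement per line, and the two events should then show up in a2's console log via the logger sink.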