Flume Learning (2): Flume Source Types

1. Overview

Official documentation: http://flume.apache.org/FlumeUserGuide.html#flume-sources

2. Flume Source Descriptions

2.1 Avro Source

2.1.1 Introduction

Listens on an Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink of another (previous-hop) Flume agent, it can form tiered collection topologies; the Avro Source can thus implement multi-hop, fan-out, and fan-in flows (see the upstream-sink sketch after the property table below). It can also receive log data sent by the Avro client that ships with Flume. Properties marked with * below are required.


Property Name       Default  Description
channels*           –
type*               –        Component type name; must be "avro"
bind*               –        Hostname or IP address to listen on
port*               –        Port number to listen on
threads             –        Maximum number of worker threads to spawn
selector.type
selector.*
interceptors        –        Space-separated list of interceptors
interceptors.*
compression-type    none     Can be "none" or "deflate"; must match the compression type used by the sending Avro client (or upstream Avro Sink)
ssl                 false    Set to true to enable SSL encryption; if enabled, a "keystore" and a "keystore-password" must also be configured
keystore            –        Path to the Java keystore file used for SSL
keystore-password   –        Password for the Java keystore
keystore-type       JKS      Keystore type; can be "JKS" or "PKCS12"
exclude-protocols   SSLv3    Space-separated list of SSL/TLS protocols to exclude; SSLv3 is always excluded in addition to the protocols specified
ipFilter            false    Set to true to enable Netty IP filtering
ipFilterRules       –        Expression rules defining Netty's IP filter settings
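
To illustrate the multi-hop pairing mentioned in the introduction, the Avro Sink of an upstream agent only needs to point at this source's bind address and port. A minimal sketch, assuming a hypothetical upstream agent a2 with channel c2 and sink k2:

a2.sinks=k2
a2.sinks.k2.type=avro
a2.sinks.k2.hostname=192.168.123.102
a2.sinks.k2.port=55555
a2.sinks.k2.channel=c2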


2.1.2 Examples

Example 1: see the official documentation.

Example 2:

# Read input data from the specified port via an avro source and print it to the console.
a1.sources=s1  
a1.channels=c1  
a1.sinks=k1  
  
a1.sources.s1.type=avro  
a1.sources.s1.bind=192.168.123.102  
a1.sources.s1.port=55555  
  
a1.channels.c1.type=memory  
a1.sinks.k1.type=logger  
  
  
a1.sources.s1.channels=c1  
a1.sinks.k1.channel=c1 

Upload the case_avro.properties configuration file to the flumetest folder on the Linux system:

Start Flume with this command (drop the bin/ prefix if the environment variables are configured):

bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_avro.properties --name a1 -Dflume.root.logger=INFO,console  
--conf specifies the Flume configuration directory
--conf-file specifies the log-collection configuration file
--name specifies the agent name
-Dflume.root.logger=INFO,console prints the collected events to the console

Partial output after startup:

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.  
18/05/06 11:46:25 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting  
18/05/06 11:46:25 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_avro.properties  
18/05/06 11:46:25 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1  
18/05/06 11:46:25 INFO conf.FlumeConfiguration: Processing:k1  
18/05/06 11:46:25 INFO conf.FlumeConfiguration: Processing:k1  
18/05/06 11:46:25 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]  
18/05/06 11:46:25 INFO node.AbstractConfigurationProvider: Creating channels  
18/05/06 11:46:25 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory  
18/05/06 11:46:25 INFO node.AbstractConfigurationProvider: Created channel c1  
18/05/06 11:46:25 INFO source.DefaultSourceFactory: Creating instance of source s1, type avro  
18/05/06 11:46:25 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger  
18/05/06 11:46:25 INFO node.AbstractConfigurationProvider: Channel c1 connected to [s1, k1]  
18/05/06 11:46:25 INFO node.Application: Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:Avro source s1: { bindAddress: 192.168.123.102, port: 55555 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@47ebf1f0 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }  
18/05/06 11:46:25 INFO node.Application: Starting Channel c1  
18/05/06 11:46:26 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.  
18/05/06 11:46:26 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started  
18/05/06 11:46:26 INFO node.Application: Starting Sink k1  
18/05/06 11:46:26 INFO node.Application: Starting Source s1  
18/05/06 11:46:26 INFO source.AvroSource: Starting Avro source s1: { bindAddress: 192.168.123.102, port: 55555 }...  
18/05/06 11:46:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.  
18/05/06 11:46:27 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started  
18/05/06 11:46:27 INFO source.AvroSource: Avro source s1 started. 

Open another window on hadoop02 and create a tt.log file in the flume directory.

[hadoop@hadoop02 flume]$ vi tt.log  
hello  
world  

Send it with the following command (simulating an Avro client sending data):

[hadoop@hadoop02 flume]$ flume-ng avro-client -c ./conf -H 192.168.123.102 -p 55555 -F tt.log  
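Here -c points at the Flume conf directory, -H and -p give the Avro source's bind address and port, and -F names the file whose contents are sent as Avro events.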

Partial log after sending:

SLF4J: Class path contains multiple SLF4J bindings.  
SLF4J: Found binding in [jar:file:/home/hadoop/apps/apache-flume-1.8.0-bin/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]  
SLF4J: Found binding in [jar:file:/home/hadoop/apps/hadoop-2.7.5/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]  
SLF4J: Found binding in [jar:file:/home/hadoop/apps/hbase-1.2.6/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]  
SLF4J: Found binding in [jar:file:/home/hadoop/apps/apache-hive-2.3.2-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]  
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.  
18/05/06 11:51:54 WARN api.NettyAvroRpcClient: Using default maxIOWorkers

Output displayed on the agent's console:

18/05/06 11:51:56 INFO ipc.NettyServer: Connection to /192.168.123.102:59682 disconnected.  
18/05/06 11:51:58 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }  
18/05/06 11:51:58 INFO sink.LoggerSink: Event: { headers:{} body: 77 6F 72 6C 64                                  world }  

2.2 Thrift Source

2.2.1 Introduction

The Thrift Source is essentially the same as the Avro Source; just change the source type to thrift, e.g. a1.sources.r1.type = thrift. It is straightforward, so no further elaboration is needed.
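
A minimal sketch, mirroring the Avro example above with only the source type changed (the port 55556 here is just a placeholder):

a1.sources=s1
a1.channels=c1
a1.sinks=k1

a1.sources.s1.type=thrift
a1.sources.s1.bind=192.168.123.102
a1.sources.s1.port=55556

a1.channels.c1.type=memory
a1.sinks.k1.type=logger

a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1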


2.3 Exec Source

2.3.1 Introduction

The Exec Source runs a given Unix command when it starts (here, tail -F on a log file) and turns each line the command writes to stdout into an event.

2.3.2 Example

# Configuration file
#Name the components on this agent  
a1.sources= s1  
a1.sinks= k1  
a1.channels= c1  
   
# Configure the source
a1.sources.s1.type = exec  
a1.sources.s1.command = tail -F /home/hadoop/logs/test.log  
a1.sources.s1.channels = c1  
   
# Configure the sink
a1.sinks.k1.type= logger  
a1.sinks.k1.channel= c1  
   
# Configure the channel
a1.channels.c1.type= memory

Startup command (drop the bin/ prefix if the environment variables are configured):

bin/flume-ng agent --conf conf --conf-file ~/apps/flume/examples/case_exec.properties --name a1 -Dflume.root.logger=DEBUG,console

Keep appending data to the log file, for example:
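
A hypothetical sample line (any text will do):

[hadoop@hadoop02 ~]$ echo "hello exec source" >> /home/hadoop/logs/test.log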


Received events: the logger sink prints each appended line on the console as an Event, in the same format as in the Avro example above.


2.4 JMS Source

2.4.1 Introduction

Reads data from a JMS system (queues or topics). It has been tested with ActiveMQ.


2.4.2 Official Example
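
A sketch based on the ActiveMQ example in the official user guide (the broker URL and destination name are placeholders):

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE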


2.5 Spooling Directory Source

2.5.1 Introduction

The Spooling Directory Source watches a configured directory for new files and reads out the data they contain. Two caveats: first, a file that has been copied into the spool directory must not be opened and edited again; second, the spool directory must not contain subdirectories. Its main use is near-real-time monitoring of logs.

The official site gives the full configuration table for this source (the bold parameters are required). There are too many optional parameters to cover here, so just one is worth noting: fileSuffix, the suffix appended to a file once it has been fully read, which is configurable.
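
For example, to override the default suffix of .COMPLETED (the .FINISHED value here is arbitrary):

a1.sources.s1.fileSuffix = .FINISHED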


2.5.2 Example

Single-agent flow configuration

Case 1: Use Flume to monitor a directory; when a new file appears in it, print the file's contents to the console.

# File name: sample1.properties

# Configuration:

Create two folders on the Linux system: one to hold the configuration files (flumetest) and one to hold the files to be read (flume). For example:
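
[hadoop@hadoop02 ~]$ mkdir ~/apps/apache-flume-1.8.0-bin/flumetest ~/apps/apache-flume-1.8.0-bin/flume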

# Monitor the specified directory; when a new file appears, print its contents to the console
# Configure an agent; the agent name can be customized
# Declare the agent's sources, sinks, and channels
# Name the agent's sources, sinks, and channels; the names can be customized
a1.sources=s1
a1.channels=c1
a1.sinks=k1

# Configure the source under the name declared for the agent's sources
# Source parameters differ by data source type; look them up in the documentation
# Configure a directory source; the flume folder holds the files to be read
a1.sources.s1.type=spooldir
a1.sources.s1.spoolDir=/home/hadoop/apps/apache-flume-1.8.0-bin/flume

# Configure the channel under the name declared for the agent's channels
# Configure a memory channel
a1.channels.c1.type=memory

# Configure the sink under the name declared for the agent's sinks
# Configure a logger sink
a1.sinks.k1.type=logger

# Bindings. Note: a source binds via "channels" (with an s), a sink via "channel" (no s)
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1

Upload the sample1.properties configuration file to the flumetest folder on the Linux system:

Start Flume with this command (drop the bin/ prefix if the environment variables are configured):

bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/sample1.properties --name a1 -Dflume.root.logger=INFO,console 
--conf specifies the Flume configuration directory
--conf-file specifies the log-collection configuration file
--name specifies the agent name
-Dflume.root.logger=INFO,console prints the collected events to the console

Partial startup log:

18/05/05 20:28:16 INFO node.AbstractConfigurationProvider: Creating channels  
18/05/05 20:28:16 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory  
18/05/05 20:28:16 INFO node.AbstractConfigurationProvider: Created channel c1  
18/05/05 20:28:16 INFO source.DefaultSourceFactory: Creating instance of source s1, type spooldir  
18/05/05 20:28:16 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger  
18/05/05 20:28:16 INFO node.AbstractConfigurationProvider: Channel c1 connected to [s1, k1]  
18/05/05 20:28:16 INFO node.Application: Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:Spool Directory source s1: { spoolDir: /home/hadoop/apps/apache-flume-1.8.0-bin/flume } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@101f0f3a counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }  
18/05/05 20:28:16 INFO node.Application: Starting Channel c1  
18/05/05 20:28:16 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.  
18/05/05 20:28:16 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started  
18/05/05 20:28:16 INFO node.Application: Starting Sink k1  
18/05/05 20:28:16 INFO node.Application: Starting Source s1  
18/05/05 20:28:16 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /home/hadoop/apps/apache-flume-1.8.0-bin/flume  
18/05/05 20:28:17 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.  
18/05/05 20:28:17 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started 

Create a new file hello.txt on the Linux system:

[hadoop@hadoop02 ~]$ vi hello.txt   
hello  
world 

Copy this file into the directory holding the files to be read (the folder set in the configuration file):

a1.sources.s1.spoolDir=/home/hadoop/apps/apache-flume-1.8.0-bin/flume  

Using the command:

[hadoop@hadoop02 ~]$ cp hello.txt ~/apps/apache-flume-1.8.0-bin/flume

The file is read automatically:

18/05/05 20:30:10 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.  
18/05/05 20:30:10 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/hadoop/apps/apache-flume-1.8.0-bin/flume/hello.txt to /home/hadoop/apps/apache-flume-1.8.0-bin/flume/hello.txt.COMPLETED  
18/05/05 20:30:14 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }  
18/05/05 20:30:14 INFO sink.LoggerSink: Event: { headers:{} body: 77 6F 72 6C 64                                  world }  

2.6 Miscellaneous

Reference: https://blog.csdn.net/looklook5/article/details/40400885



Reposted from: blog.csdn.net/qq_41851454/article/details/80219212