1. Overview
Official documentation: http://flume.apache.org/FlumeUserGuide.html#flume-sources
2. Flume Sources
2.1 Avro Source
2.1.1 Introduction
Listens on an Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink of another (previous-hop) Flume agent, it can form tiered collection topologies. With the Avro Source you can build multi-hop, fan-out, and fan-in flows, and you can also receive log data sent by the Avro client that ships with Flume. Properties shown in bold in the official table are required.
channels           –       (required)
type               –       component type name; must be avro (required)
bind               –       hostname or IP address to listen on (required)
port               –       port to bind to (required)
threads            –       maximum number of worker threads to spawn
selector.type
selector.*
interceptors       –       space-separated list of interceptors
interceptors.*
compression-type   none    can be "none" or "deflate"; must match the compression-type of the peer Avro client/sink
ssl                false   set to true to enable SSL encryption; also requires a "keystore" and a "keystore-password"
keystore           –       path to the Java keystore file used for SSL
keystore-password  –       password for the Java keystore
keystore-type      JKS     keystore type, "JKS" or "PKCS12"
exclude-protocols  SSLv3   space-separated list of SSL/TLS protocols to exclude; SSLv3 is always excluded in addition to those listed
ipFilter           false   set to true to enable netty IP filtering
ipFilterRules      –       netty IP filter rule pattern
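As a sketch of how the SSL-related properties above fit together, an SSL-enabled Avro source might be configured as follows (the bind address, port, keystore path, and password below are hypothetical placeholders, not values from this tutorial):

```properties
a1.sources = s1
a1.channels = c1
a1.sources.s1.type = avro
a1.sources.s1.channels = c1
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 4141
# SSL settings -- path and password are placeholders
a1.sources.s1.ssl = true
a1.sources.s1.keystore = /path/to/keystore.jks
a1.sources.s1.keystore-password = changeit
a1.sources.s1.keystore-type = JKS
```

The connecting Avro client must then also speak SSL, otherwise the handshake fails.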
2.1.2 Example
Example 1: see the official documentation.
Example 2:
#Read input from the given port via an avro source and print it to the console
a1.sources=s1
a1.channels=c1
a1.sinks=k1
a1.sources.s1.type=avro
a1.sources.s1.bind=192.168.123.102
a1.sources.s1.port=55555
a1.channels.c1.type=memory
a1.sinks.k1.type=logger
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
Upload the case_avro.properties configuration file to the flumetest directory on the Linux machine, then start Flume with the following command (drop the bin/ prefix if flume-ng is on your PATH):
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_avro.properties --name a1 -Dflume.root.logger=INFO,console
--conf        location of the Flume configuration directory
--conf-file   the configuration file for this collection job
--name        name of the agent
-Dflume.root.logger=INFO,console   print collected events to the console
Part of the startup output:
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
18/05/06 11:46:25 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
18/05/06 11:46:25 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_avro.properties
18/05/06 11:46:25 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
18/05/06 11:46:25 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 11:46:25 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 11:46:25 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
18/05/06 11:46:25 INFO node.AbstractConfigurationProvider: Creating channels
18/05/06 11:46:25 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
18/05/06 11:46:25 INFO node.AbstractConfigurationProvider: Created channel c1
18/05/06 11:46:25 INFO source.DefaultSourceFactory: Creating instance of source s1, type avro
18/05/06 11:46:25 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
18/05/06 11:46:25 INFO node.AbstractConfigurationProvider: Channel c1 connected to [s1, k1]
18/05/06 11:46:25 INFO node.Application: Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:Avro source s1: { bindAddress: 192.168.123.102, port: 55555 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@47ebf1f0 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/05/06 11:46:25 INFO node.Application: Starting Channel c1
18/05/06 11:46:26 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/05/06 11:46:26 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/05/06 11:46:26 INFO node.Application: Starting Sink k1
18/05/06 11:46:26 INFO node.Application: Starting Source s1
18/05/06 11:46:26 INFO source.AvroSource: Starting Avro source s1: { bindAddress: 192.168.123.102, port: 55555 }...
18/05/06 11:46:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
18/05/06 11:46:27 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
18/05/06 11:46:27 INFO source.AvroSource: Avro source s1 started.
Open another terminal window on hadoop02 and create a tt.log file in the flume directory:
[hadoop@hadoop02 flume]$ vi tt.log
hello
world
Send it with the following command (simulating an Avro client sending data):
[hadoop@hadoop02 flume]$ flume-ng avro-client -c ./conf -H 192.168.123.102 -p 55555 -F tt.log
Part of the client-side log after sending:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/apps/apache-flume-1.8.0-bin/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/apps/hadoop-2.7.5/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/apps/hbase-1.2.6/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/apps/apache-hive-2.3.2-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
18/05/06 11:51:54 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
Output shown on the agent's console:
18/05/06 11:51:56 INFO ipc.NettyServer: Connection to /192.168.123.102:59682 disconnected.
18/05/06 11:51:58 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F hello }
18/05/06 11:51:58 INFO sink.LoggerSink: Event: { headers:{} body: 77 6F 72 6C 64 world }
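The logger sink prints each event body as hex bytes followed by a printable preview. You can check the hex column by hand; a small bash sketch (bash's printf understands \xNN escapes, so this assumes bash rather than a plain POSIX shell):

```shell
#!/usr/bin/env bash
# Hex column taken from the log line above
hex="68 65 6C 6C 6F"
# Turn "68 65 ..." into "\x68\x65..." and let printf decode it
escaped=$(echo "$hex" | sed 's/ *\([0-9A-Fa-f]\{2\}\)/\\x\1/g')
printf "$escaped\n"   # prints: hello
```

The second event decodes the same way: 77 6F 72 6C 64 is "world".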
2.2 Thrift Source
2.2.1 Introduction
The Thrift Source is essentially the same as the Avro Source; just change the source type to thrift, e.g. a1.sources.r1.type = thrift. It is straightforward, so no further detail is given here.
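For reference, a minimal Thrift source configuration would mirror the Avro example above with only the type changed (the bind address and port here are placeholders):

```properties
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = thrift
a1.sources.s1.channels = c1
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 4141
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
```

Events would then be sent with flume-ng's Thrift client instead of the Avro client.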
2.3 Exec Source
2.3.1 Introduction
The Exec Source runs a given Unix command when the agent starts (typically tail -F on a log file) and turns each line the command writes to stdout into an event.
2.3.2 Example
#Configuration file
#Name the components on this agent
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#Configure the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /home/hadoop/logs/test.log
a1.sources.s1.channels = c1
#Configure the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
#Configure the channel
a1.channels.c1.type = memory
Start command (drop the bin/ prefix if flume-ng is on your PATH):
bin/flume-ng agent --conf conf --conf-file ~/apps/flume/examples/case_exec.properties --name a1 -Dflume.root.logger=DEBUG,console
Append more data to the log:
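For example, appending lines works like this (using /tmp/test.log as a stand-in path; the config above tails /home/hadoop/logs/test.log, so substitute that path on the actual machine):

```shell
#!/usr/bin/env bash
# Append lines to the file that the exec source is tailing.
# /tmp/test.log is a stand-in; the tutorial's config uses /home/hadoop/logs/test.log
LOG=/tmp/test.log
echo "hello exec source" >> "$LOG"
echo "another line" >> "$LOG"
tail -n 2 "$LOG"
```

Each appended line should show up as a separate event on the agent's console.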
Output received:
2.4 JMS Source
2.4.1 Introduction
Reads messages from a JMS destination such as a queue or topic. So far it has only been tested against ActiveMQ.
2.4.2 Official example
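The user guide's example for an ActiveMQ queue looks like the following (the broker hostname and queue name are the guide's placeholders):

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
```

The ActiveMQ client jars must be on the Flume classpath (e.g. dropped into plugins.d) for this to work.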
2.5 Spooling Directory Source
2.5.1 Introduction
The Spooling Directory Source watches a configured directory for newly added files and reads events out of them. Two caveats: a file must not be opened and edited once it has been copied into the spool directory, and the spool directory must not contain subdirectories. Its main use is near-real-time log monitoring.
Below is the source configuration given on the official site; parameters in bold are required. There are too many optional parameters to cover here, so we only mention fileSuffix: the suffix appended to a file once it has been fully read, which can be changed.
2.5.2 Example
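For instance, to have fully-read files renamed with a .FINISHED suffix instead of the default .COMPLETED, the fileSuffix option would be set like this (spoolDir matches the directory used later in this tutorial):

```properties
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /home/hadoop/apps/apache-flume-1.8.0-bin/flume
a1.sources.s1.fileSuffix = .FINISHED
```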
Single-agent flow configuration
Case 1: use Flume to monitor a directory, and when a new file appears in it, print the file's contents to the console.
#File name: sample1.properties
#Contents:
Create two directories on the Linux machine: one to hold the configuration file (flumetest) and one to hold the files to be read (flume).
#Watch the given directory and print the contents of any new file to the console
#Define an agent; the agent name can be anything
#Declare the agent's sources, sinks and channels and name each one (names are arbitrary)
a1.sources=s1
a1.channels=c1
a1.sinks=k1
#Configure the source named above
#Source parameters depend on the source type -- look them up in the user guide
#Directory source; the flume directory holds the files to be read
a1.sources.s1.type=spooldir
a1.sources.s1.spoolDir=/home/hadoop/apps/apache-flume-1.8.0-bin/flume
#Configure the channel named above: a memory channel
a1.channels.c1.type=memory
#Configure the sink named above: a logger sink
a1.sinks.k1.type=logger
#Bind the components. Note: the source property is channels (with an s), the sink property is channel (no s)
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
Upload the sample1.properties configuration file to the flumetest directory on the Linux machine, then start Flume with the following command (drop the bin/ prefix if flume-ng is on your PATH):
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/sample1.properties --name a1 -Dflume.root.logger=INFO,console
--conf        location of the Flume configuration directory
--conf-file   the configuration file for this collection job
--name        name of the agent
-Dflume.root.logger=INFO,console   print collected events to the console
Part of the startup log:
18/05/05 20:28:16 INFO node.AbstractConfigurationProvider: Creating channels
18/05/05 20:28:16 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
18/05/05 20:28:16 INFO node.AbstractConfigurationProvider: Created channel c1
18/05/05 20:28:16 INFO source.DefaultSourceFactory: Creating instance of source s1, type spooldir
18/05/05 20:28:16 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
18/05/05 20:28:16 INFO node.AbstractConfigurationProvider: Channel c1 connected to [s1, k1]
18/05/05 20:28:16 INFO node.Application: Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:Spool Directory source s1: { spoolDir: /home/hadoop/apps/apache-flume-1.8.0-bin/flume } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@101f0f3a counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/05/05 20:28:16 INFO node.Application: Starting Channel c1
18/05/05 20:28:16 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/05/05 20:28:16 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/05/05 20:28:16 INFO node.Application: Starting Sink k1
18/05/05 20:28:16 INFO node.Application: Starting Source s1
18/05/05 20:28:16 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /home/hadoop/apps/apache-flume-1.8.0-bin/flume
18/05/05 20:28:17 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
18/05/05 20:28:17 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
Create a file hello.txt on the Linux machine:
[hadoop@hadoop02 ~]$ vi hello.txt
hello
world
Copy this file into the directory being read, i.e. the one set in the configuration file:
a1.sources.s1.spoolDir=/home/hadoop/apps/apache-flume-1.8.0-bin/flume
Command:
[hadoop@hadoop02 ~]$ cp hello.txt ~/apps/apache-flume-1.8.0-bin/flume
The file is read automatically:
18/05/05 20:30:10 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
18/05/05 20:30:10 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/hadoop/apps/apache-flume-1.8.0-bin/flume/hello.txt to /home/hadoop/apps/apache-flume-1.8.0-bin/flume/hello.txt.COMPLETED
18/05/05 20:30:14 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F hello }
18/05/05 20:30:14 INFO sink.LoggerSink: Event: { headers:{} body: 77 6F 72 6C 64 world }
2.6 Other
Reference: https://blog.csdn.net/looklook5/article/details/40400885