Flow Configuration
Single-Agent Flow Configuration
Case 1: Use Flume to monitor a directory; whenever a new file appears in it, print the file's contents to the console.
#File name: sample1.properties
#Configuration:
Create two directories on the Linux system: one to hold configuration files (flumetest) and one to hold the files to be read (flume).
#Monitor the specified directory; when a new file appears, print its contents to the console
#Define an agent; the agent name can be anything you like
#Declare the agent's sources, sinks, and channels
#Name the agent's sources, sinks, and channels; the names are arbitrary
a1.sources=s1
a1.channels=c1
a1.sinks=k1
#Configure the source, keyed by the source name declared above
#Source parameters differ per source type (see the Flume documentation)
#Configure a spooling-directory source; the flume directory holds the files to be read
a1.sources.s1.type=spooldir
a1.sources.s1.spoolDir=/home/hadoop/apps/apache-flume-1.8.0-bin/flume
#Configure the channel, keyed by the channel name declared above
#Configure a memory channel
a1.channels.c1.type=memory
#Configure the sink, keyed by the sink name declared above
#Configure a logger sink
a1.sinks.k1.type=logger
#Bind the components. Note: the source's channel binding is plural (channels); the sink's is singular (channel)
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
Upload the sample1.properties configuration file to the flumetest directory on the Linux system.
Start Flume with this command:
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/sample1.properties --name a1 -Dflume.root.logger=INFO,console
--conf specifies the Flume configuration directory
--conf-file specifies the collection (agent) configuration file
--name specifies the agent name
-Dflume.root.logger=INFO,console prints the collected events to the console
Partial startup output:
18/05/05 20:28:16 INFO node.AbstractConfigurationProvider: Creating channels
18/05/05 20:28:16 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
18/05/05 20:28:16 INFO node.AbstractConfigurationProvider: Created channel c1
18/05/05 20:28:16 INFO source.DefaultSourceFactory: Creating instance of source s1, type spooldir
18/05/05 20:28:16 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
18/05/05 20:28:16 INFO node.AbstractConfigurationProvider: Channel c1 connected to [s1, k1]
18/05/05 20:28:16 INFO node.Application: Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:Spool Directory source s1: { spoolDir: /home/hadoop/apps/apache-flume-1.8.0-bin/flume } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@101f0f3a counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/05/05 20:28:16 INFO node.Application: Starting Channel c1
18/05/05 20:28:16 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/05/05 20:28:16 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/05/05 20:28:16 INFO node.Application: Starting Sink k1
18/05/05 20:28:16 INFO node.Application: Starting Source s1
18/05/05 20:28:16 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /home/hadoop/apps/apache-flume-1.8.0-bin/flume
18/05/05 20:28:17 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
18/05/05 20:28:17 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
Create a new file hello.txt on the Linux system:
[hadoop@hadoop02 ~]$ vi hello.txt
hello
world
Copy this file into the directory that holds the files to be read (the directory set in the configuration file):
a1.sources.s1.spoolDir=/home/hadoop/apps/apache-flume-1.8.0-bin/flume
Using the command:
[hadoop@hadoop02 ~]$ cp hello.txt ~/apps/apache-flume-1.8.0-bin/flume
Read result:
18/05/05 20:30:10 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
18/05/05 20:30:10 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/hadoop/apps/apache-flume-1.8.0-bin/flume/hello.txt to /home/hadoop/apps/apache-flume-1.8.0-bin/flume/hello.txt.COMPLETED
18/05/05 20:30:14 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F hello }
18/05/05 20:30:14 INFO sink.LoggerSink: Event: { headers:{} body: 77 6F 72 6C 64 world }
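As the log shows, the spooling-directory source renames each fully ingested file with a .COMPLETED suffix so it will not be read twice. A quick check (a sketch of the expected listing, not captured output):
[hadoop@hadoop02 ~]$ ls ~/apps/apache-flume-1.8.0-bin/flume
hello.txt.COMPLETED
Note that a file must be complete before it enters the spool directory: copying a finished file, as above, is fine, but writing to a file that is already inside the directory will cause the source to fail.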
Case 2: netcat (TCP)
#File name: case_tcp.properties
#Configuration (all steps in this case run on the same node)
Upload to the hadoop02 node.
(Reuse the flumetest and flume directories created in Case 1.)
#Read input from the specified port via a netcat source and print it to the console
a1.sources=s1
a1.channels=c1
a1.sinks=k1
a1.sources.s1.type=netcat
a1.sources.s1.bind=192.168.123.102
a1.sources.s1.port=55555
a1.channels.c1.type=memory
a1.sinks.k1.type=logger
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
Upload the case_tcp.properties configuration file to the flumetest directory on the Linux system.
Start Flume with this command:
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_tcp.properties --name a1 -Dflume.root.logger=INFO,console
--conf specifies the Flume configuration directory
--conf-file specifies the collection (agent) configuration file
--name specifies the agent name
-Dflume.root.logger=INFO,console prints the collected events to the console
Partial output after startup:
op/apps/apache-flume-1.8.0-bin/flumetest/case_tcp.properties
18/05/06 10:41:34 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
18/05/06 10:41:34 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 10:41:34 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 10:41:34 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
18/05/06 10:41:34 INFO node.AbstractConfigurationProvider: Creating channels
18/05/06 10:41:34 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
18/05/06 10:41:34 INFO node.AbstractConfigurationProvider: Created channel c1
18/05/06 10:41:34 INFO source.DefaultSourceFactory: Creating instance of source s1, type netcat
18/05/06 10:41:34 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
18/05/06 10:41:34 INFO node.AbstractConfigurationProvider: Channel c1 connected to [s1, k1]
18/05/06 10:41:34 INFO node.Application: Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:s1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@738ed94d counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/05/06 10:41:34 INFO node.Application: Starting Channel c1
18/05/06 10:41:34 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/05/06 10:41:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/05/06 10:41:34 INFO node.Application: Starting Sink k1
18/05/06 10:41:34 INFO node.Application: Starting Source s1
18/05/06 10:41:34 INFO source.NetcatSource: Source starting
18/05/06 10:41:34 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.123.102:55555]
Open another terminal window on the same node:
[hadoop@hadoop02 apache-flume-1.8.0-bin]$ telnet 192.168.123.102 55555
-bash: telnet: command not found
The command above fails because telnet is not installed; install it with yum (you must switch to the root user first):
[hadoop@hadoop02 apache-flume-1.8.0-bin]$ su
Password:
[root@hadoop02 apache-flume-1.8.0-bin]# yum install telnet
Loaded plugins: fastestmirror, refresh-packagekit, security
Setting up Install Process
Determining fastest mirrors
epel/metalink | 6.2 kB 00:00
 * base: mirrors.sohu.com
 * epel: mirrors.tongji.edu.cn
 * extras: mirror.bit.edu.cn
 * updates: mirror.bit.edu.cn
base | 3.7 kB 00:00
epel | 4.7 kB 00:00
epel/primary_db | 6.0 MB 00:12
extras | 3.4 kB 00:00
updates | 3.4 kB 00:00
updates/primary_db | 7.0 MB 00:03
Resolving Dependencies
--> Running transaction check
---> Package telnet.x86_64 1:0.17-48.el6 will be installed
--> Finished Dependency Resolution
Dependencies Resolved
===============================================================================================================
 Package          Arch            Version                  Repository      Size
===============================================================================================================
Installing:
 telnet           x86_64          1:0.17-48.el6            base            58 k
Transaction Summary
===============================================================================================================
Install 1 Package(s)
Total download size: 58 k
Installed size: 109 k
Is this ok [y/N]: y
Downloading Packages:
telnet-0.17-48.el6.x86_64.rpm | 58 kB 00:00
Running rpm_check_debug
Running Transaction Test
Transaction Test Succeeded
Running Transaction
  Installing : 1:telnet-0.17-48.el6.x86_64 1/1
  Verifying : 1:telnet-0.17-48.el6.x86_64 1/1
Installed:
  telnet.x86_64 1:0.17-48.el6
Complete!
[root@hadoop02 apache-flume-1.8.0-bin]#
Once the install completes, run the command again:
[hadoop@hadoop02 apache-flume-1.8.0-bin]$ telnet 192.168.123.102 55555
Trying 192.168.123.102...
Connected to 192.168.123.102.
Escape character is '^]'.
Type hello at the prompt:
[hadoop@hadoop02 apache-flume-1.8.0-bin]$ telnet 192.168.123.102 55555
Trying 192.168.123.102...
Connected to 192.168.123.102.
Escape character is '^]'.
hello
OK
The Flume console in the other window on the same node then displays (the trailing 0D byte is the carriage return appended by telnet):
18/05/06 10:42:28 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 0D hello. }
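If you would rather not install telnet, nc can serve as the client instead (a sketch; it assumes the nc package is already present):
[hadoop@hadoop02 apache-flume-1.8.0-bin]$ nc 192.168.123.102 55555
hello
OK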
Case 3: avro
#File name: case_avro.properties
#Configuration (all steps in this case run on the same node)
Upload to the hadoop02 node.
(Reuse the flumetest and flume directories created in Case 1.)
#Read input from the specified port via an avro source and print it to the console
a1.sources=s1
a1.channels=c1
a1.sinks=k1
a1.sources.s1.type=avro
a1.sources.s1.bind=192.168.123.102
a1.sources.s1.port=55555
a1.channels.c1.type=memory
a1.sinks.k1.type=logger
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
Upload the case_avro.properties configuration file to the flumetest directory on the Linux system.
Start Flume with this command:
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_avro.properties --name a1 -Dflume.root.logger=INFO,console
--conf specifies the Flume configuration directory
--conf-file specifies the collection (agent) configuration file
--name specifies the agent name
-Dflume.root.logger=INFO,console prints the collected events to the console
Partial output after startup:
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
18/05/06 11:46:25 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
18/05/06 11:46:25 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_avro.properties
18/05/06 11:46:25 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
18/05/06 11:46:25 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 11:46:25 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 11:46:25 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
18/05/06 11:46:25 INFO node.AbstractConfigurationProvider: Creating channels
18/05/06 11:46:25 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
18/05/06 11:46:25 INFO node.AbstractConfigurationProvider: Created channel c1
18/05/06 11:46:25 INFO source.DefaultSourceFactory: Creating instance of source s1, type avro
18/05/06 11:46:25 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
18/05/06 11:46:25 INFO node.AbstractConfigurationProvider: Channel c1 connected to [s1, k1]
18/05/06 11:46:25 INFO node.Application: Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:Avro source s1: { bindAddress: 192.168.123.102, port: 55555 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@47ebf1f0 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/05/06 11:46:25 INFO node.Application: Starting Channel c1
18/05/06 11:46:26 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/05/06 11:46:26 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/05/06 11:46:26 INFO node.Application: Starting Sink k1
18/05/06 11:46:26 INFO node.Application: Starting Source s1
18/05/06 11:46:26 INFO source.AvroSource: Starting Avro source s1: { bindAddress: 192.168.123.102, port: 55555 }...
18/05/06 11:46:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
18/05/06 11:46:27 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
18/05/06 11:46:27 INFO source.AvroSource: Avro source s1 started.
Open another window on hadoop02 and create a tt.log file in the flume directory.
[hadoop@hadoop02 flume]$ vi tt.log
hello
world
Send it with the following command (simulating an avro client sending data):
[hadoop@hadoop02 flume]$ flume-ng avro-client -c ./conf -H 192.168.123.102 -p 55555 -F tt.log
Partial output after sending:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/apps/apache-flume-1.8.0-bin/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/apps/hadoop-2.7.5/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/apps/hbase-1.2.6/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/apps/apache-hive-2.3.2-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
18/05/06 11:51:54 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
The Flume console displays:
18/05/06 11:51:56 INFO ipc.NettyServer: Connection to /192.168.123.102:59682 disconnected.
18/05/06 11:51:58 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F hello }
18/05/06 11:51:58 INFO sink.LoggerSink: Event: { headers:{} body: 77 6F 72 6C 64 world }
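Note that the avro client ships the file's entire current contents once per invocation; to deliver new lines you run it again, which also resends the old ones. A sketch (the appended text is illustrative):
[hadoop@hadoop02 flume]$ echo "hello again" >> tt.log
[hadoop@hadoop02 flume]$ flume-ng avro-client -c ./conf -H 192.168.123.102 -p 55555 -F tt.log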
Case 4: Simulate reading data from a web server into HDFS in real time
Experiment (this simulates a web server's log output; the generator must keep running):
Create an empty file:
[hadoop@hadoop02 tomcat]$ touch catalina.out
[hadoop@hadoop02 tomcat]$ ll
total 0
-rw-rw-r--. 1 hadoop hadoop 0 May 6 12:19 catalina.out
Write a one-liner that appends data to this file once per second:
[hadoop@hadoop02 tomcat]$ while true; do echo `date` >> catalina.out; sleep 1; done
Watch the file with this command (data keeps accumulating):
[hadoop@hadoop02 tomcat]$ tail -F catalina.out
Sun May 6 12:24:57 CST 2018
Sun May 6 12:24:58 CST 2018
Sun May 6 12:24:59 CST 2018
Sun May 6 12:25:00 CST 2018
Sun May 6 12:25:01 CST 2018
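If the generator should keep running after the terminal closes, a minimal sketch is to move the loop into a script and run it in the background (the file name gen_log.sh is arbitrary):
[hadoop@hadoop02 tomcat]$ cat > gen_log.sh <<'EOF'
#!/bin/bash
# append the current date to catalina.out once per second
while true; do
  date >> /home/hadoop/tomcat/catalina.out
  sleep 1
done
EOF
[hadoop@hadoop02 tomcat]$ nohup bash gen_log.sh &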
#File name: case_hdfs.properties
#Configuration (all steps in this case run on the same node)
The agent reads tomcat/catalina.out, which is continuously updated; tail -F always picks up the newest lines.
#Define an agent; the agent name can be anything you like
#Declare and name the agent's sources, sinks, and channels; the names are arbitrary
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#Configure the source; parameters differ per source type (see the Flume documentation)
#Configure an exec source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /home/hadoop/tomcat/catalina.out
#Configure a memory channel
a1.channels.c1.type = memory
#Configure an hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M
#Directory rolling: round the event timestamp down to a 1-minute bucket
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#File name prefix and suffix
a1.sinks.k1.hdfs.filePrefix = taobao
a1.sinks.k1.hdfs.fileSuffix = log
#File rolling: roll every 10 s, or at 1024 bytes, or at 10 events, whichever comes first
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 1024
a1.sinks.k1.hdfs.rollCount = 10
a1.sinks.k1.hdfs.fileType = DataStream
#Bind the source to its channel
a1.sources.s1.channels = c1
#Bind the sink to its channel
a1.sinks.k1.channel = c1
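The %y-%m-%d/%H%M escapes in hdfs.path expand from the event timestamp (here the local clock, since useLocalTimeStamp is set), and round/roundValue/roundUnit bucket that timestamp down to the minute. So an event written at 16:09 on 2018-05-06 would land under a directory like the following (a worked example of the layout, not captured output):
/flume/events/18-05-06/1609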
Run the command:
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_hdfs.properties --name a1 -Dflume.root.logger=INFO,console
--conf specifies the Flume configuration directory
--conf-file specifies the collection (agent) configuration file
--name specifies the agent name
-Dflume.root.logger=INFO,console prints the collected events to the console
Partial output:
18/05/06 16:09:44 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 16:09:44 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 16:09:44 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 16:09:44 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 16:09:44 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
18/05/06 16:09:44 INFO node.AbstractConfigurationProvider: Creating channels
18/05/06 16:09:44 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
18/05/06 16:09:44 INFO node.AbstractConfigurationProvider: Created channel c1
18/05/06 16:09:44 INFO source.DefaultSourceFactory: Creating instance of source s1, type exec
18/05/06 16:09:44 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: hdfs
18/05/06 16:09:44 INFO node.AbstractConfigurationProvider: Channel c1 connected to [s1, k1]
18/05/06 16:09:44 INFO node.Application: Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:s1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@c992421 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/05/06 16:09:44 INFO node.Application: Starting Channel c1
18/05/06 16:09:44 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/05/06 16:09:44 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/05/06 16:09:44 INFO node.Application: Starting Sink k1
18/05/06 16:09:44 INFO node.Application: Starting Source s1
18/05/06 16:09:44 INFO source.ExecSource: Exec source starting with command: tail -F /home/hadoop/tomcat/catalina.out
18/05/06 16:09:44 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
18/05/06 16:09:44 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
18/05/06 16:09:44 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
18/05/06 16:09:44 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
Check the result on the HDFS node.
How many files are created, and how large they are, is determined by the roll parameters in the configuration.
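A quick way to inspect what landed, using standard HDFS shell commands (the path follows from the worked example above):
[hadoop@hadoop02 ~]$ hdfs dfs -ls -R /flume/events
[hadoop@hadoop02 ~]$ hdfs dfs -cat /flume/events/18-05-06/1609/*
With rollInterval = 10 and one event generated per second, expect a new file roughly every ten seconds, each holding about ten date lines.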
Multi-Agent Flow:
#Configuration (the two agents below run on different nodes; to run both on the same node, their names must differ, e.g. a1 and a2)
On different nodes the port numbers may be the same (one side writes out, the other reads in).
On the same node the port numbers must differ.
#File name: case_source.properties
#Define an agent; declare and name its sources, sinks, and channels (names are arbitrary)
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#Configure the source; parameters differ per source type (see the Flume documentation)
#Configure a netcat source
a1.sources.s1.type = netcat
a1.sources.s1.bind = 192.168.123.102
a1.sources.s1.port = 44455
#Configure a memory channel
a1.channels.c1.type = memory
#Configure an avro sink pointing at the next hop
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.123.103
a1.sinks.k1.port = 44466
#Bind the source to its channel
a1.sources.s1.channels = c1
#Bind the sink to its channel
a1.sinks.k1.channel = c1
#File name: case_sink.properties
#Define an agent; declare and name its sources, sinks, and channels (names are arbitrary)
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#Configure an avro source listening for the previous hop
a1.sources.s1.type = avro
a1.sources.s1.bind = 192.168.123.103
a1.sources.s1.port = 44466
#Configure a memory channel
a1.channels.c1.type = memory
#Configure a logger sink
a1.sinks.k1.type = logger
#Bind the source to its channel
a1.sources.s1.channels = c1
#Bind the sink to its channel
a1.sinks.k1.channel = c1
The hadoop03 node (192.168.123.103) must be started first, so that its avro source is listening before the avro sink on hadoop02 tries to connect.
Startup command:
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_sink.properties --name a1 -Dflume.root.logger=INFO,console
--conf specifies the Flume configuration directory
--conf-file specifies the collection (agent) configuration file
--name specifies the agent name
-Dflume.root.logger=INFO,console prints the collected events to the console
Partial startup output:
2018-05-06 17:06:59,944 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel c1
2018-05-06 17:07:00,133 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2018-05-06 17:07:00,134 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2018-05-06 17:07:00,141 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1
2018-05-06 17:07:00,142 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source s1
2018-05-06 17:07:00,149 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:234)] Starting Avro source s1: { bindAddress: 192.168.123.103, port: 44466 }...
2018-05-06 17:07:01,029 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
2018-05-06 17:07:01,054 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: s1 started
2018-05-06 17:07:01,064 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:260)] Avro source s1 started.
2018-05-06 17:07:32,758 (New I/O server boss #3) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x5fadd41f, /192.168.123.102:54177 => /192.168.123.103:44466] OPEN
2018-05-06 17:07:32,761 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x5fadd41f, /192.168.123.102:54177 => /192.168.123.103:44466] BOUND: /192.168.123.103:44466
2018-05-06 17:07:32,761 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x5fadd41f, /192.168.123.102:54177 => /192.168.123.103:44466] CONNECTED: /192.168.123.102:54177
Then start the hadoop02 node (192.168.123.102):
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_source.properties --name a1 -Dflume.root.logger=INFO,console
--conf specifies the Flume configuration directory
--conf-file specifies the collection (agent) configuration file
--name specifies the agent name
-Dflume.root.logger=INFO,console prints the collected events to the console
Partial startup output:
18/05/06 17:07:32 INFO node.AbstractConfigurationProvider: Creating channels
18/05/06 17:07:32 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
18/05/06 17:07:32 INFO node.AbstractConfigurationProvider: Created channel c1
18/05/06 17:07:32 INFO source.DefaultSourceFactory: Creating instance of source s1, type netcat
18/05/06 17:07:32 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: avro
18/05/06 17:07:32 INFO sink.AbstractRpcSink: Connection reset is set to 0. Will not reset connection to next hop
18/05/06 17:07:32 INFO node.AbstractConfigurationProvider: Channel c1 connected to [s1, k1]
18/05/06 17:07:32 INFO node.Application: Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:s1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@67749305 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/05/06 17:07:32 INFO node.Application: Starting Channel c1
18/05/06 17:07:32 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/05/06 17:07:32 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/05/06 17:07:32 INFO node.Application: Starting Sink k1
18/05/06 17:07:32 INFO node.Application: Starting Source s1
18/05/06 17:07:32 INFO sink.AbstractRpcSink: Starting RpcSink k1 { host: 192.168.123.103, port: 44466 }...
18/05/06 17:07:32 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
18/05/06 17:07:32 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
18/05/06 17:07:32 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: 192.168.123.103, port: 44466
18/05/06 17:07:32 INFO sink.AvroSink: Attempting to create Avro Rpc client.
18/05/06 17:07:32 INFO source.NetcatSource: Source starting
18/05/06 17:07:32 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.123.102:44455]
18/05/06 17:07:32 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
18/05/06 17:07:33 INFO sink.AbstractRpcSink: Rpc sink k1 started.
Test it:
Write data on hadoop02 (192.168.123.102); note that you connect to the current node's IP address:
[hadoop@hadoop02 tomcat]$ telnet 192.168.123.102 44455
Trying 192.168.123.102...
Connected to 192.168.123.102.
Escape character is '^]'.
hello
OK
world
OK
The hadoop03 (192.168.123.103) console then displays:
2018-05-06 17:08:58,204 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 0D hello. }
2018-05-06 17:09:28,215 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 6F 72 6C 64 0D world. }
Fan-Out Flow (Replicating and Multiplexing):
Flume supports fanning out a flow from one source to multiple channels. There are two fan-out modes: replicating and multiplexing. In replicating mode, each event is sent to all configured channels; in multiplexing mode, an event is sent only to the subset of channels that qualify. To fan out, add a channel selector to the source and set its type to replicating or multiplexing; a multiplexing selector additionally needs selection rules. If no selector is specified, replicating is the default. The experiment below uses a replicating selector; a multiplexing sketch follows for reference.
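A minimal multiplexing sketch (not used in this experiment; the header name state and the values CZ/US are illustrative, following the pattern in the Flume user guide):
a1.sources.s1.selector.type = multiplexing
a1.sources.s1.selector.header = state
a1.sources.s1.selector.mapping.CZ = c1
a1.sources.s1.selector.mapping.US = c2
a1.sources.s1.selector.default = c1
Events whose state header is CZ go to c1, US goes to c2, and anything else falls through to the default channel.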
#Configuration (the agents below run on different nodes; to run them on the same node, the agent names must differ, e.g. a1 and a2)
On different nodes the port numbers may be the same (one side writes out, the other reads in).
On the same node the port numbers must differ.
#File name: case_replicate_sink.properties (configured on hadoop02, i.e. 192.168.123.102, where the source binds)
#Configuration with 2 channels and 2 sinks
# Name the components on this agent
a1.sources = s1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.s1.type = netcat
a1.sources.s1.port = 44455
a1.sources.s1.bind = 192.168.123.102
a1.sources.s1.selector.type = replicating
a1.sources.s1.channels = c1 c2
# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.123.103
a1.sinks.k1.port = 44466
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 192.168.123.104
a1.sinks.k2.port = 44466
# Use channels which buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
Startup command (startup order does not matter):
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_replicate_sink.properties --name a1 -Dflume.root.logger=INFO,console
--conf specifies the Flume configuration directory
--conf-file specifies the collection (agent) configuration file
--name specifies the agent name
-Dflume.root.logger=INFO,console prints the collected events to the console
Partial log output:
2018-05-06 19:18:00,047 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k2 started
2018-05-06 19:18:00,047 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:205)] Rpc sink k2: Building RpcClient with hostname: 192.168.123.104, port: 44466
2018-05-06 19:18:00,047 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:126)] Attempting to create Avro Rpc client.
2018-05-06 19:18:00,047 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source s1
2018-05-06 19:18:00,079 (lifecycleSupervisor-1-7) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)] Source starting
2018-05-06 19:18:00,084 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:287)] Starting RpcSink k1 { host: 192.168.123.103, port: 44466 }...
2018-05-06 19:18:00,084 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2018-05-06 19:18:00,084 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2018-05-06 19:18:00,085 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:205)] Rpc sink k1: Building RpcClient with hostname: 192.168.123.103, port: 44466
2018-05-06 19:18:00,085 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:126)] Attempting to create Avro Rpc client.
2018-05-06 19:18:00,124 (lifecycleSupervisor-1-1) [WARN - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:634)] Using default maxIOWorkers
2018-05-06 19:18:00,127 (lifecycleSupervisor-1-5) [WARN - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:634)] Using default maxIOWorkers
2018-05-06 19:18:00,127 (lifecycleSupervisor-1-7) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.123.102:44455]
2018-05-06 19:18:00,681 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:301)] Rpc sink k2 started.
2018-05-06 19:18:00,682 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:301)] Rpc sink k1 started.
#File name: case_replicate_k1.properties (configured on hadoop03, i.e. 192.168.123.103)
# Name the components on this agent
a1.sources = s1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.s1.type = avro
a1.sources.s1.channels = c1
a1.sources.s1.bind = 192.168.123.103
a1.sources.s1.port = 44466
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Startup command (startup order does not matter):
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_replicate_k1.properties --name a1 -Dflume.root.logger=INFO,console
--conf specifies the Flume configuration directory
--conf-file specifies the collection (agent) configuration file
--name specifies the agent name
-Dflume.root.logger=INFO,console prints the collected events to the console
Partial log output:
(AbstractConfigurationProvider.java:201)] Created channel c1
2018-05-06 19:08:09,357 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source s1, type avro
2018-05-06 19:08:09,390 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger
2018-05-06 19:08:09,396 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel c1 connected to [s1, k1]
2018-05-06 19:08:09,405 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:Avro source s1: { bindAddress: 192.168.123.103, port: 44466 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3a803148 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2018-05-06 19:08:09,421 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel c1
2018-05-06 19:08:09,546 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2018-05-06 19:08:09,554 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2018-05-06 19:08:09,561 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1
2018-05-06 19:08:09,563 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source s1
2018-05-06 19:08:09,565 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:234)] Starting Avro source s1: { bindAddress: 192.168.123.103, port: 44466 }...
2018-05-06 19:08:10,244 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
2018-05-06 19:08:10,255 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: s1 started
2018-05-06 19:08:10,258 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:260)] Avro source s1 started.
#File name: case_replicate_k2.properties (configured on hadoop04, i.e. 192.168.123.104)
# Name the components on this agent
a3.sources = s1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.s1.type = avro
a3.sources.s1.channels = c1
a3.sources.s1.bind = 192.168.123.104
a3.sources.s1.port = 44466
# Describe the sink
a3.sinks.k1.type = logger
a3.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
Startup command (order does not matter). Note: since this agent is named a3, the command must use --name a3:
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_replicate_k2.properties --name a3 -Dflume.root.logger=INFO,console
--conf specifies the Flume configuration directory
--conf-file specifies the collection (agent) configuration file
--name specifies the agent name
-Dflume.root.logger=INFO,console prints the collected events to the console
Partial log output:
roperty(FlumeConfiguration.java:1016)] Processing:k1
2018-05-06 19:07:32,260 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2018-05-06 19:07:32,304 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [a3]
2018-05-06 19:07:32,304 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels
2018-05-06 19:07:32,333 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory
2018-05-06 19:07:32,352 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel c1
2018-05-06 19:07:32,353 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source s1, type avro
2018-05-06 19:07:32,393 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger
2018-05-06 19:07:32,400 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel c1 connected to [s1, k1]
2018-05-06 19:07:32,409 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:Avro source s1: { bindAddress: 192.168.123.104, port: 44466 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3a803148 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2018-05-06 19:07:32,426 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel c1
2018-05-06 19:07:32,594 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2018-05-06 19:07:32,597 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2018-05-06 19:07:32,603 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1
2018-05-06 19:07:32,604 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source s1
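To verify the fan-out, write data into the netcat source on hadoop02 exactly as in the multi-agent case (a sketch of the expected interaction, not captured output):
[hadoop@hadoop02 ~]$ telnet 192.168.123.102 44455
Trying 192.168.123.102...
Connected to 192.168.123.102.
Escape character is '^]'.
hello
OK
Because the selector is replicating, the logger consoles on both hadoop03 and hadoop04 should each print the same event, with body 68 65 6C 6C 6F 0D hello.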