Case 1: Failover
agent1: /opt/flume_conf/flume1.conf (on node2), using the FailoverSinkProcessor
# Components
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
# Sink group
a1.sinkgroups = g1
# Source
a1.sources.r1.type = TAILDIR
# Position file for resuming after a restart: stores the current read offset of every tailed file
a1.sources.r1.positionFile = /opt/flume_conf/tmp/tail_dir.json
# File groups (list several, e.g. "f1 f2", to tail two directories)
a1.sources.r1.filegroups = f1
# Files in group f1 (the regex applies to the file name only)
a1.sources.r1.filegroups.f1 = /opt/data/.*.log
# Replicate every event to all channels; optional, since replicating is the default selector
a1.sources.r1.selector.type = replicating
# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Sinks: an avro sink is a sender that forwards events to an avro source
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node3
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node3
a1.sinks.k2.port = 4142
# Sink failover: k1 (agent2) has a higher priority than k2 (agent3)
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
# Maximum backoff for a failed sink, in milliseconds
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Bindings
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinkgroups.g1.sinks = k1 k2
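To make the failover behaviour concrete, here is a minimal Python sketch of how a failover sink group chooses a sink. It is a simplified model, not Flume's implementation: `FailoverModel` and `SinkState` are hypothetical names, and the penalty rule (2^n seconds after n consecutive failures, capped at `maxpenalty`) is an assumption about how the backoff grows.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

# Assumed penalty unit: after n consecutive failures a sink waits 2**n seconds.
PENALTY_UNIT_MS = 1000


@dataclass
class SinkState:
    name: str
    priority: int
    failures: int = 0     # consecutive failures
    retry_at_ms: int = 0  # the sink is skipped until this time


class FailoverModel:
    """Hypothetical model of a failover sink group: the live sink with the
    highest priority is used; a failed sink is penalized for a while."""

    def __init__(self, priorities: Dict[str, int], max_penalty_ms: int) -> None:
        self.sinks: List[SinkState] = [SinkState(n, p) for n, p in priorities.items()]
        self.max_penalty_ms = max_penalty_ms

    def penalty_ms(self, failures: int) -> int:
        # Doubles with each consecutive failure, capped at maxpenalty.
        return min(self.max_penalty_ms, (1 << failures) * PENALTY_UNIT_MS)

    def pick(self, now_ms: int) -> Optional[str]:
        # Only sinks whose penalty has expired are eligible.
        live = [s for s in self.sinks if s.retry_at_ms <= now_ms]
        if not live:
            return None
        return max(live, key=lambda s: s.priority).name

    def report(self, name: str, ok: bool, now_ms: int) -> None:
        sink = next(s for s in self.sinks if s.name == name)
        if ok:
            sink.failures = 0
            sink.retry_at_ms = 0
        else:
            sink.failures += 1
            sink.retry_at_ms = now_ms + self.penalty_ms(sink.failures)


# Values from agent1: priority.k1 = 10, priority.k2 = 5, maxpenalty = 10000.
group = FailoverModel({"k1": 10, "k2": 5}, max_penalty_ms=10000)
print(group.pick(0))                    # k1
group.report("k1", ok=False, now_ms=0)  # e.g. agent2 has been killed
print(group.pick(500))                  # k2, while k1 is penalized for 2 s
print(group.pick(2500))                 # k1 is eligible again and gets retried
```

With agent1's values, k1 stays active while agent2 is healthy, and traffic moves to k2 only while k1 is penalized, which is what the test for this case checks.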
agent2: /opt/flume_conf/flume2.conf (on node3)
# Components
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Source: avro source on node3:4141 (the target of agent1's sink k1)
a2.sources.r1.type = avro
a2.sources.r1.bind = node3
a2.sources.r1.port = 4141
# Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Sink: print events to the console log
a2.sinks.k1.type = logger
# Bindings
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
agent3: /opt/flume_conf/flume3.conf (on node3)
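# Components
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Source: avro source on node3:4142 (the target of agent1's sink k2)
a3.sources.r1.type = avro
a3.sources.r1.bind = node3
a3.sources.r1.port = 4142
# Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Sink: write to HDFS
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://mycluster/log_data/nginx/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k1.hdfs.filePrefix = upload-
# Round the timestamp down when creating the time-based directories
a3.sinks.k1.hdfs.round = true
# Number of time units per new directory
a3.sinks.k1.hdfs.roundValue = 1
# Time unit used for rounding
a3.sinks.k1.hdfs.roundUnit = hour
# Use the local time instead of the event's timestamp header
a3.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a3.sinks.k1.hdfs.batchSize = 100
# File type: DataStream writes uncompressed output (CompressedStream together with hdfs.codeC enables compression)
a3.sinks.k1.hdfs.fileType = DataStream
# Roll to a new file every 60 seconds
a3.sinks.k1.hdfs.rollInterval = 60
# Roll when a file reaches about 128 MB
a3.sinks.k1.hdfs.rollSize = 134217700
# Do not roll based on the number of events
a3.sinks.k1.hdfs.rollCount = 0
# Timeout for HDFS operations: 5 minutes, in milliseconds
a3.sinks.k1.hdfs.callTimeout = 300000
# Bindings
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1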
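The path escapes and roll settings are easy to misread, so here is a small Python model of how agent3 names its directories and decides when to roll a file. It assumes `%Y%m%d/%H` behaves like `strftime` and that rounding truncates the local timestamp to a multiple of `roundValue` units; `round_down`, `resolve_path` and `should_roll` are hypothetical helpers, and the roll check is simplified (Flume checks the interval on a timer rather than on demand).

```python
from datetime import datetime


def round_down(ts: datetime, value: int, unit: str) -> datetime:
    """Truncate ts to a multiple of `value` units (models round/roundValue/roundUnit)."""
    if unit == "hour":
        return ts.replace(hour=ts.hour - ts.hour % value, minute=0, second=0, microsecond=0)
    if unit == "minute":
        return ts.replace(minute=ts.minute - ts.minute % value, second=0, microsecond=0)
    if unit == "second":
        return ts.replace(second=ts.second - ts.second % value, microsecond=0)
    raise ValueError(f"unsupported roundUnit: {unit}")


def resolve_path(template: str, ts: datetime,
                 round_value: int = 1, round_unit: str = "hour") -> str:
    # Assumption: %Y, %m, %d and %H mean the same as in strftime.
    return round_down(ts, round_value, round_unit).strftime(template)


def should_roll(elapsed_s: float, bytes_written: int, events: int,
                roll_interval: int = 60, roll_size: int = 134217700,
                roll_count: int = 0) -> bool:
    """True if any enabled trigger fires; a value of 0 disables that trigger."""
    if roll_interval > 0 and elapsed_s >= roll_interval:
        return True
    if roll_size > 0 and bytes_written >= roll_size:
        return True
    if roll_count > 0 and events >= roll_count:
        return True
    return False


HDFS_PATH = "hdfs://mycluster/log_data/nginx/%Y%m%d/%H"
print(resolve_path(HDFS_PATH, datetime(2024, 5, 1, 13, 47, 12)))
# -> hdfs://mycluster/log_data/nginx/20240501/13
print(should_roll(30, 1024, 1_000_000))  # False: rollCount = 0 ignores the event count
print(should_roll(30, 134217728, 10))    # True: rollSize has been reached
```

With `roundValue = 1` and `roundUnit = hour`, rounding does not change the path in this case, because the path has no component finer than `%H`; it starts to matter with a larger `roundValue` or a `%M` in the path.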
Startup
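Start the agents in this order, downstream first, so that the avro sources on node3 are already listening when agent1's avro sinks connect to them.
On node3:
flume-ng agent -n a3 -c conf -f /opt/flume_conf/flume3.conf
flume-ng agent -n a2 -c conf -f /opt/flume_conf/flume2.conf -Dflume.root.logger=INFO,console
On node2:
flume-ng agent -n a1 -c conf -f /opt/flume_conf/flume1.conf
The -Dflume.root.logger=INFO,console option makes agent2's logger sink print events to the console. Each agent runs in the foreground, so use a separate terminal for each one.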
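Because agent1's avro sinks must point at avro sources that are actually listening, it can help to check the three config files against each other before starting anything. The Python sketch below is a hypothetical helper, not part of Flume. Its properties parser only understands `key = value` lines and `#`/`!` comments, which is enough for the configs in this case, and it compares host names literally, so a source bound to 0.0.0.0 would be reported as unmatched.

```python
from typing import Dict, Iterable, Set, Tuple

Endpoint = Tuple[str, int]


def parse_props(text: str) -> Dict[str, str]:
    """Minimal reader for 'key = value' lines; lines starting with '#' or '!' are comments.
    Unlike java.util.Properties it ignores ':' separators and line continuations."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def avro_endpoints(props: Dict[str, str], agent: str, kind: str) -> Set[Endpoint]:
    """(host, port) pairs of an agent's avro sinks (kind='sinks') or avro sources (kind='sources')."""
    host_key = "hostname" if kind == "sinks" else "bind"
    found: Set[Endpoint] = set()
    for name in props.get(f"{agent}.{kind}", "").split():
        prefix = f"{agent}.{kind}.{name}"
        if props.get(f"{prefix}.type") == "avro":
            found.add((props[f"{prefix}.{host_key}"], int(props[f"{prefix}.port"])))
    return found


def unmatched_sinks(upstream: Tuple[str, str],
                    downstream: Iterable[Tuple[str, str]]) -> Set[Endpoint]:
    """Avro sink targets of `upstream` that no downstream avro source listens on.
    Each tuple is (config text, agent name)."""
    listening: Set[Endpoint] = set()
    for text, agent in downstream:
        listening |= avro_endpoints(parse_props(text), agent, "sources")
    up_text, up_agent = upstream
    return avro_endpoints(parse_props(up_text), up_agent, "sinks") - listening


# Excerpts of flume1.conf, flume2.conf and flume3.conf from this case.
agent1 = """
a1.sinks = k1 k2
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node3
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node3
a1.sinks.k2.port = 4142
"""
agent2 = """
a2.sources = r1
a2.sources.r1.type = avro
a2.sources.r1.bind = node3
a2.sources.r1.port = 4141
"""
agent3 = """
a3.sources = r1
a3.sources.r1.type = avro
a3.sources.r1.bind = node3
a3.sources.r1.port = 4142
"""
print(unmatched_sinks((agent1, "a1"), [(agent2, "a2"), (agent3, "a3")]))  # set()
```

On the real nodes, pass the full contents of flume1.conf, flume2.conf and flume3.conf instead of the inline excerpts; an empty set means every avro sink has a matching source.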
Testing:
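1. After startup, append a few lines to a file matching /opt/data/.*.log on node2. The events are printed on agent2's console by its logger sink (k1 has the higher priority), and no new file is created in HDFS.
2. Kill agent2 and append more lines. The failover processor now sends the events through k2 to agent3, and the corresponding log files appear in HDFS under /log_data/nginx/ (check with hdfs dfs -ls /log_data/nginx).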
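For these steps, new data has to land in a file that the TAILDIR file group actually selects. This Python sketch appends numbered lines to a log file and lists which files a filename regex picks up. `append_test_lines` and `matching_files` are hypothetical helpers, and the full-match semantics are an assumption about how TAILDIR applies the regex to file names. The dry run uses a scratch directory so it can be tried locally.

```python
import os
import re
import tempfile
from typing import List


def matching_files(directory: str, filename_regex: str) -> List[str]:
    """Files in `directory` whose name fully matches the regex
    (assumed to mirror how a TAILDIR file group selects files)."""
    pattern = re.compile(filename_regex)
    return sorted(
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if pattern.fullmatch(name) and os.path.isfile(os.path.join(directory, name))
    )


def append_test_lines(path: str, count: int, tag: str = "failover-test") -> None:
    """Append `count` numbered lines to `path`, creating the file if needed."""
    with open(path, "a", encoding="utf-8") as f:
        for i in range(count):
            f.write(f"{tag} line {i}\n")


# Local dry run in a scratch directory standing in for /opt/data on node2.
demo_dir = tempfile.mkdtemp()
append_test_lines(os.path.join(demo_dir, "access.log"), 3)
for name in ("notes.txt", "catalog"):
    open(os.path.join(demo_dir, name), "w").close()
print(matching_files(demo_dir, ".*.log"))    # access.log and catalog: "." matches any character
print(matching_files(demo_dir, r".*\.log"))  # only access.log
```

On node2 you would call something like `append_test_lines("/opt/data/access.log", 10)`, where access.log is only an example name. The dry run also shows that `.*.log` matches `catalog`. A stricter `.*\.log` would have to be written as `.*\\.log` in the config file, assuming Flume loads it as a Java properties file, where a lone backslash is treated as an escape.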
Case 2: Load balancing (LoadBalancingSinkProcessor)
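Keep the setup from Case 1 and replace the sink group settings in agent1's config (the failover processor type, the priorities, and maxpenalty) with the following: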
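a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
# Temporarily blacklist a failed sink, with exponential backoff
a1.sinkgroups.g1.processor.backoff = true
# Choose sinks at random (the other built-in selector is round_robin)
a1.sinkgroups.g1.processor.selector = random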
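A comparable Python sketch for the load-balancing group: each batch is offered to the live sinks in random order, and with `backoff = true` a failed sink is skipped for a while. This is a simplified model rather than Flume's code. `LoadBalanceModel` is a hypothetical name, the 1 s starting backoff that doubles per consecutive failure is an assumption, and the 30000 ms cap corresponds to the default of Flume's `processor.selector.maxTimeOut`.

```python
import random
from typing import Callable, Dict, List, Optional

BACKOFF_UNIT_MS = 1000  # assumed: first failure backs off 1 s, then 2 s, 4 s, ...


class LoadBalanceModel:
    """Hypothetical model of processor.type = load_balance with
    selector = random and backoff = true."""

    def __init__(self, sinks: List[str], max_timeout_ms: int = 30000,
                 seed: Optional[int] = None) -> None:
        self.sinks = list(sinks)
        self.max_timeout_ms = max_timeout_ms  # backoff cap
        self.failures: Dict[str, int] = {s: 0 for s in self.sinks}
        self.retry_at: Dict[str, int] = {s: 0 for s in self.sinks}
        self.rng = random.Random(seed)

    def order(self, now_ms: int) -> List[str]:
        # Random order over the sinks that are not currently backed off.
        live = [s for s in self.sinks if self.retry_at[s] <= now_ms]
        self.rng.shuffle(live)
        return live

    def process(self, now_ms: int, deliver: Callable[[str], bool]) -> Optional[str]:
        """Try sinks until one delivers the batch; return its name, or None if all fail."""
        for sink in self.order(now_ms):
            if deliver(sink):
                self.failures[sink] = 0
                return sink
            self.failures[sink] += 1
            backoff = min(self.max_timeout_ms,
                          BACKOFF_UNIT_MS * 2 ** (self.failures[sink] - 1))
            self.retry_at[sink] = now_ms + backoff
        return None


lb = LoadBalanceModel(["k1", "k2"], seed=7)
counts = {"k1": 0, "k2": 0}
for _ in range(1000):
    counts[lb.process(0, lambda s: True)] += 1
print(counts)  # both sinks receive batches, roughly half each
```

With `backoff = false`, a failed sink would stay in the rotation and simply be tried again on a later batch.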