Flume 1.8 写kafka到不同主题和不同分区 (随机)配置

logcollect.sources.taildir-source.interceptors =interceptor
logcollect.sources.taildir-source.interceptors.interceptor.type = regex_extractor

logcollect.sources.taildir-source.interceptors.interceptor.regex = .(UserBehaviorLog|UserOperLog|PlatFormSysLog).
logcollect.sources.taildir-source.interceptors.interceptor.serializers = s1
logcollect.sources.taildir-source.interceptors.interceptor.serializers.s1.name = key

selector

logcollect.sources.taildir-source.selector.type = multiplexing
logcollect.sources.taildir-source.selector.header = key

logcollect.sources.taildir-source.selector.mapping.UserOperLog = userOperLogChannel

logcollect.sources.taildir-source.selector.mapping.UserBehaviorLog= userBehaviorChannel
logcollect.sources.taildir-source.selector.mapping.PlatFormSysLog= platFormSysLogChannel

logcollect.sinks.userOperLogSink.type = org.apache.flume.sink.kafka.KafkaSink

kafka topic

logcollect.sinks.userOperLogSink.kafka.topic = user_oper_log
logcollect.sinks.userOperLogSink.kafka.bootstrap.servers = 10.168.79.166:9092,10.168.30.114:9092,10.168.92.222:9092
logcollect.sinks.userOperLogSink.flumeBatchSize = 20
logcollect.sinks.userOperLogSink.kafka.producer.acks = 1
logcollect.sinks.userOperLogSink.kafka.producer.client.id = user_operlog_agent
logcollect.sinks.userOperLogSink.kafka.producer.partitioner.class=com.logcollect.flume.SimplePartitioner
#logcollect.sinks.userOperLogSink.kafka.send.buffer.bytes=999999

logcollect.sinks.userOperLogSink.channel = userOperLogChannel

猜你喜欢

转载自blog.csdn.net/weixin_43654136/article/details/88552347