项目中,我们希望从同一个数据源读取到的数据能够进入不同的 channel,因此需要配置 source 的 selector 和 interceptors。
选择器默认使用的是 replicating(复制模式),进入每个 channel 的数据都一样;这里我们选用 multiplexing(多路复用模式)。
本项目使用了 2 个拦截器:一个做 ETL,用来过滤不合法的数据;另一个用来给数据添加头信息(header)。头信息是一个 k-v 键值对,selector 根据 header 中不同的 value 来决定数据进入哪一个 channel。
建议不要在拦截器里做过于复杂的操作:拦截器会对每一条 event 执行,复杂逻辑会直接拖慢采集速度。
# Route by the logType header: "start" -> c1 -> topic_start, "event" -> c2 -> topic_event.
#
# Reads from one TAILDIR source. The interceptors drop malformed lines (i1) and
# add a logType header ("start" or "event") to each remaining event (i2). The
# multiplexing selector uses that header to choose the channel, and each channel
# is drained by a Kafka sink writing to topic_start or topic_event.
#
# NOTE: this file is loaded in Java Properties format, where only lines starting
# with '#' or '!' are comments; C-style /** ... */ blocks would be read as keys.
# list the sources, sinks and channels in the agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
# set channels for source
a1.sources.r1.channels = c1 c2
# channel selector configuration: route on the logType header set by interceptor i2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = logType
a1.sources.r1.selector.mapping.start = c1
a1.sources.r1.selector.mapping.event = c2
# interceptors run in the listed order: i1 (ETL filter) first, then i2 (logType tagging)
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder
# memory channels
a1.channels.c1.type=memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c2.type=memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 1000
#kafka sinks
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.topic = topic_start
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k2.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k2.kafka.topic = topic_event
a1.sinks.k2.kafka.flumeBatchSize = 20
a1.sinks.k2.kafka.producer.acks = 1
# bind sinks to channels
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
package com.atguigu.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
/**
 * Flume interceptor that drops malformed log lines (ETL cleaning).
 *
 * <p>An event is kept only when its body, decoded as UTF-8:
 * <ul>
 *   <li>splits on {@code '|'} into exactly two parts,</li>
 *   <li>has a first part that is exactly 13 characters long after trimming, and</li>
 *   <li>has a second part that, after trimming, starts with {@code '{'} and ends with
 *       {@code '}'}.</li>
 * </ul>
 * Every other event is discarded.
 *
 * @author wade
 * @create 2019-03-27 10:56
 */
public class LogETLInterceptor implements Interceptor {

    /** Charset used to decode event bodies. */
    private static final Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    @Override
    public void initialize() {
        // Stateless: nothing to initialize.
    }

    /**
     * Validates a single event.
     *
     * @param event the event to check; may be {@code null}
     * @return {@code event} itself when it is well formed, otherwise {@code null},
     *         meaning "drop this event"
     */
    @Override
    public Event intercept(Event event) {
        if (event == null || event.getBody() == null) {
            return null;
        }
        String body = new String(event.getBody(), UTF_8);
        String[] parts = body.split("\\|");
        if (parts.length != 2) {
            return null;
        }
        String timestamp = parts[0].trim();
        String json = parts[1].trim();
        if (timestamp.length() != 13 || !json.startsWith("{") || !json.endsWith("}")) {
            return null;
        }
        return event;
    }

    /**
     * Validates a batch and returns only the well-formed events, in their original order.
     *
     * <p>Rejected events are omitted instead of being kept as {@code null} entries. A null
     * left in the batch would be handed on to later interceptors and the channel selector,
     * and those are not guaranteed to tolerate it.
     *
     * @param list the incoming batch
     * @return a new list holding only the events accepted by {@link #intercept(Event)}
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> kept = new ArrayList<>(list.size());
        for (Event event : list) {
            Event checked = intercept(event);
            if (checked != null) {
                kept.add(checked);
            }
        }
        return kept;
    }

    @Override
    public void close() {
        // Stateless: nothing to release.
    }

    /** Factory referenced from the Flume agent configuration ({@code ...LogETLInterceptor$Builder}). */
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {
            // No configurable properties.
        }
    }
}
package com.atguigu.flume.interceptor;
import com.sun.org.apache.bcel.internal.generic.NEW;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Flume interceptor that tags each event with a {@code logType} header.
 *
 * <p>The header is set to {@code "start"} when the body, decoded as UTF-8, contains the
 * substring {@code "start"}. Otherwise it is set to {@code "event"}. A multiplexing
 * channel selector can then route events on this header.
 *
 * <p>NOTE(review): the substring test matches "start" anywhere in the body, including
 * inside other words or JSON field values. Confirm that event-log bodies can never contain
 * it, or match a more specific marker.
 *
 * @author wade
 * @create 2019-03-27 13:35
 */
public class LogTypeInterceptor implements Interceptor {

    /** Charset used to decode event bodies. */
    private static final Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Header key written by this interceptor. */
    private static final String LOG_TYPE_HEADER = "logType";

    @Override
    public void initialize() {
        // Stateless: nothing to initialize.
    }

    /**
     * Adds the {@code logType} header to one event, in place.
     *
     * @param event the event to tag; may be {@code null}
     * @return the same event with its header set, or {@code null} if {@code event} was null
     */
    @Override
    public Event intercept(Event event) {
        if (event == null) {
            return null;
        }
        String body = new String(event.getBody(), UTF_8);
        String logType = body.contains("start") ? "start" : "event";
        Map<String, String> headers = event.getHeaders();
        headers.put(LOG_TYPE_HEADER, logType);
        return event;
    }

    /**
     * Tags every event in a batch.
     *
     * <p>Null entries (for example, ones left behind by an upstream interceptor) are skipped
     * instead of causing a NullPointerException.
     *
     * @param list the incoming batch
     * @return a new list with the tagged events, in their original order
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> tagged = new ArrayList<>(list.size());
        for (Event event : list) {
            Event result = intercept(event);
            if (result != null) {
                tagged.add(result);
            }
        }
        return tagged;
    }

    @Override
    public void close() {
        // Stateless: nothing to release.
    }

    /** Factory referenced from the Flume agent configuration ({@code ...LogTypeInterceptor$Builder}). */
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
            // No configurable properties.
        }
    }
}