使用 Filter 分流
DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env); //从Kafka 获取到所有的数据流
SingleOutputStreamOperator<MetricEvent> machineData = data.filter(m -> "machine".equals(m.getTags().get("type"))); //过滤出机器的数据
SingleOutputStreamOperator<MetricEvent> dockerData = data.filter(m -> "docker".equals(m.getTags().get("type"))); //过滤出容器的数据
SingleOutputStreamOperator<MetricEvent> applicationData = data.filter(m -> "application".equals(m.getTags().get("type"))); //过滤出应用的数据
SingleOutputStreamOperator<MetricEvent> middlewareData = data.filter(m -> "middleware".equals(m.getTags().get("type"))); //过滤出中间件的数据
使用 Split 分流
只能分流一次
DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env); //从Kafka 获取到所有的数据流
SplitStream<MetricEvent> splitData = data.split(new OutputSelector<MetricEvent> () {
//从
@Override
public Iterable<String> select(MetricEvent metricEvent) {
List<String> tags = new ArrayList<>();
String type = metricEvent.getTags().get("type");
switch (type) {
case "machine":
tags.add("machine"); break;
case "docker":
tags.add("docker"); break;
case "application":
tags.add("application"); break;
case "middleware":
tags.add("middleware"); break;
default:
break;
}
return tags;
}
});
DataStream<MetricEvent> machine = splitData.select("machine");
DataStream<MetricEvent> docker = splitData.select("docker");
DataStream<MetricEvent> application = splitData.select("application");
DataStream<MetricEvent> middleware = splitData.select("middleware");
使用 Side Output 分流
//创建 output tag
private static final OutputTag<MetricEvent> machineTag = new OutputTag<MetricEvent>("machine") { };
private static final OutputTag<MetricEvent> dockerTag = new OutputTag<MetricEvent>("docker") { };
private static final OutputTag<MetricEvent> applicationTag = new OutputTag<MetricEvent>("application") { };
private static final OutputTag<MetricEvent> middlewareTag = new OutputTag<MetricEvent>("middleware") { };
DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env); //从Kafka 获取到所有的数据流
SingleOutputStreamOperator<MetricEvent> sideOutputData = data.process(new ProcessFunction<MetricEvent, MetricEvent>() {
@Override
public void processElement(MetricEvent metricEvent, Context context, Collector<MetricEvent> collector) throws Exception {
String type = metricEvent.getTags().get("type");
switch (type) {
case "machine":
context.output(machineTag, metricEvent);
case "docker":
context.output(dockerTag, metricEvent);
case "application":
context.output(applicationTag, metricEvent);
case "middleware":
context.output(middlewareTag, metricEvent);
default:
collector.collect(metricEvent);
}
}
});