工作场景中会经常遇到对一个流按照不同维度做拆分,那么该如何做拆分呢?
使用 Filter 分流
filter算子分流,略…
Split + Select 分流
先在 split 算子里面定义 OutputSelector 的匿名内部构造类,然后重写 select 方法,根据数据的类型将不同的数据放到不同的 tag 里面,这样返回后的数据格式是 SplitStream,然后要使用这些数据的时候,可以通过 select 去选择对应的数据类型。
但是Split无法做连续分流,而且已经被标记为废弃了。
Side Output 分流
首先需要做的是定义一个 OutputTag 来标识 Side Output,代表这个 Tag 是要收集哪种类型的数据,如果是要收集多种不一样类型的数据,那么你就需要定义多种 OutputTag
OutputTag<PileExceptionKafkaMessage> exceptionTag = new OutputTag<PileExceptionKafkaMessage>("exception") {};
OutputTag<PileKafkaMessage> normalTag = new OutputTag<PileKafkaMessage>("normal") {};
处理数据,这里就是对数据准确性做校验,把有效的数据输入到正常流中,解析错误和校验不通过的数据输入到 异常流中。
SingleOutputStreamOperator<String> outputStreamOperator = dataStream.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String s, Context context, Collector<String> collector) {
try {
PileKafkaMessage pileKafkaMessage = JSON.parseObject(s, PileKafkaMessage.class);
pileKafkaMessage.setProcessStatus("1");
// 校验必填字段是否为空,校验失败则进异常数据流
ValidatorUtils.validateEntity(pileKafkaMessage);
context.output(normalTag, pileKafkaMessage);
} catch (Exception e) {
log.error("桩日志解析异常", e.getMessage());
PileExceptionKafkaMessage message = new PileExceptionKafkaMessage();
message.setContent(s);
message.setException(e.getMessage());
context.output(exceptionTag, message);
}
}
});
然后使用 getSideOutput 方法来获取不同 OutputTag 的数据
DataStream<PileExceptionKafkaMessage> exceptionStream = outputStreamOperator.getSideOutput(exceptionTag);
DataStream<PileKafkaMessage> normalStream = outputStreamOperator.getSideOutput(normalTag);
这样就能把原数据流切分成两个流了