flink Iterate迭代基本概念

基本概念:在流中创建“反馈(feedback)”循环,通过将一个算子的输出重定向到某个先前的算子。这对于定义不断更新模型的算法特别有用。

迭代的数据流向:DataStream → IterativeStream → DataStream

以下代码以流开始并连续应用迭代体。大于0的元素将被发送回反馈(feedback)通道,继续迭代,其余元素将向下游转发,离开迭代。

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
//这里设置feedback这个数据流是被反馈的通道,只要是value>0的数据都会被重新迭代计算。 iteration.closeWith(feedback); DataStream
<Long> output = iterationBody.filter(new FilterFunction<Long>(){ @Override public boolean filter(Long value) throws Exception { return value <= 0; } });
 

猜你喜欢

转载自www.cnblogs.com/asker009/p/11122719.html