真实分析Flink滑动窗口原理

美图欣赏:

闲谈一刻

一名大二学生,Flink技术的热爱者。 

学习过程中,一定要学会深入思考 (划重点)


一.窗口背景

Flink的窗口机制是其底层核心之一,也是高效流处理的关键。

A.窗口需求

在Streaming应用程序的情况下,数据是连续的,因此我们不能等待在开始处理之前流式传输整个数据。当然,我们可以处理每个传入的事件,然后转移到下一个事件,但在某些情况下,我们需要对传入的数据进行某种聚合 - 例如,有多少用户在过去10分钟内点击了您网页上的链接。在这种情况下,我们必须定义一个窗口并对窗口内的数据进行处理

Flink窗口主要分为滚动(tumbling)、滑动(sliding)和会话(session)窗口三大类

这里重点分析下滑动窗口 Sliding Window

(1)滚动窗口Tumbling window

滚动窗口滚动数据流。这种类型的窗口是不重叠的 - 即,一个窗口中的事件/数据不会在其他窗口中重叠/出现。

(2)滑动窗口 Sliding Window

滑动窗口与翻滚窗口相对,滑过数据流。因此,滑动窗口可以重叠,它可以对输入的数据流进行更平滑的聚合 - 因为您不是从一组输入跳转到下一组输入,而是滑过输入的数据流。

(3)会话窗口 session Window

会话窗口根据Session gap切分不同的窗口,当一个窗口在大于Session gap的时间内没有接收到新数据时,窗口将关闭。在这种模式下,窗口的长度是可变的,每个窗口的开始和结束时间并不是确定的。我们可以设置定长的Session gap,也可以使用SessionWindowTimeGapExtractor动态地确定Session gap的长度。

二.Flink滑动窗口原理

原由:今天在运行滑动窗口Demo时,发现设置的timeWindow俩时间参数,运行结果跟想象的不一样。

自己思考后,找到了原因,因此来分享下。

滑动窗口俩时间参数设置:

先看下Flink的timeWindow源码

  • 第一个参数表示:滑动大小
  • 第二个参数表示:滑动步长

Slide的大小决定了Flink以多大的频率来创建新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,Slide等于窗口的Size时,相邻窗口不重叠,一个事件会被分配到多个窗口;Slide大于Size,有些事件可能被丢掉。

public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return window(SlidingProcessingTimeWindows.of(size, slide));
		} else {
			return window(SlidingEventTimeWindows.of(size, slide));
		}
	}

窗口时间参数图解:

自己代码参数设置

结论

  • 滑动窗口直接从当前窗口开始算
  • 结果运行的显示,取决于第二个Slide参数时间设置

这里设置的是20秒,从开始运行到第20秒后,结果才能显示控制台。

举个例子:因为在第二个参数设置20秒内,如果17秒的时候突然有数据过来了,在过三秒后达到了20秒啊,就会立即计算执行(亲测)。因为Flink是流式计算。

会出现俩个相同计算结果。第一个计算结果出现后,在过20秒出现与第一个结果相同。

 .timeWindow(Time.seconds(40), Time.seconds(20)

代码实现

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


/**
 * Author : Jackson
 * Version : 2020/4/21 & 1.0
 */


public class DataStreamWordCount {
    public static void main(String[] args) throws Exception {

        //创建Flink的流式计算环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //监听本地9000端口
        DataStreamSource<String> text = env.socketTextStream("192.168.1.125", 9000, "\n");

        //将接收的数据进行拆分,分组,窗口计算并且聚合输出
        SingleOutputStreamOperator<WordWithCount> word = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String line, Collector<WordWithCount> out) throws Exception {
                for (String word : line.split("\\s")) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        })
                .keyBy("word")
                .timeWindow(Time.seconds(40), Time.seconds(20))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        //打印结果
        word.printToErr();
        env.execute("Socket Window WordCount");

    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + ";" + count;
        }
    }

}

运行结果:

                                                         ————保持饥饿,保持学习

                                                                              Jackson_MVP

猜你喜欢

转载自blog.csdn.net/Jackson_mvp/article/details/105663023