美图欣赏:
闲谈一刻
一名大二学生,Flink技术的热爱者。
学习过程中,一定要学会深入思考 (划重点)
一.窗口背景
Flink的窗口机制是其底层核心之一,也是高效流处理的关键。
A.窗口需求
在Streaming应用程序的情况下,数据是连续的,因此我们不能等待在开始处理之前流式传输整个数据。当然,我们可以处理每个传入的事件,然后转移到下一个事件,但在某些情况下,我们需要对传入的数据进行某种聚合 - 例如,有多少用户在过去10分钟内点击了您网页上的链接。在这种情况下,我们必须定义一个窗口并对窗口内的数据进行处理
Flink窗口主要分为滚动(tumbling)、滑动(sliding)和会话(session)窗口三大类
这里重点分析下滑动窗口 Sliding Window
(1)滚动窗口Tumbling window
滚动窗口滚动数据流。这种类型的窗口是不重叠的 - 即,一个窗口中的事件/数据不会在其他窗口中重叠/出现。
(2)滑动窗口 Sliding Window
滑动窗口与翻滚窗口相对,滑过数据流。因此,滑动窗口可以重叠,它可以对输入的数据流进行更平滑的聚合 - 因为您不是从一组输入跳转到下一组输入,而是滑过输入的数据流。
(3)会话窗口 session Window
会话窗口根据Session gap切分不同的窗口,当一个窗口在大于Session gap的时间内没有接收到新数据时,窗口将关闭。在这种模式下,窗口的长度是可变的,每个窗口的开始和结束时间并不是确定的。我们可以设置定长的Session gap,也可以使用SessionWindowTimeGapExtractor动态地确定Session gap的长度。
二.Flink滑动窗口原理
原由:今天在运行滑动窗口Demo时,发现设置的timeWindow俩时间参数,运行结果跟想象的不一样。
自己思考后,找到了原因,因此来分享下。
滑动窗口俩时间参数设置:
先看下Flink的timeWindow源码:
- 第一个参数表示:滑动大小
- 第二个参数表示:滑动步长
Slide的大小决定了Flink以多大的频率来创建新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,Slide等于窗口的Size时,相邻窗口不重叠,一个事件会被分配到多个窗口;Slide大于Size,有些事件可能被丢掉。
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(SlidingProcessingTimeWindows.of(size, slide));
} else {
return window(SlidingEventTimeWindows.of(size, slide));
}
}
窗口时间参数图解:
自己代码参数设置
结论:
- 滑动窗口直接从当前窗口开始算
- 结果运行的显示,取决于第二个Slide参数时间设置
这里设置的是20秒,从开始运行到第20秒后,结果才能显示控制台。
举个例子:因为在第二个参数设置20秒内,如果17秒的时候突然有数据过来了,在过三秒后达到了20秒啊,就会立即计算执行(亲测)。因为Flink是流式计算。
会出现俩个相同计算结果。第一个计算结果出现后,在过20秒出现与第一个结果相同。
.timeWindow(Time.seconds(40), Time.seconds(20)
代码实现
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* Author : Jackson
* Version : 2020/4/21 & 1.0
*/
public class DataStreamWordCount {
public static void main(String[] args) throws Exception {
//创建Flink的流式计算环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//监听本地9000端口
DataStreamSource<String> text = env.socketTextStream("192.168.1.125", 9000, "\n");
//将接收的数据进行拆分,分组,窗口计算并且聚合输出
SingleOutputStreamOperator<WordWithCount> word = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String line, Collector<WordWithCount> out) throws Exception {
for (String word : line.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(40), Time.seconds(20))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
return new WordWithCount(a.word, a.count + b.count);
}
});
//打印结果
word.printToErr();
env.execute("Socket Window WordCount");
}
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {
}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + ";" + count;
}
}
}
运行结果:
————保持饥饿,保持学习
Jackson_MVP