一、Time and Window
(一)Time
(二)Window
1、Overview
2、Window Types
(三)Window API
1、CountWindow
CountWindow triggers execution based on the number of elements with the same key in the window; it fires and computes a result only for the key whose element count has reached the window size.
Note: the window_size of a CountWindow refers to the number of elements with the same key, not the total number of input elements.
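The per-key counting described above can be illustrated with a small plain-Scala simulation (a sketch of the semantics only, not Flink API; `firedWindows` is a hypothetical helper): each key fires its own window once that key has accumulated `size` elements, independently of how many elements arrived for other keys.

```scala
object CountWindowPerKey {
  // Per-key tumbling count window of size `size`: a key's window fires
  // only when that key has seen `size` elements; other keys keep waiting.
  def firedWindows(stream: Seq[String], size: Int): Seq[(String, Int)] = {
    val counts = scala.collection.mutable.Map.empty[String, Int]
    val fired = scala.collection.mutable.ArrayBuffer.empty[(String, Int)]
    for (word <- stream) {
      val n = counts.getOrElse(word, 0) + 1
      if (n == size) {
        fired += ((word, size)) // window fires and resets for this key
        counts(word) = 0
      } else counts(word) = n
    }
    fired.toSeq
  }
}
```

With the input `a b a a` and a window size of 3, only `a` fires; `b` has seen just one element and its window stays open.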
1) Tumbling window
The default CountWindow is a tumbling window: specify only the window size, and the window fires once the number of elements (per key) reaches that size.
Steps:
1. Obtain the execution environment
2. Create a socket source
3. Transform the stream and aggregate by key
4. Apply the countWindow operation
5. Perform the aggregation
6. Print the aggregated data
7. Execute the program
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
object StreamCountWindow {
  def main(args: Array[String]): Unit = {
    // Obtain the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Create a socket source
    val socketDataStream: DataStream[String] = env.socketTextStream("node01", 9999)
    // Transform the stream and aggregate by key
    val keyByStream: KeyedStream[(String, Int), Tuple] = socketDataStream.flatMap(x => x.split(" ")).map((_, 1)).keyBy(0)
    // The 5 here means: compute once for every 5 elements with the same key
    val streamWindow: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyByStream.countWindow(5)
    // Perform the aggregation
    val reduceStream: DataStream[(String, Int)] = streamWindow.reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    // Print the aggregated data
    reduceStream.print(this.getClass.getSimpleName)
    // Execute the program
    env.execute("StreamCountWindow")
  }
}
Execution result
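Besides the tumbling variant, countWindow also accepts a slide parameter, e.g. keyByStream.countWindow(5, 2), which fires every 2 same-key elements and evaluates over up to the last 5. A plain-Scala sketch of that firing rule for a single key (`fire` is a hypothetical helper, assuming this trigger/evictor behavior):

```scala
object SlidingCountWindowSim {
  // Simulates countWindow(size, slide) for one key's elements:
  // the window fires every `slide` elements, over the last `size` elements seen.
  def fire(elements: Seq[Int], size: Int, slide: Int): Seq[Seq[Int]] =
    (slide to elements.length by slide).map { end =>
      elements.slice(math.max(0, end - size), end)
    }
}
```

Note that early firings contain fewer than `size` elements, because the window has not filled up yet.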
2、TimeWindow
A TimeWindow groups all data within a specified time range into one window and computes over all the data in that window at once.
1) Tumbling window
By default, Flink divides time windows based on Processing Time: the data Flink receives is assigned to windows according to the time it enters Flink.
Steps:
1. Obtain the execution environment
2. Create a socket connection to obtain data
3. Transform the data and aggregate by key
4. Introduce the timeWindow
5. Perform the aggregation
6. Print the data
7. Execute the program
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
object StreamTimeWindow {
  def main(args: Array[String]): Unit = {
    // 1. Obtain the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Create a socket connection to obtain data
    val socketSource: DataStream[String] = env.socketTextStream("node01", 9999)
    // 3. Transform the data and aggregate by key
    val keyByStream: KeyedStream[(String, Int), Tuple] = socketSource.flatMap(x => x.split(" ")).map((_, 1)).keyBy(0)
    // 4. Introduce a tumbling window
    val timeWindowStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyByStream.timeWindow(Time.seconds(5))
    // 5. Perform the aggregation
    val reduceStream: DataStream[(String, Int)] = timeWindowStream.reduce((item1, item2) => (item1._1, item1._2 + item2._2))
    // 6. Print the data
    reduceStream.print()
    // 7. Execute the program
    env.execute("StreamTimeWindow")
  }
}
Execution result
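How a processing-time tumbling window assigns an element can be sketched in plain Scala (`windowStart` is a hypothetical helper showing the standard alignment rule, assuming no window offset): a record with timestamp ts lands in the window starting at ts - (ts % size), so 5-second windows align to 0-5 s, 5-10 s, and so on, rather than starting at the first record.

```scala
object TumblingWindowAssign {
  // A record with timestamp ts (ms) falls into the window
  // [start, start + size) where start = ts - (ts % size).
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)
}
```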
3、WindowReduce
WindowedStream → DataStream: assigns a reduce function to the window and returns the aggregated result.
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
object StreamReduceWindow {
  def main(args: Array[String]): Unit = {
    // Obtain the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Create a socket source
    val stream: DataStream[String] = env.socketTextStream("node01", 9999)
    // Transform the stream and aggregate by key
    val streamKeyBy: KeyedStream[(String, Int), Tuple] = stream.flatMap(x => x.split(" ")).map(item => (item, 1)).keyBy(0)
    // Introduce a time window
    val streamWindow: WindowedStream[(String, Int), Tuple, TimeWindow] = streamKeyBy.timeWindow(Time.seconds(5))
    // Perform the aggregation
    val streamReduce: DataStream[(String, Int)] = streamWindow.reduce((item1, item2) => (item1._1, item1._2 + item2._2))
    // Print the aggregated data
    streamReduce.print()
    // Execute the program
    env.execute("StreamReduceWindow")
  }
}
Execution result
4、WindowApply
The apply method allows custom processing, implemented via an anonymous inner class. It is used when more complex computations are needed.
Usage
1) Implement a WindowFunction class
2) Specify its type parameters as [input data type, output data type, type of the keyBy grouping field, window type]
Example
Use the apply method to implement word count
Steps
1) Obtain the stream execution environment
2) Build the socket stream source with host and port
3) Convert the received data into word tuples
4) Use keyBy to group the stream
5) Use timeWindow to set the window length (compute every 3 seconds)
6) Implement a WindowFunction as an anonymous inner class
a. Perform the aggregation in the apply method
b. Use Collector.collect to emit the result
7) Print the output
8) Start the execution
9) On Linux, run nc -lk <port> to listen on the port and send words
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.function.RichWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object StreamApplyWindow {
  def main(args: Array[String]): Unit = {
    // 1. Obtain the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Build the socket stream source with host and port
    val textDataStream: DataStream[String] = env.socketTextStream("node01", 9999)
    // 3. Convert the received data into word tuples
    val wordDataStream: DataStream[(String, Int)] = textDataStream.flatMap(_.split(" ")).map((_, 1))
    // 4. Use keyBy to group the stream
    val groupedDataStream: KeyedStream[(String, Int), String] = wordDataStream.keyBy(_._1)
    // 5. Use timeWindow to set the window length (compute every 3 seconds)
    val windowDataStream: WindowedStream[(String, Int), String, TimeWindow] = groupedDataStream.timeWindow(Time.seconds(3))
    // 6. Implement a WindowFunction as an anonymous inner class
    val reduceDataStream: DataStream[(String, Int)] = windowDataStream.apply(new RichWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
      // Perform the aggregation inside the apply method
      override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
        val tuple: (String, Int) = input.reduce((t1, t2) => {
          (t1._1, t1._2 + t2._2)
        })
        // Collect the result and emit it downstream
        out.collect(tuple)
      }
    })
    reduceDataStream.print()
    env.execute("StreamApplyWindow")
  }
}
Execution result
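Because apply receives every element of the window at once, it can compute aggregates that a pairwise reduce cannot express directly, such as an average. A plain-Scala sketch of such a window-function body (`averageOfWindow` is a hypothetical helper, not part of the example above):

```scala
object WindowAverage {
  // With access to the whole window, we can compute the per-key average
  // of the counts, which a two-argument reduce cannot produce directly.
  def averageOfWindow(key: String, input: Iterable[(String, Int)]): (String, Double) = {
    val values = input.map(_._2)
    (key, values.sum.toDouble / values.size)
  }
}
```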
5、WindowFold
WindowedStream → DataStream: folds the elements of a window together with a given initial value and returns the folded result.
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
object StreamFoldWindow {
  def main(args: Array[String]): Unit = {
    // 1. Obtain the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Create a socket source
    val stream: DataStream[String] = env.socketTextStream("node01", 9999)
    // 3. Transform the stream and aggregate by key
    val streamKeyBy: KeyedStream[(String, Int), Tuple] = stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0)
    // 4. Introduce a tumbling window
    val streamWindow: WindowedStream[(String, Int), Tuple, TimeWindow] = streamKeyBy.timeWindow(Time.seconds(3))
    // 5. Perform the fold operation (the accumulator starts at 100 for each window)
    val streamFold: DataStream[Int] = streamWindow.fold(100) {
      (begin, item) => begin + item._2
    }
    // 6. Print the aggregated data
    streamFold.print()
    // 7. Execute the program
    env.execute("StreamFoldWindow")
  }
}
Execution result
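The fold above starts each window's accumulator at 100 and adds every element's count; for one window's elements the computation is equivalent to a plain foldLeft, as this sketch shows (`foldWindow` is a hypothetical helper; note that WindowedStream.fold is deprecated in newer Flink releases in favor of aggregate):

```scala
object WindowFoldSim {
  // fold(100){(acc, item) => acc + item._2} over one window's elements:
  // the accumulator restarts at 100 for every window.
  def foldWindow(elements: Seq[(String, Int)]): Int =
    elements.foldLeft(100)((acc, item) => acc + item._2)
}
```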
6、Aggregation on Window
WindowedStream → DataStream: built-in aggregations such as max, min, and sum over the elements of each window.
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
object StreamAggregationWindow {
  def main(args: Array[String]): Unit = {
    // 1. Obtain the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Create a socket source
    val socketStream: DataStream[String] = env.socketTextStream("node01", 9999)
    // 3. Split each line ("key value") into a tuple and aggregate by key
    val keyByStream: KeyedStream[(String, String), Tuple] = socketStream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0)
    // 4. Introduce a tumbling window
    val streamWindow: WindowedStream[(String, String), Tuple, TimeWindow] = keyByStream.timeWindow(Time.seconds(5))
    // 5. Perform the aggregation (max on field 1)
    val streamMax: DataStream[(String, String)] = streamWindow.max(1)
    // 6. Print the aggregated data
    streamMax.print()
    // 7. Execute the program
    env.execute("StreamAggregationWindow")
  }
}
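A common pitfall with field aggregations: max(1) updates only the aggregated field, while maxBy(1) returns the entire element that holds the maximum. A plain-Scala sketch of the difference (hypothetical helpers, shown with Int values for clarity; in the example above field 1 is a String and is compared lexicographically):

```scala
object MaxVsMaxBy {
  // max(1)-style: only the aggregated field is tracked; the other fields
  // keep the values of an earlier element in the window.
  def maxField(elems: Seq[(String, Int)]): (String, Int) =
    elems.reduce((a, b) => (a._1, math.max(a._2, b._2)))

  // maxBy(1)-style: the whole element carrying the maximal field is returned.
  def maxBy1(elems: Seq[(String, Int)]): (String, Int) =
    elems.maxBy(_._2)
}
```

For the input ("a",1), ("b",5), ("c",3), maxField keeps the first key with the maximal count, while maxBy1 returns the complete ("b",5) element.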