// Flink sliding window example
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/** Flink streaming job that reports the minimum temperature per sensor over a
  * sliding event-time window.
  *
  * Text lines are read from a socket, parsed into `SensorReading` values,
  * stamped with event time, keyed by sensor id and reduced to the minimum
  * temperature seen in each 10-second window that slides every second.
  */
object windowtest {

  /** Parses one comma-separated line into a `SensorReading`.
    *
    * The first three fields are used as id, timestamp and temperature; any
    * further fields are ignored.
    *
    * @param line raw text line received from the socket
    * @return the parsed reading, or `None` when the line has fewer than three
    *         fields or a numeric field cannot be parsed
    */
  private def parseReading(line: String): Option[SensorReading] = {
    import scala.util.Try
    line.split(",") match {
      case Array(id, timestamp, temperature, _*) =>
        Try(SensorReading(id.trim, timestamp.trim.toLong, temperature.trim.toDouble)).toOption
      case _ =>
        None
    }
  }

  /** Builds the pipeline and runs it until the job is cancelled or fails.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env.socketTextStream("node02", 5515)

    // Malformed lines are dropped instead of throwing inside the operator,
    // which would otherwise fail the whole streaming job on one bad record.
    val dataStream = stream
      .flatMap((line: String) => parseReading(line).toList)
      .assignTimestampsAndWatermarks(
        // Watermarks trail the highest seen timestamp by 1000 ms.
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {
          // NOTE(review): the * 1000 presumably converts seconds to the
          // milliseconds Flink expects - confirm against the data source.
          override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
        })

    val minTempPerSensor = dataStream
      .map(data => (data.id, data.temperature))
      // Typed key selector instead of the positional keyBy(0).
      .keyBy(_._1)
      .timeWindow(Time.seconds(10), Time.seconds(1))
      // Keep the sensor id and the smaller of the two temperatures.
      .reduce((data1, data2) => (data1._1, data1._2.min(data2._2)))
      .map((_, "outetmp"))

    minTempPerSensor.print()
    // The raw socket lines are printed as well, alongside the windowed result.
    stream.print()
    env.execute()
  }
}