周期性生成 Watermark (periodic watermark generation with AssignerWithPeriodicWatermarks)
import Source.WaterSensor
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
/** One sensor reading parsed from an input line of the form `id,ts,vc`.
  *
  * @param id sensor identifier
  * @param ts raw timestamp field from the input (presumably epoch seconds — TODO confirm)
  * @param vc measured value (its physical meaning is not shown here)
  */
case class WaterSensor(
    id: String,
    ts: Long,
    vc: Double
)
/** Demo job for periodic watermarks.
  *
  * Reads `id,ts,vc` lines from a socket and assigns event-time timestamps and periodic
  * watermarks with `MyAssigner`. It prints every parsed reading, plus the per-id minimum
  * `vc` over 10-second windows that slide every 3 seconds.
  *
  * Usage: `WindowDemo [host] [port]`. Without arguments, the original hard-coded endpoint
  * is used.
  */
object WindowDemo {
  // Original hard-coded endpoint. The host is a redacted placeholder: pass a real host
  // as args(0) (or edit this default) before running.
  private val DefaultHost = "192.168.**.**"
  private val DefaultPort = 7777

  def main(args: Array[String]): Unit = {
    val host: String = args.headOption.getOrElse(DefaultHost)
    val port: Int = args.lift(1).map(_.toInt).getOrElse(DefaultPort)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Windows below are driven by the timestamps/watermarks produced by MyAssigner.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream: DataStream[String] = env.socketTextStream(host, port)

    val dataStream: DataStream[WaterSensor] = stream
      .map { line =>
        // Trim each field so input such as "s1, 1547718199, 35.8" also parses.
        val fields: Array[String] = line.split(",").map(_.trim)
        WaterSensor(fields(0), fields(1).toLong, fields(2).toDouble)
      }
      .assignTimestampsAndWatermarks(new MyAssigner)

    val minDataStream: DataStream[(String, Double)] = dataStream
      .map(data => (data.id, data.vc))
      .keyBy(_._1)
      // Sliding window: 10 s long, evaluated every 3 s.
      .timeWindow(Time.seconds(10), Time.seconds(3))
      .reduce((x, y) => (x._1, x._2.min(y._2)))

    dataStream.print("csh")
    minDataStream.print("minData")
    env.execute("windowDemo")
  }
}
/** Periodic watermark assigner that tolerates up to `bound` milliseconds of out-of-orderness.
  *
  * Each element's event timestamp is `ts * 1000`, so `ts` is presumably in seconds (TODO
  * confirm against the producer). The watermark Flink polls periodically trails the largest
  * timestamp seen so far by `bound` ms.
  */
class MyAssigner extends AssignerWithPeriodicWatermarks[WaterSensor] {
  // Allowed out-of-orderness, in milliseconds.
  var bound: Int = 3000
  // Largest event timestamp (ms) seen so far; Long.MinValue until the first element arrives.
  var maxTs: Long = Long.MinValue

  override def getCurrentWatermark: Watermark = {
    // Guard against underflow: Long.MinValue - bound wraps to a value near Long.MaxValue,
    // which would make every subsequent element late.
    val wm: Long = if (maxTs < Long.MinValue + bound) Long.MinValue else maxTs - bound
    new Watermark(wm)
  }

  override def extractTimestamp(t: WaterSensor, l: Long): Long = {
    // The returned timestamp must use the same millisecond scale as maxTs/the watermark.
    val tsMillis: Long = t.ts * 1000
    maxTs = Math.max(tsMillis, maxTs)
    tsMillis
  }
}
间断式生成 Watermark (punctuated watermark generation with AssignerWithPunctuatedWatermarks)
import Source.WaterSensor
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
/** One sensor reading parsed from an input line of the form `id,ts,vc`.
  *
  * @param id sensor identifier
  * @param ts raw timestamp field from the input (presumably epoch seconds — TODO confirm)
  * @param vc measured value (its physical meaning is not shown here)
  */
case class WaterSensor(
    id: String,
    ts: Long,
    vc: Double
)
/** Demo job for punctuated watermarks.
  *
  * Reads `id,ts,vc` lines from a socket and assigns event-time timestamps and punctuated
  * watermarks with `MyAssigner2`. It prints every parsed reading, plus the per-id minimum
  * `vc` over 10-second windows that slide every 3 seconds.
  *
  * Usage: `WindowDemo [host] [port]`. Without arguments, the original hard-coded endpoint
  * is used.
  */
object WindowDemo {
  // Original hard-coded endpoint. The host is a redacted placeholder: pass a real host
  // as args(0) (or edit this default) before running.
  private val DefaultHost = "192.168.**.**"
  private val DefaultPort = 7777

  def main(args: Array[String]): Unit = {
    val host: String = args.headOption.getOrElse(DefaultHost)
    val port: Int = args.lift(1).map(_.toInt).getOrElse(DefaultPort)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Windows below are driven by the timestamps/watermarks produced by MyAssigner2.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream: DataStream[String] = env.socketTextStream(host, port)

    val dataStream: DataStream[WaterSensor] = stream
      .map { line =>
        // Trim each field so input such as "3, 1547718199, 35.8" also parses.
        val fields: Array[String] = line.split(",").map(_.trim)
        WaterSensor(fields(0), fields(1).toLong, fields(2).toDouble)
      }
      // This section demonstrates punctuated watermarks, so use the punctuated assigner.
      .assignTimestampsAndWatermarks(new MyAssigner2)

    val minDataStream: DataStream[(String, Double)] = dataStream
      .map(data => (data.id, data.vc))
      .keyBy(_._1)
      // Sliding window: 10 s long, evaluated every 3 s.
      .timeWindow(Time.seconds(10), Time.seconds(3))
      .reduce((x, y) => (x._1, x._2.min(y._2)))

    dataStream.print("csh")
    minDataStream.print("minData")
    env.execute("windowDemo")
  }
}
/** Punctuated watermark assigner.
  *
  * A watermark is emitted only after an element whose `id` parses as an integer divisible
  * by 3. The watermark trails the largest event timestamp seen so far by `bound`
  * milliseconds. Event timestamps are `ts * 1000`, so `ts` is presumably in seconds (TODO
  * confirm).
  */
class MyAssigner2 extends AssignerWithPunctuatedWatermarks[WaterSensor] {
  // Allowed out-of-orderness, in milliseconds.
  var bound: Int = 3000
  // Largest event timestamp (ms) seen so far; Long.MinValue until the first element arrives.
  var maxTs: Long = Long.MinValue

  override def checkAndGetNextWatermark(t: WaterSensor, l: Long): Watermark = {
    // Non-numeric ids (e.g. "sensor_1") simply never trigger a watermark,
    // instead of throwing NumberFormatException and failing the job.
    val isMarker: Boolean = scala.util.Try(t.id.toInt).toOption.exists(_ % 3 == 0)
    if (isMarker) {
      // Guard against underflow: Long.MinValue - bound would wrap to a value near Long.MaxValue.
      val wm: Long = if (maxTs < Long.MinValue + bound) Long.MinValue else maxTs - bound
      new Watermark(wm)
    } else {
      // Flink's contract for this Java interface: return null when no watermark is emitted.
      null
    }
  }

  override def extractTimestamp(t: WaterSensor, l: Long): Long = {
    // The returned timestamp and maxTs share the same millisecond scale.
    val tsMillis: Long = t.ts * 1000
    maxTs = Math.max(tsMillis, maxTs)
    tsMillis
  }
}