8.3 侧输出流(SideOutput)
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
 * Flink streaming job entry point.
 *
 * Reads comma-separated text lines from a socket, parses each line into a
 * `SensorReading`, assigns event-time timestamps and watermarks, keys the
 * stream by sensor id, runs every keyed reading through `tempWarning`, and
 * prints whatever that function emits.
 */
object processTest {

  /**
   * Builds and executes the streaming pipeline.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Raw text lines; each line is expected to hold at least three comma-separated fields.
    val rawLines = env.socketTextStream("node02", 5515)

    // Watermarks tolerate up to 1000 ms of out-of-order events.
    val watermarkAssigner =
      new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {
        // NOTE(review): the stored timestamp is presumably in seconds, hence the
        // conversion to milliseconds here -- confirm against the data source.
        override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
      }

    val readings = rawLines
      .map { line =>
        val fields = line.split(",")
        val first = fields(0).trim
        val second = fields(1).trim.toLong
        val third = fields(2).trim.toDouble
        SensorReading(first, second, third)
      }
      .assignTimestampsAndWatermarks(watermarkAssigner)

    val warnings = readings
      .keyBy(_.id)
      .process(new tempWarning())

    warnings.print()
    env.execute()
  }
}
/**
 * Keyed process function that emits a warning string for a sensor key when
 * its temperature has been rising and no lower reading arrives before a
 * processing-time timer (registered 1000 ms after the first rising reading)
 * fires.
 *
 * Per-key state:
 *  - `lastTemp`: temperature of the previous reading for the key.
 *  - `regiesTime`: timestamp of the currently pending timer; the code treats
 *    0 as "no timer pending".
 */
class tempWarning() extends KeyedProcessFunction[String, SensorReading, String] {
  // Previous temperature for the current key; 0.0 is treated as "no previous reading".
  lazy val lastTemp = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
  // Timestamp of the pending processing-time timer for the current key; 0 means none.
  // The descriptor name is kept as-is so that existing state stays compatible.
  lazy val regiesTime = getRuntimeContext.getState(new ValueStateDescriptor[Long]("lasTime", classOf[Long]))

  /**
   * Updates the stored temperature and manages the warning timer.
   *
   * @param value the incoming reading
   * @param ctx   context giving access to the timer service
   * @param out   collector for emitted warnings (not used here; output happens in `onTimer`)
   */
  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
    val preTemp = lastTemp.value()
    lastTemp.update(value.temperature)
    val lastTime = regiesTime.value()

    if (value.temperature > preTemp && lastTime == 0) {
      // Temperature rose and no timer is pending: start the 1000 ms countdown.
      val timenow = ctx.timerService().currentProcessingTime() + 1000
      ctx.timerService().registerProcessingTimeTimer(timenow)
      regiesTime.update(timenow)
    } else if (value.temperature < preTemp || preTemp == 0.0) {
      // Temperature dropped (or there was no previous reading): cancel the pending timer.
      // The original compared `preTemp < value.temperature`, which cancelled the timer
      // on a continued rise and never on a drop.
      ctx.timerService().deleteProcessingTimeTimer(lastTime)
      regiesTime.clear()
    }
  }

  /**
   * Emits the warning for the current key when the timer fires.
   *
   * @param timestamp the timestamp of the firing timer
   * @param ctx       context giving access to the current key
   * @param out       collector receiving the warning string
   */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
    out.collect(ctx.getCurrentKey + "温度连续上升")
    // The timer has fired and no longer exists; reset the marker so that a later
    // rising reading can register a new timer.
    regiesTime.clear()
  }
}