KeyedProcessFunction

8.3侧输出流(SideOutput)

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
 * Entry point of the temperature-warning demo job.
 *
 * Reads comma-separated text lines from a socket, turns each line into a
 * `SensorReading`, assigns event-time timestamps and watermarks, keys the
 * stream by sensor id and runs [[tempWarning]] on every key. Whatever the
 * process function emits is printed to standard output.
 */
object processTest {

  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    environment.setParallelism(1)
    // Selecting event time is not enough on its own: the stream below must
    // also be given timestamps and watermarks.
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Optional: set the automatic watermark interval to 200 ms.
    //environment.getConfig.setAutoWatermarkInterval(200L)

    // Watermark generator for out-of-order data with a 1000 ms out-of-orderness bound.
    // For strictly ordered data `.assignAscendingTimestamps(_.timestamp * 1000)`
    // could be used on the stream instead.
    val watermarkAssigner =
      new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {
        // The reading's timestamp is scaled by 1000 (presumably seconds to
        // milliseconds; confirm against the SensorReading definition).
        override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
      }

    val rawLines = environment.socketTextStream("node02", 5515)

    // Each line is expected to look like "<id>,<timestamp>,<temperature>".
    val readings = rawLines
      .map(line => {
        val fields = line.split(",")
        val sensorId = fields(0).trim
        val eventTime = fields(1).trim.toLong
        val temperature = fields(2).trim.toDouble
        SensorReading(sensorId, eventTime, temperature)
      })
      .assignTimestampsAndWatermarks(watermarkAssigner)

    // One tempWarning instance state per sensor id; print every emitted warning.
    readings
      .keyBy(_.id)
      .process(new tempWarning())
      .print()

    environment.execute()
  }
}

/**
 * Emits a warning for a sensor (the stream key) whose temperature has kept
 * rising for one second of processing time.
 *
 * Per key, the function remembers the previous temperature and the timestamp
 * of the processing-time timer it has registered (0 meaning "no timer").
 *  - First reading for a key, or a reading lower than the previous one:
 *    any pending timer is cancelled and the timer state is cleared.
 *  - Reading higher than the previous one while no timer is pending:
 *    a timer is registered one second from the current processing time.
 *  - When the timer fires, the warning is emitted and the timer state is
 *    cleared so a later rising streak can trigger a new warning.
 */
class tempWarning() extends KeyedProcessFunction[String, SensorReading, String] {

  // Keyed state holding the temperature of the previous reading.
  lazy val lastTemp = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
  // Keyed state holding the timestamp of the registered timer; 0 is used as "none".
  lazy val regiesTime = getRuntimeContext.getState(new ValueStateDescriptor[Long]("lasTime", classOf[Long]))

  /**
   * Updates the per-key state with the new reading and registers or cancels
   * the warning timer accordingly.
   *
   * @param value the incoming sensor reading
   * @param ctx   context giving access to the timer service
   * @param out   collector for results (not used here; warnings are emitted in onTimer)
   */
  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
    val preTemp = lastTemp.value()
    lastTemp.update(value.temperature)
    val lastTime = regiesTime.value()

    // The reset case must be tested first: on the very first reading preTemp
    // is still the 0.0 sentinel, and that reading must not count as a rise.
    // NOTE(review): a genuine previous temperature of exactly 0.0 is
    // indistinguishable from "no previous reading" with this sentinel.
    if (value.temperature < preTemp || preTemp == 0.0) {
      // Temperature dropped (or first reading): cancel the timer and clear the state.
      ctx.timerService().deleteProcessingTimeTimer(lastTime)
      regiesTime.clear()
    }
    else if (value.temperature > preTemp && lastTime == 0) {
      // Temperature rose and no timer is pending: start the one-second countdown.
      val timenow = ctx.timerService().currentProcessingTime() + 1000
      ctx.timerService().registerProcessingTimeTimer(timenow)
      regiesTime.update(timenow)
    }
    // Otherwise (still rising with a timer pending, or unchanged): keep waiting.
  }

  /**
   * Called when the registered timer fires, i.e. no drop cancelled it in the
   * meantime. Emits the warning for the current key.
   *
   * @param timestamp the timestamp of the firing timer
   * @param ctx       context giving access to the current key
   * @param out       collector receiving the warning message
   */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
    out.collect(ctx.getCurrentKey + "温度连续上升")
    // The timer has been consumed; clear its state so the next rising streak
    // can register a fresh timer instead of being blocked by a stale value.
    regiesTime.clear()
  }

}

猜你喜欢

转载自blog.csdn.net/weixin_44429965/article/details/108014094