flatMap(...RichFlatMapFunction)
val keyedSensorData = sensorData.keyBy(_.id) val alerts = keyedSensorData .flatMap(new TemperatureAlert(1.7)) alerts.print() ... class TemperatureAlert(val threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] { private var lastTempState: ValueState[Double] = _ override def open(parameters: Configuration): Unit = { val lastTempDesc = new ValueStateDescriptor[Double]("lastTemp", Types.of[Double]) lastTempState = getRuntimeContext.getState[Double](lastTempDesc) } override def flatMap(reading: SensorReading, out: Collector[(String, Double, Double)]): Unit = { val lastTemp = lastTempState.value() val tempDiff = (lastTemp - reading.temperature).abs if (tempDiff > threshold) { out.collect(reading.id, reading.temperature, tempDiff) } this.lastTempState.update(reading.temperature) } }
flatMapWithState
val alerts = keyedSensorData.flatMapWithState[(String, Double, Double), Double] { case (in: SensorReading, None) => (List.empty, Some(in.temperature)) case (r: SensorReading, lastTemp: Some[Double]) => val tempDiff = (r.temperature - lastTemp.get).abs if (tempDiff > 1.7) { (List((r.id, r.temperature, tempDiff)), Some(r.temperature)) } else { (List.empty, Some(r.temperature)) } }
233