// Flink transform operators
package api
import org.apache.flink.streaming.api.scala._
/**
 * Demo job exercising several Flink DataStream transform operators:
 * `flatMap`, `split`/`select`, `connect` with a co-map, and `union`.
 *
 * The job reads a text file, breaks each line into space-separated words,
 * routes every word to a "long" or "short" stream based on its length,
 * and prints the union of both streams.
 */
object trainsform {

  /** Input file used when no path is given on the command line. */
  private val DefaultInputPath = "F:\\flink_project\\src\\main\\resources\\words"

  /**
   * Builds and runs the demo pipeline.
   *
   * @param args optional; `args(0)` is the path of the text file to read.
   *             When absent, [[DefaultInputPath]] is used.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    // Take the input path from the command line so the job is not tied to
    // one machine's directory layout; fall back to the original location.
    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    // One stream element per space-separated token of each input line.
    val words: DataStream[String] = env.readTextFile(inputPath).flatMap(_.split(" "))

    // Tag each word with an output name: more than 5 characters => "long",
    // otherwise "short".
    val byLength = words.split { word =>
      if (word.length > 5) Seq("long") else Seq("short")
    }
    val long = byLength.select("long")
    val short = byLength.select("short")

    // connect + co-map: each side of the connected stream gets its own
    // mapping function. No sink is attached to this stream, so its
    // elements are not printed.
    val coMapStream = long
      .connect(short)
      .map(
        longWord => (longWord, "word is long"),
        shortWord => (shortWord, "word is short")
      )

    // union: merge the two selected streams back into one and print it.
    val unionStream: DataStream[String] = long.union(short)
    unionStream.print()

    env.execute()
  }
}