flink也可以处理非实时的离线数据
- 离线环境 ExecutionEnvironment.getExecutionEnvironment
- 实时环境 StreamExecutionEnvironment.getExecutionEnvironment
1 离线案例Scala代码
/**
* @Auther: 多易教育-行哥
* @Date: 2020/6/14
* @Description: flink处理离线数据
*/
object ScalaBatchWordCount {
def main(args: Array[String]): Unit = {
// 导入scala的API
import org.apache.flink.api.scala._
// 获取批次离线处理的数据
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
// 读取数据
val lines: DataSet[String] = env.readTextFile("data/wc/a.txt")
// 处理数据切割组装 分组 聚合
val res: AggregateDataSet[(String, Int)] = lines.flatMap(_.split("\\s+")).map((_, 1)).groupBy(0).sum(1)
// 将结果以文件的格式写入到文件中
res.writeAsText("data/wc/res").setParallelism(1)
// 执行任务
env.execute("flink-scala-batch-word_count")
}
}
由于处理的是离线数据 ,程序处理完成以后就会推出