1. 读取文件进行批量计算WordCount
1.1 代码
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
/**
 * Batch word count: reads a text file, splits every line on single spaces and
 * prints each distinct word together with its number of occurrences, ordered
 * by count from highest to lowest.
 *
 * The input file defaults to the sample file of the tutorial project and can be
 * overridden by passing a path as the first program argument.
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    // Create the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Input file: first program argument if given, otherwise the original sample file
    val inputPath = args.headOption.getOrElse(
      "/Users/xietong/IdeaProjects/FlinkTutorial/src/main/resources/hello.txt")
    val inputDataSet = env.readTextFile(inputPath)

    val wordcount = inputDataSet
      .flatMap(_.split(" "))
      // Repeated or leading spaces yield empty tokens; do not count them as a word
      .filter(_.nonEmpty)
      .map((_, 1))
      // Group on the word (tuple field 0). A positional key is kept here on purpose:
      // NOTE(review): DataSet aggregations appear to require position/expression keys
      // rather than a key-selector function -- confirm before switching to groupBy(_._1).
      .groupBy(0)
      .sum(1) // add up the counts (tuple field 1)
      // sortPartition orders each partition independently, so the sort is run in a
      // single partition to obtain one globally ordered result
      .sortPartition(1, org.apache.flink.api.common.operators.Order.DESCENDING)
      .setParallelism(1)

    // For the DataSet API, print() itself triggers execution of the job
    wordcount.print()
  }
}
1.2 运行状态
2. 实时读取socket文本流计算WordCount
2.1 代码
import org.apache.flink.streaming.api.scala._
/**
 * Streaming word count: reads lines from a socket text stream, splits them on
 * single spaces and continuously prints the running count of every word.
 *
 * Program arguments (both optional):
 *   args(0) - host to connect to (default "113.143.100.140")
 *   args(1) - port to connect to (default 7777)
 *
 * For a local test, start a server with `nc -lk 7777` and pass `localhost` as host.
 */
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Socket endpoint; defaults keep the original hard-coded values.
    // A non-numeric port argument fails fast with a NumberFormatException.
    val host = args.headOption.getOrElse("113.143.100.140")
    val port = args.lift(1).map(_.toInt).getOrElse(7777)

    // Receive a socket text stream
    val socketStream = env.socketTextStream(host, port)

    // Process every incoming line
    val wordCount = socketStream
      .flatMap(_.split(" "))
      .filter(_.nonEmpty) // drop empty tokens caused by repeated spaces
      .map((_, 1))
      // Key by the word with a key selector; the positional keyBy(0) overload is deprecated
      .keyBy(_._1)
      .sum(1) // running sum of the counts (tuple field 1)

    wordCount.print()
    // wordCount.writeAsCsv("/Users/xietong/IdeaProjects/FlinkTutorial/src/main/resources/a")

    // A streaming job only starts running once execute() is called
    env.execute("StreamWordCount")
  }
}
2.2 nc命令启动一个7777端口进行测试
2.3 启动程序,运行状况(实时计算)