文章目录
1.Flink 流处理程序的一般流程
1)获取 Flink 流处理执行环境
2)构建 source
3)数据处理
4)构建 sink
-
示例
编写 Flink 程序,用来统计单词的数量。 -
步骤
1)获取 Flink 批处理运行环境
2)构建一个 socket 源
3)使用 flink 操作进行单词统计
4)打印
说明:如果 linux 上没有安装 nc 服务 ,使用 yum 安装
yum install -y nc
- 参考代码
package com.czxy.flink.stream
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
//入门案例,单词统计
object StreamWordCount {
def main(args: Array[String]): Unit = {
/**
* 实现思路:
* 1. 获取流处理运行环境
* 2. 构建socket流数据源, 并指定IP地址和端口号
* 3. 对接收到的数据转换成单词元组
* 4. 使用keyBy 进行分流(分组)
* 5. 使用timeWindow 指定窗口的长度(每5秒计算一次)
* 6. 使用sum执行累加
* 7. 打印输出
* 8. 启动执行
* 9. 在Linux中, 使用nc -lk 端口号监听端口, 并发送单词
*/
//1.创建流处理的执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//2.构建数据源 使用的socket
val socketDataStream: DataStream[String] = env.socketTextStream("node01",9999)
//3.数据的处理
import org.apache.flink.api.scala._
//4.对接收到的数据转换成单词元组,使用keyBy 进行分流(分组)
val groupKeyedStream: KeyedStream[(String, Int), Tuple] = socketDataStream.flatMap(x=>x.split(",")).map((_,1)).keyBy(0)
//5.使用timeWindow 指定窗口的长度(每5秒计算一次)
val windowedStream: WindowedStream[(String, Int), Tuple, TimeWindow] = groupKeyedStream.timeWindow(Time.seconds(5))
//6.使用sum执行累加
val resultDataStream: DataStream[(String, Int)] = windowedStream.sum(1)
//7.打印数据
resultDataStream.print()
//8.执行程序
env.execute("StreamWordCount")
}
}