流处理HelloWold
第一步: 创建流处理执行环境
val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
同样的需要导包,根据批处理的经验直接
import org.apache.flink.streaming.api.scala._
第二步: 接受Socket文本流
为了实现流式数据的发送需要nc工具。
首先代码中监控一个端口:
val inputDataStream:DataStream[String] = env.socketTextStream("localhost",7777)
这里写死只适用于测试阶段,不太灵活。
故从外部命令获取参数:
// 导入工具包
import org.apache.flink.api.java.utils.ParameterTool
val tool: ParameterTool = ParameterTool.fromArgs(args)
val host: String = tool.get("host")
val port: Int = tool.getInt("port")
val inputDataStream:DataStream[String] =env.socketTextStream(host,port)
第三步: 进行数据解析处理
// 因为数据格式和批处理一样,所以处理方式也一样
val resultDataStream:DataStream[(String,Int)] = inputDataStream
.flatMap(_.split(" "))
.map((_,1))
.keyBy(0) //DataStream没有groupby方法,这里要使用keyBy方法。按下标0为key分组
//分配规则基于key的hashcode值
.sum(1)
第四步:输出结果
resultDataStream.print()
第五步:执行任务
如果没有此步骤,那么任务会直接退出。
就相当于值分配好任务,但是没有执行。
// 启动任务执行
env.execute("stream wordcount")
第六步:结果测试
首先:需要修改文件的Configuration。这样才可以读取外部参数
Program arguments 一栏输入 --host localhost --port 7777
之后需要启动nc工具。 这里因为使用Windows ,所以命令为
nv -lp 7777
启动nc工具并运行代码。
结果显示为:
3> (hello,1)
5> (world,1)
5> (fink,1)
3> (hello,2)
1> (scala,1)
3> (hello,3)
这里结果显示前编号是因为fink会对流式数据进行并行处理。并行度默认为电脑cpu核数。
同时也可以自行设置。
env.setParallelism(8)
在生产环境需要根据配置文件配置
也可以对某一个算子做并行度设置。但使用度不高,这样会损失效率。
val resultDataStream:DataStream[(String,Int)] = inputDataStream
.flatMap(_.split(" "))
.map((_,1)).setParallelism(3)
.keyBy(0)
.sum(1)
在做打印输出或者写入操作的时候,如果不需要有数字分区前缀。
可以把这个步骤的并行度设置为1
resultDataStream.print().setParallelism(1))
结果:
(hello,1)
当打印输出结果并行度为1 时,发现打印顺序和输入的顺序不一致。
这是因为打印步骤之前的并行度不为1,所以数据在传输方面不确定,
所以会产生乱序,若想保存顺序,需设置全局并行度为1:
env.setParallelism(1)
完整代码
package com.erke.wc
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
object StreamWordCount {
def main(args: Array[String]): Unit = {
// 第一步:创建执行环境
val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 从外部命令获取参数
val tool: ParameterTool = ParameterTool.fromArgs(args)
val host: String = tool.get("host")
val port: Int = tool.getInt("port")
val inputDataStream:DataStream[String] =env.socketTextStream(host,port)
// 进行转换处理统计
val resultDataStream:DataStream[(String,Int)] = inputDataStream
.flatMap(_.split(" "))
.map((_,1))
.keyBy(0)
.sum(1)
// 打印输出
resultDataStream.print()
// 启动任务执行
env.execute("stream wordcount")
}
}