在批处理代码里,读取文件的时候,读取的每一行就是一个时间,并不是整个文件是一个stream
map操作可以改变我们DataSet里的具体类型
groupby之后得到的是一个GroupedDataSet
sum之后得到一个AggregateDataSet
下面是流处理的代码,要用linux环境nc -lk 7777创建端口
package com.atguigu.wc
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
object StreamWordCount {
def main(args: Array[String]): Unit = {
// 创建流处理执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 从程序运行参数中读取hostname和port
val params: ParameterTool = ParameterTool.fromArgs(args)
val hostname: String = params.get("host")
val port: Int = params.getInt("port")
// 读取数据,现在就不是从文件里开始读取的(虽然也可以流处理从文件读取)kafka就是流式输入源
// 接收socket文本流,nc(net cat)命令nc -lk 7777启动一个端口,然后向一个指定的端口发送socket文本流
val inputDataStream: DataStream[String] = env.socketTextStream("10.10.64.83:0.0", 7777)
// 这里的hostname是老师视频里的windows里启动的linux子系统的命令窗口,所以写成localhost,如果是虚拟机上,就写虚拟机上的hostname
// 定义转换操作 word count
val resultDataStream: DataStream[(String, Int)] = inputDataStream
.flatMap(_.split(" ")) // 以空格分词,打散得到所有的word
.filter(_.nonEmpty)
.map( (_,1) ) // 转换成(word,count)数组
.keyBy(0) // 按照第一个元素分组 keyBy针对当前的key的Hashcode进行一个重分区,不是shuffle
.sum(1) // 按照第二个元素求和
// setParallelism设置并行度,不同的函数可以有不同的并行度
// 打印输出
resultDataStream.print()
// 流处理不能直接输出,因为要等待外部输入流式数据才能一步一步得到数据进行处理
env.execute("stream word count job")
// execute启动我们流处理的环境,当真正有输入的时候执行
// 输出会有不同的顺序是因为多线程的原因
}
}
运行命令可以再Run的Edit Configuration里配置程序参数
在疯狂尝试用虚拟机的Ubuntu的nc命令的方式开启一个端口跑代码之后,失败了。。。原因推测是连不上虚拟机的端口号,我也不知道为啥啊啊啊啊啊
然后尝试在windows下安装一个netcat插件,打开cmd,在netcat目录下调用nc命令,nc -lk 7777,顺利运行~同时还让师兄演示了一下和kafka结合的flink处理这个代码,哈哈收获很多,视频教程里老师是在Windows下装了一个Ubuntu的子系统,所以老师也可以直接起程序,至于有用虚拟机开起来的同学可以和我分享一下!!!因为我还没有解决这个问题:)