一、构建本地flink项目
java语言的场景
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.0
scala语言的场景
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=1.9.0
二、实现词频统计
写惯了java,本来是想要用java的,但是实在是资料不好找,不得不向现实屈服,选择了scala
批作业,实现读取文件的词频统计
val env = ExecutionEnvironment.getExecutionEnvironment
val dataset = env.readTextFile("/Users/jiayue/Downloads/select.txt");
dataset.flatMap{
_.toLowerCase.split(" ");}
.filter(_.nonEmpty)
.map{
(_,1)}
.groupBy(0)
.sum(1)
.print();
流作业,实现输入数据实时的词频统计
创建一个流
nc -lk 9999
实时计算流数据
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[String] = env.socketTextStream("127.0.0.1",9999,'\n')
dataStream.flatMap{
line => line.toLowerCase.split(",")}
.filter(_.nonEmpty)
.map{
word => (word , 1)}
.keyBy(0)
.timeWindow(Time.seconds(3))
.sum(1)
.print()
// execute program
env.execute("Flink Streaming Scala API Skeleton")
执行效果
输入 “唧,唧,复,唧,唧,木,兰,当,户,织”
4> (唧,4)
2> (木,1)
3> (复,1)
4> (户,1)
2> (织,1)
3> (当,1)
4> (兰,1)
三、问题解决
报错信息:
Exception in thread main java.lang.NoClassDefFoundError:ExecutionEnvironment
将pom.xml文件中的 如下内容注释掉。
<scope>provided</scope>