具体代码实现:
pom文件引入:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Flink sliding-window word-count example. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates. -->
    <groupId>com.study.flink01</groupId>
    <artifactId>Flink_flink01</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Single place to change the Flink release and the Scala binary suffix
         shared by every Flink artifact declared below. -->
    <properties>
        <flink.version>1.6.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <!-- Flink Java API. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink streaming API for Scala. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink streaming API for Java. The "provided" scope below is left
             commented out, so Maven's default (compile) scope applies. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>

        <!-- Flink core classes. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
package xuwei.tech
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Sliding-window word count over a socket text stream.
 *
 * Every 1 second the job emits, per word, the number of occurrences seen in
 * the most recent 2 seconds of input.
 *
 * Usage: `SocketWindowWordCountScala [--port <port>]`. When `--port` is
 * missing or cannot be read as an integer, port 9999 is used.
 */
object SocketWindowWordCountScala {

  /**
   * Job entry point: builds the streaming pipeline and submits it.
   *
   * @param args command-line arguments; only `--port <int>` is consulted
   */
  def main(args: Array[String]): Unit = {
    import scala.util.Try

    // Resolve the socket port. Any non-fatal failure while parsing the
    // arguments (missing key, non-numeric value, malformed argument list)
    // falls back to the default port and is reported on stderr.
    val port: Int = Try(ParameterTool.fromArgs(args).getInt("port")).getOrElse {
      System.err.println("No port set.use default port 9999")
      9999
    }

    // Obtain the Flink execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Connect to the socket and read newline-delimited text from it.
    val text = env.socketTextStream("flink102", port, '\n')

    // Required: brings the implicit TypeInformation instances into scope;
    // without this import the flatMap/map calls below do not compile.
    import org.apache.flink.api.scala._

    // Tokenize, key by word, apply the sliding window and sum the counts.
    val windowCounts = text
      .flatMap(line => line.split("\\s")) // flatten each line into its tokens
      // Blank lines and consecutive whitespace make split() produce empty
      // strings; drop them so they are not counted as a word "".
      .filter(_.nonEmpty)
      .map(w => WordWindowCount(w, 1)) // pair every word with an initial count of 1
      .keyBy("word")
      .timeWindow(Time.seconds(2), Time.seconds(1)) // window size 2s, slide interval 1s
      .sum("count") // an equivalent reduce would be:
    // .reduce((a, b) => WordWindowCount(a.word, a.count + b.count))

    // Print the results to the console using a single printing task.
    windowCounts.print().setParallelism(1)

    // Submit the job for execution.
    env.execute("Socket window count")
  }

  /**
   * A word together with its occurrence count.
   *
   * @param word  the token extracted from the input
   * @param count number of occurrences (1 per raw token before aggregation)
   */
  case class WordWindowCount(word: String, count: Long)
}
控制台打印的效果与上一篇《第 1 节 滑动窗口单词计数(Java实现)》的运行效果一样。