A beginner's program
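package frame.flink.learn;

import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCountJava {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read the port from "--port <n>" (as passed by flink run), defaulting to 9000
        int port = ParameterTool.fromArgs(args).getInt("port", 9000);
        String host = "localhost";
        String delimiter = "\n";
        // Connect to the socket and read it as a stream of lines
        DataStreamSource<String> text = env.socketTextStream(host, port, delimiter);
        SingleOutputStreamOperator<WordCount> wcs = text.flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> out) throws Exception {
                // Split on runs of whitespace so repeated spaces do not produce empty "words"
                for (String word : value.trim().split("\\s+")) {
                    if (!word.isEmpty()) {
                        out.collect(new WordCount(word, 1L));
                    }
                }
            }
        }).keyBy("word")                                  // group by the POJO field "word"
          .timeWindow(Time.seconds(2), Time.seconds(1))  // 2 s windows sliding every 1 s (processing time)
          .sum("count");                                 // sum the "count" field per word and window
        // Print results through a single sink instance (parallelism 1)
        wcs.print().setParallelism(1);
        // This line is essential: without execute() the job is never submitted and nothing runs
        env.execute("word count");
    }

    // Lombok @Data generates getters/setters, equals/hashCode and toString;
    // @NoArgsConstructor supplies the public no-arg constructor Flink's POJO rules require
    @Data
    @NoArgsConstructor
    public static class WordCount {
        String word;
        long count;

        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
    }
}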
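With timeWindow(Time.seconds(2), Time.seconds(1)) the job uses a sliding window that is 2 s long and advances every 1 s, so each word falls into two overlapping windows and is counted in both. The plain-Java sketch below does not use Flink; it only mimics how a sliding window assigner picks window start times (assuming a zero window offset) and then sums counts per window and word. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SlidingWindowSketch {

    /** Start times of every window of length sizeMs, sliding by slideMs, that contains timestampMs. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp (offset assumed to be 0)
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    /** Counts words per window: window start -> (word -> count). */
    static Map<Long, Map<String, Long>> countPerWindow(long[] timestamps, String[] words,
                                                       long sizeMs, long slideMs) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (int i = 0; i < words.length; i++) {
            for (long start : windowStarts(timestamps[i], sizeMs, slideMs)) {
                result.computeIfAbsent(start, k -> new TreeMap<>())
                      .merge(words[i], 1L, Long::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {500, 1200, 1800};
        String[] words = {"hello", "hello", "flink"};
        countPerWindow(ts, words, 2000, 1000).forEach((start, counts) ->
                System.out.println("[" + start + ", " + (start + 2000) + ") " + counts));
    }
}
```

Running main prints three windows: [-1000, 1000) {hello=1}, [0, 2000) {flink=1, hello=2} and [1000, 3000) {flink=1, hello=2}. The word at 500 ms belongs to both [-1000, 1000) and [0, 2000), which is why the same input shows up in consecutive results.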
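1. In a terminal, run nc -l 9000 (start it before the program, because the program connects to this port as a client).
2. Start the program.
3. Type words into the nc terminal. The program prints each word's count within every 2-second window, with a new window closing every second.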
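Each line typed into the nc terminal arrives as one String, and the flatMap turns it into words. Splitting on "\\s" (exactly one whitespace character) yields an empty token wherever words are separated by two or more spaces, and that empty string would then be counted as a word; trimming and splitting on "\\s+" avoids it. The plain-Java sketch below compares the two; the tokenize helper is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TokenizeSketch {

    /** Splits a line into non-empty words, treating any run of whitespace as one separator. */
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String w : line.trim().split("\\s+")) {
            // An empty or all-whitespace line still yields a single "" after trim + split
            if (!w.isEmpty()) {
                words.add(w);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        String line = "hello  flink hello";                   // two spaces after the first word
        System.out.println(Arrays.asList(line.split("\\s")));  // [hello, , flink, hello]
        System.out.println(tokenize(line));                    // [hello, flink, hello]
    }
}
```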
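How do you package the program and run it on a Flink cluster?
1. Package the program: add the following build configuration to pom.xml, then run mvn clean package.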
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-resources-plugin</artifactId>
            <version>2.7</version>
            <configuration>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <archive>
                    <manifestEntries>
                        <Main-Class>frame.flink.learn.SocketWindowWordCountJava</Main-Class>
                    </manifestEntries>
                </archive>
            </configuration>
        </plugin>
    </plugins>
</build>
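The maven-jar-plugin configuration writes a Main-Class entry into the jar's META-INF/MANIFEST.MF, which is how flink run can find the entry point without being told the class name explicitly. One way to confirm the built jar actually carries that entry is to read the manifest with java.util.jar.JarFile, as in the sketch below (ManifestCheck is a hypothetical name).

```java
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

public class ManifestCheck {

    /** Returns the Main-Class attribute of the jar's manifest, or null if it has none. */
    static String mainClassOf(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            Manifest manifest = jar.getManifest();  // null when the jar has no MANIFEST.MF
            return manifest == null ? null
                    : manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }

    public static void main(String[] args) throws IOException {
        // e.g. java ManifestCheck target/hadoop-stream-frame-0.0.1-SNAPSHOT.jar
        System.out.println(mainClassOf(args[0]));
    }
}
```

For the pom above, the expected value is frame.flink.learn.SocketWindowWordCountJava; a null result means the manifest entry was not written.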
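2. Start Flink and submit the job
Download the Flink distribution flink-1.6.1-bin-hadoop27-scala_2.11.tgz and extract it.
Go to its bin directory and run the following commands (the first starts a local cluster, the second submits the jar):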
./start-cluster.sh
./flink run ~/Desktop/hadoop-stream-frame-0.0.1-SNAPSHOT.jar --port 9000
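The --port 9000 at the end of the flink run command is handed to main() in args, so it only has an effect if the program reads it; a hard-coded port ignores it. Flink provides ParameterTool.fromArgs(args) for this kind of parsing. The dependency-free sketch below, built around a hypothetical intArg helper, shows the same idea: look for --port and fall back to 9000 when it is missing.

```java
public class PortArgSketch {

    /** Returns the integer following "--name" in args, or defaultValue if the flag is absent. */
    static int intArg(String[] args, String name, int defaultValue) {
        String flag = "--" + name;
        // Stop one short of the end so a trailing flag without a value falls back to the default
        for (int i = 0; i < args.length - 1; i++) {
            if (args[i].equals(flag)) {
                return Integer.parseInt(args[i + 1]);
            }
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        int port = intArg(args, "port", 9000);
        System.out.println("connecting to localhost:" + port);
    }
}
```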
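3. View the results
The print() sink writes to the TaskManager's standard output, which the standalone cluster redirects to a .out file in the log directory. After typing some text into the nc terminal, follow that file: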
tail -500f …/log/flink-yuzhihao-taskexecutor-0-xxx.out
You can also check the job's status in the Flink web UI:
http://localhost:8081/#/running-jobs
Alternatively, you can upload the jar through the web UI, submit it from there, and view the results.