Flink-pom项目的搭建以及简单的WordCount程序(Java)
搭建pom
强烈建议使用官方的推荐写法,用命令行输入以下代码(也不用你敲,只用改一下你的flink版本号即可,我用的是1.9.1)
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.1
然后根据提示,输入groupId,artId,package等等信息即可。
用官方的这个案例去创建pom有什么好处?
- pom都帮你写好了,还有日志的配置
- 还自带demo的,一个离线demo,一个实时demo
写一个WordCount程序(不用lambda)
首先在你配置的flink的集群上,比如主节点,打开一个8888端口:
nc -lk 8888
代码如下:
public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 1.创建一个flink stream 程序执行的环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.通过这个环境创建一个抽象的的数据集dataStream
DataStreamSource<String> dataStream = environment.socketTextStream("192.168.237.130", 8888);
// 3.调用dataStream上的方法 ,如:transformation(可以不调用) 和sink(必须调用,类似于spark的action,提交动作)。
// 调用transformation会将一个dataStream转换为一个新的dataStream
SingleOutputStreamOperator<String> dataStream2 = dataStream.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String line, Collector<String> out) throws Exception {
// 将一行单词进行切分
String[] words = line.split(" ");
for (String word : words) {
// 切分后输出
out.collect(word);
}
}
});
// 4.将单词和数字1进行组合,返回一个dataStream
SingleOutputStreamOperator<Tuple2<String, Integer>> dataStream3 =
dataStream2.map(new MapFunction<String, Tuple2<String, Integer>>() {
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
// 5.进行分组聚合,根据单词进行keyBy,然后把对应的第一个数据进行累加。这里的数字是下标,对应的Tuple2<String,Integer>
SingleOutputStreamOperator<Tuple2<String, Integer>> dataStream4 =
dataStream3.keyBy(0).sum(1);
// 到这里transformation结束
// 6.调用sink
dataStream4.print();
// 7.启动
environment.execute("StreamWordCount");
}
}
然后在终端上输入:
此时idea的打印台中会出现:
写一个WordCount程序(用lambda)
代码:
public class LambdaStreamWordCount {
public static void main(String[] args) throws Exception {
// 创建环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建第一个dataStream
DataStreamSource<String> dataStream = environment.socketTextStream("192.168.237.130", 8888);
SingleOutputStreamOperator<Tuple2<String, Integer>> dataStream2 = dataStream.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
Arrays.stream(line.split(" ")).forEach(word -> {
out.collect(Tuple2.of(word, 1));
});
}) // 如果使用了lambda表达式,必须使用returns来返回一个规定的类型
.returns(Types.TUPLE(Types.STRING, Types.INT));
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStream2.keyBy(0)
.sum(1);
sum.print();
environment.execute("LambdaStreamWordCount");
}
}
将程序打成jar包,放在页面上执行
- 代码需要改动的行:(第一个参数是ip,第二个参数是端口)
DataStreamSource<String> dataStream = environment.socketTextStream(args[0], Integer.parseInt(args[1]));
其他代码不需要改动
- 在终端上输入命令:
mvn clean package
- 上传到flink上:(输入这个类的全类名以及端口号+ip)
具体的操作可以见另一篇博客的内容:
Flink-简介以及standalone集群安装和简单的测试的页面操作启动Task模块(不同的是全类名不会自动补全,需要自己填)
- 终端上输入
- 查看结果: