前言
我们在IDE中编写Flink代码,我们希望在IEDA中运行程序便能够查看到Web-UI,从而快速的了解Flink程序的运行情况(而无需自己手动安装Flink,以及打包提交任务)
使用
添加依赖
本示例是基于Flink1.12
进行演示的
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>1.12.2</version>
<scope>compile</scope>
</dependency>
代码中启用本地WEB-UI
// 定义一个配置 import org.apache.flink.configuration.Configuration;包下
Configuration configuration = new Configuration();
// 指定本地WEB-UI端口号
configuration.setInteger(RestOptions.PORT, 8082);
// 执行环境使用当前配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
完整代码演示
public class ASyncIODemo {
public static void main(String[] args) throws Exception {
// 定义一个配置 import org.apache.flink.configuration.Configuration;包下
Configuration configuration = new Configuration();
// 指定本地WEB-UI端口号
configuration.setInteger(RestOptions.PORT, 8082);
// 执行环境使用当前配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9996);
//并行度设置为1才能看到效果,因为如果不为1,那么有些分区的水位线就是负无穷
//由于自己的水位线是分区里面最小的水位线,那么自己的一直都是负无穷
//就触发不了水位线的上升
env.setParallelism(1);
//第一个参数就一个名字,第二个参数用来表示事件时间
SingleOutputStreamOperator<Tuple2<String, Long>> initData = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] s = value.split(" ");
//假设我们在控制台输入的参数是a 15s,那么我们要15*1000才能得到时间戳的毫秒时间
return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
}
});
//设置水位线
SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = initData.assignTimestampsAndWatermarks(
// 针对乱序流插入水位线,延迟时间设置为 0s
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
//指定事件时间
return element.f1;
}
})
);
//在普通的datastream的api搞不定的时候就可以使用它了
//KeyedProcessFunction只有在keyBy才能使用
watermarks.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new AllWindowFunction<Tuple2<String, Long>, String, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<String> out) throws Exception {
for (Tuple2<String, Long> value : values) {
out.collect(value.f0);
}
}
}).print();
env.execute();
}
}