目录
3.3 env.executeAsync("jobName")
1、Flink application 的构成
通常会将一个计算任务,称为 application 或者 job
Flink application 的构成 :
1. 获取 flink应用程序 的执行环境对象(Execution Environment)
2. 指定读取数据源(Source)
3. 定义基于数据的转换操作(Transformations)
4. 定义计算结果的输出位置(Sink)
5. 触发程序执行(Execute)
2、创建执行环境
传送门:看这里
3、触发程序执行
3.1 Flink 任务触发说明
Flink程序和Spark程序一样,都是延迟执行;当Driver程序的main方法被执行时,数据加载和转换不会直接发生。只有当指定到 env.execute() 时,计算任务才会被触发执行。
3.2 env.execute("jobName")
功能说明:
触发程序执行,并等待作业完成,然后返回一个 JobExecutionResult,其中包含执行时间和累加器结果。
3.3 env.executeAsync("jobName")
功能说明:
触发程序执行,不会等待作业完成,触发作业异步执行。
它会返回一个 JobClient
,你可以通过它与刚刚提交的作业进行通信。
4、这是一个完整的入门案例
需求描述:统计每个小时内的每个用户的活跃次数
代码示例(看不懂没关系,继续看后续的文章,将会使你豁然开朗):
开发语言:java1.8
flink版本:1.17.0
package com.baidu.showcase;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashMap;
/*
* 需求描述:By用户统计每个小时内的活跃次数
*
* */
public class CountUvByHour {
public static void main(String[] args) throws Exception {
// 1.获取执行环境(Execution Environment)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// 2.读取数据源(Source)
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
// 3.基于source做转换操作(Transformations)
SingleOutputStreamOperator<String> resultDs = source.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2 map(String value) throws Exception {
return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
}
}
)
// 添加水位线
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(
(event, timestamp) -> event.f1
)
)
// 根据User分组
.keyBy(e -> e.f0)
// 基于事件时间设置10s的滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 对窗口内用户的活跃次数计数
.process(
new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
HashMap<String, Long> map = new HashMap<>();
for (Tuple2<String, Long> element : elements) {
map.put(element.f0
, map.getOrDefault(element.f0, 0L) + 1L);
}
StringBuffer record = new StringBuffer();
//record.append("============================================\n");
record.append("窗口时间范围:[" + start + "," + end + ")\n");
record.append("当前窗口用户活跃次数:" + map + "\n");
out.collect(record.toString());
}
}
);
// 4.将计算结果的输出位置(Sink)
resultDs.print();
// 5.触发程序执行(Execute)
env.execute();
}
}
运行结果: