文章目录
环境
WIN10+IDEA2021+JDK1.8+FLINK1.14
基于处理时间的定时器
registerProcessingTimeTimer
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
/**
 * Demonstrates processing-time timers on a keyed stream.
 *
 * <p>Each element emitted by {@link AutomatedSource} prints the current processing time and
 * registers a processing-time timer {@code TIMER_DELAY_MS} later; when a timer fires,
 * {@code onTimer} prints the timestamp it was registered for.
 */
public class Hi {

    /** Distance between an element's processing time and the timer registered for it. */
    private static final long TIMER_DELAY_MS = 2000L;

    /**
     * Builds and runs the job: source -> keyBy(constant key) -> KeyedProcessFunction -> print.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Create the execution environment; parallelism 1 keeps the printed output in one sequence.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // Create the input stream.
        SingleOutputStreamOperator<Long> d = env.addSource(new AutomatedSource());

        // Timers can only be registered on a keyed stream, so every element is mapped to the
        // same constant key.
        d.keyBy(s -> true).process(new KeyedProcessFunction<Boolean, Long, String>() {
            @Override
            public void processElement(Long value, Context ctx, Collector<String> out) {
                // Read the current processing time.
                long processingTime = ctx.timerService().currentProcessingTime();
                // Emit it.
                out.collect("当前处理时间" + processingTime);
                // Register a processing-time timer that fires TIMER_DELAY_MS (2 seconds) from now.
                ctx.timerService().registerProcessingTimeTimer(processingTime + TIMER_DELAY_MS);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
                // timestamp is the time the firing timer was registered for.
                out.collect("触发" + timestamp + "定时器");
            }
        }).print();

        // Run the job.
        env.execute();
    }

    /**
     * Test source that emits the numbers 0..98, one per second, and stops early when cancelled.
     */
    public static class AutomatedSource implements SourceFunction<Long> {

        /** Number of elements emitted before the source finishes on its own. */
        private static final long ELEMENT_COUNT = 99L;

        /** Pause before each emitted element, in milliseconds. */
        private static final long EMIT_INTERVAL_MS = 1000L;

        /** Cleared by {@link #cancel()}; volatile because cancel() is called from another thread. */
        private volatile boolean running = true;

        public AutomatedSource() {
        }

        /**
         * Emits one element per interval until all elements are sent or the source is cancelled.
         *
         * @param sc context used to emit elements
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        @Override
        public void run(SourceFunction.SourceContext<Long> sc) throws InterruptedException {
            for (long l = 0L; l < ELEMENT_COUNT; l++) {
                Thread.sleep(EMIT_INTERVAL_MS);
                // Re-check after the sleep so nothing is emitted once cancel() has been called.
                if (!running) {
                    return;
                }
                sc.collect(l);
            }
        }

        /** Asks {@link #run} to stop before emitting its next element. */
        @Override
        public void cancel() {
            running = false;
        }
    }
}
基于事件时间的定时器
注意:定时器只能在 KeyedStream 上注册;在非 keyed 的流上注册定时器会抛出异常:Setting timers is only supported on a keyed streams.
测试1
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
 * Demonstrates event-time timers with the monotonous-timestamps watermark strategy.
 *
 * <p>Each element's value (in seconds) is used as its event timestamp. For every element the
 * job prints the current watermark and the element's timestamp, then registers an event-time
 * timer {@code TIMER_OFFSET_MS} after that timestamp.
 */
public class Hi {

    /**
     * Offset of the timer from the element's event timestamp.
     *
     * <p>An event-time timer fires once the watermark reaches its timestamp. Flink's built-in
     * monotonous-timestamps generator emits watermarks of (max timestamp seen - 1 ms), so a
     * timer at {@code ts + 1999} fires once an element with timestamp {@code ts + 2000} or
     * later has been seen, i.e. 2 seconds later in event time.
     */
    private static final long TIMER_OFFSET_MS = 1999L;

    /**
     * Builds and runs the job: source -> timestamps/watermarks -> keyBy -> process -> print.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Create the execution environment; parallelism 1 keeps the printed output in one sequence.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // Create the stream and choose the event-time watermark strategy. To tolerate
        // out-of-order elements instead, use forBoundedOutOfOrderness(Duration.ofSeconds(2)).
        SingleOutputStreamOperator<Long> d = env.addSource(new AutomatedSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forMonotonousTimestamps()
                        // The element value is in seconds; event timestamps are in milliseconds.
                        .withTimestampAssigner((element, recordTimestamp) -> element * 1000L));

        // Timers can only be registered on a keyed stream, so every element is mapped to the
        // same constant key.
        d.keyBy(s -> true).process(new KeyedProcessFunction<Boolean, Long, String>() {
            @Override
            public void processElement(Long value, Context ctx, Collector<String> out) {
                // Emit the watermark seen so far and this element's event timestamp.
                out.collect("当前水位线 " + ctx.timerService().currentWatermark() + ";事件时间 " + ctx.timestamp());
                // Register an event-time timer relative to this element's timestamp.
                ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TIMER_OFFSET_MS);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
                // timestamp is the event time the firing timer was registered for.
                out.collect("触发" + timestamp + "定时器");
            }
        }).print();

        // Run the job.
        env.execute();
    }

    /**
     * Test source that emits a fixed sequence of second values (the last two are out of order),
     * pausing before each one, and stops early when cancelled.
     */
    public static class AutomatedSource implements SourceFunction<Long> {

        /** Values to emit, in emission order; 8 and 7 arrive after 9 on purpose. */
        private static final long[] EVENT_SECONDS = {1L, 2L, 3L, 4L, 5L, 6L, 9L, 8L, 7L};

        /** Pause before each emitted element, in milliseconds. */
        private static final long EMIT_INTERVAL_MS = 299L;

        /** Cleared by {@link #cancel()}; volatile because cancel() is called from another thread. */
        private volatile boolean running = true;

        public AutomatedSource() {
        }

        /**
         * Emits the fixed sequence until it is exhausted or the source is cancelled.
         *
         * @param sc context used to emit elements
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        @Override
        public void run(SourceFunction.SourceContext<Long> sc) throws InterruptedException {
            for (long seconds : EVENT_SECONDS) {
                Thread.sleep(EMIT_INTERVAL_MS);
                // Re-check after the sleep so nothing is emitted once cancel() has been called.
                if (!running) {
                    return;
                }
                sc.collect(seconds);
            }
        }

        /** Asks {@link #run} to stop before emitting its next element. */
        @Override
        public void cancel() {
            running = false;
        }
    }
}
打印结果
测试2
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
 * Demonstrates event-time timers with the bounded-out-of-orderness watermark strategy.
 *
 * <p>Each element's value (in seconds) is used as its event timestamp. For every element the
 * job prints the current watermark and the element's timestamp, then registers an event-time
 * timer {@code TIMER_OFFSET_MS} after that timestamp.
 */
public class Hi {

    /** Maximum out-of-orderness tolerated by the watermark strategy. */
    private static final Duration MAX_OUT_OF_ORDERNESS = Duration.ofSeconds(2);

    /**
     * Offset of the timer from the element's event timestamp.
     *
     * <p>An event-time timer fires once the watermark reaches its timestamp. Flink's built-in
     * bounded-out-of-orderness generator emits watermarks of
     * (max timestamp seen - bound - 1 ms), so with a 2 s bound a timer at {@code ts + 1999}
     * fires only once an element with timestamp {@code ts + 4000} or later has been seen.
     */
    private static final long TIMER_OFFSET_MS = 1999L;

    /**
     * Builds and runs the job: source -> timestamps/watermarks -> keyBy -> process -> print.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Create the execution environment; parallelism 1 keeps the printed output in one sequence.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // Create the stream and choose the event-time watermark strategy. For strictly ascending
        // input, forMonotonousTimestamps() could be used instead.
        SingleOutputStreamOperator<Long> d = env.addSource(new AutomatedSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Long>forBoundedOutOfOrderness(MAX_OUT_OF_ORDERNESS)
                        // The element value is in seconds; event timestamps are in milliseconds.
                        .withTimestampAssigner((element, recordTimestamp) -> element * 1000L));

        // Timers can only be registered on a keyed stream, so every element is mapped to the
        // same constant key.
        d.keyBy(s -> true).process(new KeyedProcessFunction<Boolean, Long, String>() {
            @Override
            public void processElement(Long value, Context ctx, Collector<String> out) {
                // Emit the watermark seen so far and this element's event timestamp.
                out.collect("当前水位线 " + ctx.timerService().currentWatermark() + ";事件时间 " + ctx.timestamp());
                // Register an event-time timer relative to this element's timestamp.
                ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TIMER_OFFSET_MS);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
                // timestamp is the event time the firing timer was registered for.
                out.collect("触发" + timestamp + "定时器");
            }
        }).print();

        // Run the job.
        env.execute();
    }

    /**
     * Test source that emits a fixed sequence of second values (the last two are out of order),
     * pausing before each one, and stops early when cancelled.
     */
    public static class AutomatedSource implements SourceFunction<Long> {

        /** Values to emit, in emission order; 8 and 7 arrive after 9 on purpose. */
        private static final long[] EVENT_SECONDS = {1L, 2L, 3L, 4L, 5L, 6L, 9L, 8L, 7L};

        /** Pause before each emitted element, in milliseconds. */
        private static final long EMIT_INTERVAL_MS = 299L;

        /** Cleared by {@link #cancel()}; volatile because cancel() is called from another thread. */
        private volatile boolean running = true;

        public AutomatedSource() {
        }

        /**
         * Emits the fixed sequence until it is exhausted or the source is cancelled.
         *
         * @param sc context used to emit elements
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        @Override
        public void run(SourceFunction.SourceContext<Long> sc) throws InterruptedException {
            for (long seconds : EVENT_SECONDS) {
                Thread.sleep(EMIT_INTERVAL_MS);
                // Re-check after the sleep so nothing is emitted once cancel() has been called.
                if (!running) {
                    return;
                }
                sc.collect(seconds);
            }
        }

        /** Asks {@link #run} to stop before emitting its next element. */
        @Override
        public void cancel() {
            running = false;
        }
    }
}
打印结果