flink-----实时项目---day05-------1. ProcessFunction

1. ProcessFunction

  ProcessFunction是一个低级的流处理操作,可以访问所有(非循环)流应用程序的基本构建块:

  • event(流元素)
  • state(容错,一致性,只能在Keyed流中使用)
  • timers(事件时间和处理时间,只能在keyed流中使用)

  ProcessFunction可以被认为是增加了keyed state和timers功能的FlatMapFunction。ProcesseFunction可以通过RuntimeContext访问Flink中的Keyed State,通过processElement方法中的Context实例访问流元素的时间戳,以及timerServer(注册定时器),如果watermark大于等于注册定时器的时间,就会调用onTimer方法(此处相当于一个回调函数),在调用期间,所有state的范围再次限定在创建定时器的key上,从而允许定时器操作keyed state。

注意:如果我们想要使用keyed state和timers(定时器),我们必须在一个keyed stream上应用ProcessFunction,如下所示

stream.keyBy(...).process(new MyProcessFunction())

案例1:使用ProcessFunction注册定时器

  此处要实现的功能就是使用定时器定时输出一些数据,不能使用窗口函数,数据的类型为:时间戳,单词(123422,hello)

ProcessFunctionWithTimerDemo
package cn._51doit.flink.day09;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class ProcessFunctionWithTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 得到watermark,并没有对原始数据进行处理
        SingleOutputStreamOperator<String> lineWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });
        // 处理数据,获取指定字段
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], 1);
            }
        });
        //调用keyBy进行分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        // 没有划分窗口,直接调用底层的process方法
        keyed.process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String,Integer>>() {
            private transient ListState<Tuple2<String, Integer>> bufferState;
            // 定义状态描述器
            @Override
            public void open(Configuration parameters) throws Exception {
                ListStateDescriptor<Tuple2<String, Integer>> listStateDescriptor = new ListStateDescriptor<>(
                        "list-state",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})
                );
                bufferState = getRuntimeContext().getListState(listStateDescriptor);
            }
            // 不划分窗口的话,该方法是来一条数据处理一条数据,这样输出端的压力会很大
            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                //out.collect(value);
                bufferState.add(value);
                //获取当前的event time
                Long timestamp = ctx.timestamp();
                System.out.println("current event time is : " + timestamp);

                //注册定时器,如果注册的是EventTime类型的定时器,当WaterMark大于等于注册定时器的实际,就会触发onTimer方法
                ctx.timerService().registerEventTimeTimer(timestamp+10000);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                Iterable<Tuple2<String, Integer>> iterable = bufferState.get();
                for (Tuple2<String, Integer> tp : iterable) {
                    out.collect(tp);

                }
            }
        }).print();

        env.execute();
    }
}
View Code

由于定时器中的时间为timestamp+10000,当输入分别输入1000,spark;11000,spark(该条数据触发定时器,调用onTimer()方法),输出如下结果

 同时其还会产生一个新的定时器:21000触发的定时器

扫描二维码关注公众号,回复: 11378235 查看本文章

注意

1.processElement()方法处理数据时一条一条进行处理的

2. 该案例实现了滚动窗口的功能,而滚动窗口的底层实现原理与此相似:processElement()方法+onTimer()方法

案例二:使用定时器实现类似滚动窗口的功能

ProcessFunctionWithTimerDemo2

package cn._51doit.flink.day09;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * 只有keyedStream在使用ProcessFunction时可以使用State和Timer定时器
 */
public class ProcessFunctionWithTimerDemo2 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //1000,hello
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<String> linesWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = linesWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String word = line.split(",")[1];
                return Tuple2.of(word, 1);
            }
        });

        //调用keyBy进行分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        //没有划分窗口,直接调用底层的process方法
        keyed.process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private transient ListState<Tuple2<String, Integer>> bufferState;

            @Override
            public void open(Configuration parameters) throws Exception {
                ListStateDescriptor<Tuple2<String, Integer>> listStateDescriptor = new ListStateDescriptor<Tuple2<String, Integer>>(
                        "list-state",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){})
                );

                bufferState = getRuntimeContext().getListState(listStateDescriptor);
            }

            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {

                //out.collect(value);

                bufferState.add(value);
                //获取当前的event time
                Long timestamp = ctx.timestamp();

                //10:14:13   ->   10:15:00
                //输入的时间 [10:14:00, 10:14:59) 注册的定时器都是 10:15:00
                System.out.println("current event time is : " + timestamp);

                //注册定时器,如果注册的是EventTime类型的定时器,当WaterMark大于等于注册定时器的时间,就会触发onTimer方法
                long timer = timestamp - timestamp % 60000 + 60000;
                System.out.println("next timer is: " + timer);
                ctx.timerService().registerEventTimeTimer(timer);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {

                Iterable<Tuple2<String, Integer>> iterable = bufferState.get();

                for (Tuple2<String, Integer> tp : iterable) {
                    out.collect(tp);
                }

                //请求当前ListState中的数据
                bufferState.clear();
            }
        }).print();

        env.execute();


    }
}
View Code

注意的代码

//注册定时器,如果注册的是EventTime类型的定时器,当WaterMark大于等于注册定时器的时间,就会触发onTimer方法
                long timer = timestamp - timestamp % 60000 + 60000;
                System.out.println("next timer is: " + timer);
                ctx.timerService().registerEventTimeTimer(timer);

改变:使用Process Time

ProcessFunctionWithTimerDemo3

package cn._51doit.flink.day09;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * 只有keyedStream在使用ProcessFunction时可以使用State和Timer定时器
 *
 * Processing Time类型的定时器
 *
 */
public class ProcessFunctionWithTimerDemo3 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //hello
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        //调用keyBy进行分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        //没有划分窗口,直接调用底层的process方法
        keyed.process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private transient ListState<Tuple2<String, Integer>> bufferState;

            @Override
            public void open(Configuration parameters) throws Exception {
                ListStateDescriptor<Tuple2<String, Integer>> listStateDescriptor = new ListStateDescriptor<Tuple2<String, Integer>>(
                        "list-state",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){})
                );

                bufferState = getRuntimeContext().getListState(listStateDescriptor);
            }

            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {

                bufferState.add(value);
                //获取当前的processing time
                long currentProcessingTime = ctx.timerService().currentProcessingTime();

                //10:14:13   ->   10:15:00
                //输入的时间 [10:14:00, 10:14:59) 注册的定时器都是 10:15:00
                System.out.println("current processing time is : " + currentProcessingTime);

                //注册定时器,如果注册的是ProcessingTime类型的定时器,当SubTask所在机器的ProcessingTime大于等于注册定时器的时间,就会触发onTimer方法
                long timer = currentProcessingTime - currentProcessingTime % 60000 + 60000;
                System.out.println("next timer is: " + timer);
                //注册ProcessingTime的定时器
                ctx.timerService().registerProcessingTimeTimer(timer);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {

                Iterable<Tuple2<String, Integer>> iterable = bufferState.get();

                for (Tuple2<String, Integer> tp : iterable) {
                    out.collect(tp);
                }

                //请求当前ListState中的数据
                bufferState.clear();
            }
        }).print();

        env.execute();


    }
}
View Code

猜你喜欢

转载自www.cnblogs.com/jj1106/p/13197980.html
今日推荐