Flink窗口之CountWindowAll
代码
package Windows;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
/**
 * Demonstrates a non-keyed count window ({@code countWindowAll}) over a socket text stream.
 *
 * <p>Every line read from the socket is parsed as an integer. Elements are grouped into
 * windows of 3 elements each, and the sum of every completed window is printed.
 */
public class CountWindowAllOp {
    /**
     * Builds the streaming pipeline and submits it for execution.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails, e.g. when an input line is not a valid
     *     integer and {@link Integer#parseInt(String)} throws inside the map function
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Text source: one record per line received from host "t1", port 9999.
        DataStreamSource<String> source = env.socketTextStream("t1", 9999);

        SingleOutputStreamOperator<Integer> map = source.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                // Trim first so stray surrounding whitespace does not fail the whole job.
                return Integer.parseInt(value.trim());
            }
        });

        // Non-keyed count window: fires once 3 elements have been collected.
        AllWindowedStream<Integer, GlobalWindow> windows = map.countWindowAll(3);

        // Only the sum is consumed downstream, so it is the only aggregation that is built;
        // an aggregation without a consumer would still be registered and executed.
        // Position 0 refers to the Integer element itself.
        SingleOutputStreamOperator<Integer> sum = windows.sum(0);

        sum.print();
        env.execute();
    }
}
输入数据
[root@t1 ~]# nc -lk 9999
1
2
3
4
5
6
7
8
运行结果
6> 6
7> 15
总结
- 每 3 条数据触发一次窗口统计:1+2+3=6,4+5+6=15(程序只打印了 sum 的结果)。
- 7 和 8 输入后,窗口内只有 2 条数据,没有达到窗口大小 3,所以不会触发统计。