I. Window aggregation
1. Incremental aggregation
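Scenario:
Each time an element enters the window, the aggregate is updated right away; when the window's time is up, only the final result is emitted.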
Commonly used aggregation operators:
reduce(reduceFunction)
aggregate(aggregateFunction)
sum(),min(),max()
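To make the incremental model concrete, here is a minimal plain-Java simulation. It does not use Flink, and the class and method names are hypothetical. It keeps a single running value per window and folds each arriving element into it, the way a `ReduceFunction` is applied. Like Flink's reducing state, it stores the first element as-is, so the reducer is first called on the second element; the trace strings imitate the `process last:... current:...` lines printed by the reduce example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/**
 * Plain-Java simulation of incremental window aggregation (no Flink involved).
 * Hypothetical class: the window keeps one running value and folds each
 * arriving element into it, as a ReduceFunction is applied.
 */
final class IncrementalAggSketch {

    /**
     * Folds the elements of one window in arrival order, recording a trace line
     * per reducer call. Returns null if the window received no elements.
     */
    static Integer reduceIncrementally(List<Integer> arriving,
                                       BinaryOperator<Integer> reducer,
                                       List<String> trace) {
        Integer state = null; // the only per-window state
        for (Integer current : arriving) {
            if (state == null) {
                state = current; // first element is stored as-is, no reducer call
            } else {
                trace.add("process last:" + state + " current:" + current);
                state = reducer.apply(state, current);
            }
        }
        return state; // emitted once, when the window fires
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        Integer result = reduceIncrementally(List.of(1, 2, 3), Integer::sum, trace);
        trace.forEach(System.out::println);
        System.out.println("window result: " + result);
    }
}
```

The point of this design is memory: however many elements arrive, the per-window state stays a single value.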
(1) reduce operator
Scenario: summing the elements in a window
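import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Incremental window aggregation:
 * sums the integers received in each 10-second window with reduce.
 */
public class SocketWindowIncrAgg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // one integer per line, e.g. typed into `nc -lk 9999`
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Integer> integerSingleOutputStreamOperator = dataStreamSource.map(number -> Integer.valueOf(number.trim()));
        // 10-second tumbling processing-time window over the whole (non-keyed) stream;
        // replaces the deprecated timeWindowAll(Time.seconds(10)), whose window type
        // depends on the configured time characteristic
        AllWindowedStream<Integer, TimeWindow> windowedStream = integerSingleOutputStreamOperator
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        windowedStream.reduce(new ReduceFunction<Integer>() {
            // called for each new element: folds it into the running sum of the window
            @Override
            public Integer reduce(Integer last, Integer current) throws Exception {
                System.out.println("process last:" + last + " current:" + current);
                return last + current;
            }
        }).print().setParallelism(1);
        env.execute(SocketWindowIncrAgg.class.getSimpleName());
    }
}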
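A 10-second tumbling window decides when "the time is up" by aligning each timestamp to a multiple of the window size. The plain-Java sketch below reproduces that arithmetic; it is modeled on Flink's `TimeWindow.getWindowStartWithOffset`, the class name is hypothetical, and window ends are exclusive. Because windows are aligned to the epoch rather than to the job's start, the first window after startup usually covers less than 10 seconds of input.

```java
/**
 * Plain-Java sketch (hypothetical class, no Flink dependency) of how the
 * tumbling window for a timestamp is chosen. All values are in milliseconds.
 */
final class TumblingAssignSketch {

    /** Start of the tumbling window [start, start + size) that contains the timestamp. */
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        // Java's % can be negative for negative inputs; shift it into [0, size)
        if (remainder < 0) {
            remainder += size;
        }
        return timestamp - remainder;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 seconds, as in Time.seconds(10)
        long[] timestamps = {3_000L, 9_999L, 10_000L, 27_500L};
        for (long ts : timestamps) {
            long start = windowStart(ts, 0L, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```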
(2) aggregate operator
Requirement: compute the average of the values in each window.
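import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Incremental window aggregation:
 * computes the average of each window with the aggregate operator.
 */
public class SocketWindowAvgAgg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Integer> integerSingleOutputStreamOperator = dataStreamSource.map(number -> Integer.valueOf(number.trim()));
        // 10-second tumbling processing-time window (instead of the deprecated timeWindowAll)
        AllWindowedStream<Integer, TimeWindow> windowedStream = integerSingleOutputStreamOperator
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        windowedStream.aggregate(new MyAggregate()).print().setParallelism(1);
        env.execute(SocketWindowAvgAgg.class.getSimpleName());
    }

    /**
     * IN:  input type (Integer)
     * ACC: intermediate accumulator, Tuple2<Integer, Integer>
     *      f0: number of elements seen so far
     *      f1: running sum of the elements
     * OUT: output type (Double, the average)
     */
    private static class MyAggregate implements AggregateFunction<Integer, Tuple2<Integer, Integer>, Double> {
        /**
         * Creates the initial (empty) accumulator for a new window.
         */
        @Override
        public Tuple2<Integer, Integer> createAccumulator() {
            return new Tuple2<>(0, 0);
        }

        /**
         * Called for every element that enters the window.
         * @param element     the new element
         * @param accumulator the current accumulator
         * @return the updated accumulator
         */
        @Override
        public Tuple2<Integer, Integer> add(Integer element, Tuple2<Integer, Integer> accumulator) {
            // count + 1, and add the element to the running sum
            accumulator.f0 += 1;
            accumulator.f1 += element;
            System.out.println("input:" + element + "|accumulator:" + accumulator);
            return new Tuple2<>(accumulator.f0, accumulator.f1);
        }

        /**
         * Computes the final result when the window fires.
         */
        @Override
        public Double getResult(Tuple2<Integer, Integer> accumulator) {
            System.out.println("call getResult:" + accumulator);
            return (double) accumulator.f1 / accumulator.f0;
        }

        /**
         * Merges two accumulators. Only invoked when windows are merged
         * (e.g. session windows); it is not called for this tumbling window.
         */
        @Override
        public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a1, Tuple2<Integer, Integer> b1) {
            System.out.println("call merge:" + a1 + " " + b1);
            return Tuple2.of(a1.f0 + b1.f0, a1.f1 + b1.f1);
        }
    }
}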
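The order in which the four `AggregateFunction` methods are used can be mimicked without Flink. In the sketch below, `AggFn` is a hypothetical local interface with the same four methods, and the accumulator is a `long[] {count, sum}` instead of a `Tuple2`. A window creates one accumulator, calls `add` per element, and calls `getResult` when it fires; `merge` only combines partial accumulators, which is why averaging two merged halves still gives the average of all four values.

```java
/**
 * Plain-Java sketch (no Flink dependency) of the accumulator lifecycle.
 * AggFn is a hypothetical interface mirroring AggregateFunction's methods.
 */
final class AvgLifecycleSketch {

    interface AggFn<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        OUT getResult(ACC acc);
        ACC merge(ACC a, ACC b);
    }

    /** Average with accumulator {count, sum}; getResult is null for an empty accumulator. */
    static final AggFn<Integer, long[], Double> AVG = new AggFn<>() {
        public long[] createAccumulator() { return new long[] {0L, 0L}; }
        public long[] add(Integer value, long[] acc) { return new long[] {acc[0] + 1, acc[1] + value}; }
        public Double getResult(long[] acc) { return acc[0] == 0 ? null : (double) acc[1] / acc[0]; }
        public long[] merge(long[] a, long[] b) { return new long[] {a[0] + b[0], a[1] + b[1]}; }
    };

    /** Feeds values one at a time into a fresh accumulator, as a window does. */
    static long[] accumulate(int... values) {
        long[] acc = AVG.createAccumulator();
        for (int v : values) {
            acc = AVG.add(v, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        System.out.println("avg = " + AVG.getResult(accumulate(1, 2, 3, 4)));
        // merge() combines two partial accumulators, e.g. when session windows merge
        long[] merged = AVG.merge(accumulate(1, 2), accumulate(3, 4));
        System.out.println("merged avg = " + AVG.getResult(merged));
    }
}
```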
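2. Full-window aggregation
The computation starts only after all data belonging to the window has arrived, i.e. when the window fires. Because the function sees every element of the window at once, it can handle requirements such as sorting the data in the window.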
Commonly used operators:
apply(windowFunction)
process(processWindowFunction)
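ProcessWindowFunction provides more context information than WindowFunction through its Context object (for example, the current window via context.window()).
This is similar to the relationship between MapFunction and RichMapFunction. For non-keyed windows (windowAll), the counterparts are AllWindowFunction and ProcessAllWindowFunction; the example below uses the latter.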
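import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Full-window computation:
 * extends the abstract class ProcessAllWindowFunction and overrides process().
 */
public class SocketWindowFullAgg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Integer> integerSingleOutputStreamOperator = dataStreamSource.map(number -> Integer.valueOf(number.trim()));
        // 10-second tumbling processing-time window (instead of the deprecated timeWindowAll)
        AllWindowedStream<Integer, TimeWindow> windowedStream = integerSingleOutputStreamOperator
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        windowedStream.process(new MyProcess())
                .print().setParallelism(1);
        env.execute(SocketWindowFullAgg.class.getSimpleName());
    }

    /**
     * ProcessAllWindowFunction is an abstract class, so it is extended rather than implemented.
     * Type parameters:
     *   IN  = Integer    (type of the input values)
     *   OUT = Integer    (type of the output values)
     *   W   = TimeWindow (type of the window)
     */
    private static class MyProcess extends ProcessAllWindowFunction<Integer, Integer, TimeWindow> {
        @Override
        public void process(Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
            // called once per window, with all elements buffered for that window
            System.out.println("process start:" + elements);
            int sum = 0;
            for (Integer number : elements) {
                sum += number;
            }
            out.collect(sum);
        }
    }
}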
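The example above only sums the window, which reduce could also do. What a full-window function adds is access to all elements at once, for instance to sort them. The following plain-Java sketch (no Flink; the class is hypothetical) buffers elements without computing anything on arrival and sorts them when the window fires.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java sketch (not Flink code) of full-window processing: every element
 * is buffered, and the window logic runs once over all of them when the
 * window fires, so it can sort the data before emitting it.
 */
final class FullWindowSortSketch {

    private final List<Integer> buffer = new ArrayList<>(); // grows with every element

    void add(int value) {
        buffer.add(value); // no computation on arrival, only buffering
    }

    /** Called once when the window fires; returns the elements in ascending order. */
    List<Integer> fire() {
        List<Integer> sorted = new ArrayList<>(buffer);
        Collections.sort(sorted);
        buffer.clear(); // this sketch starts an empty window after firing
        return sorted;
    }

    public static void main(String[] args) {
        FullWindowSortSketch window = new FullWindowSortSketch();
        for (int v : new int[] {5, 1, 4, 2}) {
            window.add(v);
        }
        System.out.println(window.fire());
    }
}
```

The trade-off is state size: the buffer holds every element of the window, whereas an incremental function keeps a single accumulator.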
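II. Window join
Two streams can be joined over a window: elements of the two streams that have the same key and fall into the same window are paired. Window join supports three window types:
tumbling windows, sliding windows, and session windows.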
Usage:
stream.join(otherStream)      // join the two streams
    .where(<KeySelector>)     // key of the first stream used for the join
    .equalTo(<KeySelector>)   // key of the second stream used for the join
    .window(<WindowAssigner>) // window type
    .apply(<JoinFunction>)    // process each joined pair
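The semantics of a window join can be sketched in plain Java: an element of the first stream is paired with an element of the second only if both have the same key and fall into the same window, and every such pair reaches the join function. It is an inner join, so unmatched elements produce nothing. The sketch assumes tumbling windows and uses each element's value as its key, as the tumbling example does; the `Event` type and class name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java simulation (no Flink) of a tumbling window join: two elements
 * are paired only if they have the same key AND fall into the same window.
 */
final class WindowJoinSketch {

    /** Hypothetical element type: a timestamp plus a value that doubles as the key. */
    static final class Event {
        final long ts;   // timestamp in ms
        final int value; // also used as the join key
        Event(long ts, int value) { this.ts = ts; this.value = value; }
    }

    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Inner join: emits "left,right" for every matching pair in the same window. */
    static List<String> join(List<Event> left, List<Event> right, long size) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.value == r.value;
                boolean sameWindow = windowStart(l.ts, size) == windowStart(r.ts, size);
                if (sameKey && sameWindow) {
                    out.add(l.value + "," + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(new Event(1_000, 1), new Event(2_000, 2), new Event(12_000, 3));
        List<Event> right = List.of(new Event(3_000, 1), new Event(11_000, 2), new Event(13_000, 3));
        // key 1 matches in [0s, 10s); key 2 does not (different windows); key 3 matches in [10s, 20s)
        System.out.println(join(left, right, 10_000));
    }
}
```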
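1. Tumbling Window Join
Core idea: elements of both streams are assigned to the same tumbling windows, and within each window every pair of elements with equal keys is passed to the JoinFunction. This is an inner join: an element with no partner of the same key in that window produces no output.
Example: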
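import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Window join:
 * joins two streams over 10-second tumbling windows.
 */
public class SocketTumblingWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost", 9999);
        DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost", 9998);
        SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01.map(number -> Integer.valueOf(number.trim()));
        SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02.map(number -> Integer.valueOf(number.trim()));
        DataStream<String> result = dataStream01.join(dataStream02)
                // join key of the first stream: the number itself
                .where(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return integer;
                    }
                })
                // join key of the second stream: the number itself
                .equalTo(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return integer;
                    }
                })
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                // called once for every matching pair in the same window
                .apply(new JoinFunction<Integer, Integer, String>() {
                    @Override
                    public String join(Integer first, Integer second) throws Exception {
                        return first + "," + second;
                    }
                });
        result.print().setParallelism(1);
        env.execute(SocketTumblingWindowJoin.class.getSimpleName());
    }
}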
2. Sliding Window Join
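import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Window join:
 * joins two streams over sliding windows.
 */
public class SocketSlideWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost", 9999);
        DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost", 9998);
        SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01.map(number -> Integer.valueOf(number.trim()));
        SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02.map(number -> Integer.valueOf(number.trim()));
        DataStream<String> result = dataStream01.join(dataStream02)
                // both key selectors return the constant 1, so every element of one stream
                // is joined with every element of the other stream in the same window
                .where(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return 1;
                    }
                })
                .equalTo(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return 1;
                    }
                })
                // size 10 s, slide 5 s: each element belongs to two windows,
                // so the same pair can be emitted twice
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .apply(new JoinFunction<Integer, Integer, String>() {
                    @Override
                    public String join(Integer first, Integer second) throws Exception {
                        return first + "," + second;
                    }
                });
        result.print().setParallelism(1);
        env.execute(SocketSlideWindowJoin.class.getSimpleName());
    }
}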
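With a size of 10 s and a slide of 5 s, every element belongs to two overlapping windows, so the same pair of elements can be joined in both windows and printed twice. The sketch below lists the windows that contain a given timestamp. It is plain Java modeled on the assignment loop in Flink's `SlidingProcessingTimeWindows`, with offset 0 and a hypothetical class name.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java sketch (hypothetical class, no Flink dependency) of sliding-window
 * assignment: returns every [start, end) window that contains the timestamp.
 */
final class SlidingAssignSketch {

    static List<long[]> assign(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest window start <= ts
        // walk backwards one slide at a time while the window still covers ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        for (long[] w : assign(12_000L, 10_000L, 5_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For a timestamp of 12 s this yields the windows [10 s, 20 s) and [5 s, 15 s); in general an element falls into size / slide windows.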
3. Session Window Join
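import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Window join:
 * joins two streams over session windows.
 */
public class SocketSessionWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost", 9999);
        DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost", 9998);
        SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01.map(number -> Integer.valueOf(number.trim()));
        SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02.map(number -> Integer.valueOf(number.trim()));
        DataStream<String> result = dataStream01.join(dataStream02)
                // constant key 1: all elements of both streams share one key
                .where(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return 1;
                    }
                })
                .equalTo(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return 1;
                    }
                })
                // a session closes after 5 seconds without new elements
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
                .apply(new JoinFunction<Integer, Integer, String>() {
                    @Override
                    public String join(Integer first, Integer second) throws Exception {
                        return first + "," + second;
                    }
                });
        result.print().setParallelism(1);
        env.execute(SocketSessionWindowJoin.class.getSimpleName());
    }
}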
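A session window has no fixed length: each element opens a window `[ts, ts + gap)`, and windows that overlap are merged into one session. Since Flink builds the window join on a keyed union of both inputs, sessions are formed from the elements of both streams together. The plain-Java sketch below shows only the merging step; it assumes sorted timestamps, uses a hypothetical class name, and is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java sketch of session formation: each element opens [ts, ts + gap),
 * and a window that overlaps the current session extends it.
 * Timestamps are assumed to be sorted, so one pass is enough.
 */
final class SessionMergeSketch {

    static List<long[]> sessions(long[] sortedTs, long gap) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTs) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts <= last[1]) {
                last[1] = Math.max(last[1], ts + gap); // overlaps: extend the session
            } else {
                result.add(new long[] {ts, ts + gap}); // gap exceeded: start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {1_000L, 3_000L, 7_000L, 20_000L};
        for (long[] s : sessions(ts, 5_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

With a 5 s gap, the elements at 1 s, 3 s and 7 s form one session [1 s, 12 s), while the element at 20 s starts a new one.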
4. Interval Join
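import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Interval join:
 * for each key, joins every left element with the right elements whose
 * timestamps lie within [left - 2s, left + 2s].
 * Interval join works on event time, so every element needs a timestamp; here the
 * arrival time is used as the event timestamp (assumes Flink 1.12+, where event time is the default).
 */
public class SocketIntervalWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost", 9999);
        DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost", 9998);
        // wall-clock arrival time as event timestamp; it only increases, so monotonous watermarks fit
        WatermarkStrategy<Integer> arrivalTime = WatermarkStrategy.<Integer>forMonotonousTimestamps()
                .withTimestampAssigner((element, recordTimestamp) -> System.currentTimeMillis());
        SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01
                .map(number -> Integer.valueOf(number.trim()))
                .assignTimestampsAndWatermarks(arrivalTime);
        SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02
                .map(number -> Integer.valueOf(number.trim()))
                .assignTimestampsAndWatermarks(arrivalTime);
        // keyBy(0) only works on tuple types; key the Integer streams by their value instead
        DataStream<String> result = dataStream01
                .keyBy(value -> value)
                .intervalJoin(dataStream02.keyBy(value -> value))
                .between(Time.seconds(-2), Time.seconds(2))
                .process(new ProcessJoinFunction<Integer, Integer, String>() {
                    @Override
                    public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) throws Exception {
                        out.collect(left + "," + right);
                    }
                });
        result.print().setParallelism(1);
        env.execute(SocketIntervalWindowJoin.class.getSimpleName());
    }
}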
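Unlike the three window joins, an interval join has no windows: a right element matches a left element of the same key when its timestamp lies in `[left.ts + lower, left.ts + upper]`, with both bounds inclusive unless `lowerBoundExclusive()` or `upperBoundExclusive()` is called. The plain-Java check below (hypothetical class name) evaluates that condition for `between(Time.seconds(-2), Time.seconds(2))`.

```java
/**
 * Plain-Java sketch (no Flink dependency) of the interval-join condition:
 *   left.ts + lower <= right.ts <= left.ts + upper   (both bounds inclusive)
 */
final class IntervalJoinSketch {

    static boolean matches(long leftTs, long rightTs, long lowerMs, long upperMs) {
        return rightTs >= leftTs + lowerMs && rightTs <= leftTs + upperMs;
    }

    public static void main(String[] args) {
        long left = 10_000L;
        long[] rights = {7_500L, 8_000L, 11_999L, 12_000L, 12_001L};
        for (long r : rights) {
            // lower = -2 s, upper = +2 s, as in between(Time.seconds(-2), Time.seconds(2))
            System.out.println(r + " -> " + matches(left, r, -2_000L, 2_000L));
        }
    }
}
```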