window join
A window join joins the elements of two streams that share a common key and lie in the same window. The elements from both sides are passed to a user-defined JoinFunction or FlatJoinFunction, where the user can emit results that satisfy the join condition. The general pattern:
// streamA: the left stream
streamA
.join(streamB) // streamB: the right stream
.where(<key selector>) // key selector for the left stream
.equalTo(<key selector>) // key selector for the right stream
.window(WindowAssigner) // the window assigner
.apply(new JoinFunction() {...}); // join logic: a JoinFunction or FlatJoinFunction
Note:
- Only inner joins are supported; to implement a left join or right join, combine the streams with coGroup() instead.
- The joined elements must belong to the same window; elements in different windows are never joined.
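The second point is just window arithmetic: two records can only meet in a join if they map to the same tumbling window. As a plain-Java sketch (independent of the Flink API; the class and method names here are illustrative), the start of the 10-second tumbling window for a zero offset is computed the same way `TumblingEventTimeWindows.of(size)` assigns it:

```java
public class TumblingWindowCheck {
    // Start of the tumbling window containing ts, for window size `size` and offset 0.
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    public static void main(String[] args) {
        long size = 10; // 10-unit tumbling windows (seconds here; milliseconds in a real job)
        // The three sample records from the socket input below:
        long[] ts = {1606810559L, 1606810565L, 1606811064L};
        for (long t : ts) {
            long start = windowStart(t, size);
            System.out.println(t + " -> window [" + start + ", " + (start + size) + ")");
        }
        // All three timestamps land in different windows
        // (starts 1606810550, 1606810560 and 1606811060), so none of them
        // could ever be window-joined with another.
    }
}
```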
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TransformationsWindowJoin
{
public static void main(String [] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/* sample input (key, value, epoch-second timestamp):
a ac 1606810559
b bf 1606810565
c af 1606811064
*/
DataStream<String> text = env.socketTextStream("192.168.23.210",9000);
DataStream<Tuple3<String, String, Long>> stream = text.map(new MapFunction<String, Tuple3<String, String, Long>>() {
@Override
public Tuple3<String,String,Long > map(String s) throws Exception {
String[] words=s.split(" ");
return Tuple3.of(words[0],words[1],Long.parseLong(words[2]));
}
});
// Left stream: assign timestamps and watermarks (allowed lateness of 0 ms)
DataStream<Tuple3<String,String,Long>> leftStream = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String,String,Long>>(Time.milliseconds(0)) {
@Override
public long extractTimestamp(Tuple3<String, String, Long> t) {
return t.f2;
}
});
// Right stream: assign timestamps and watermarks (here both sides read the same source)
DataStream<Tuple3<String,String,Long>> rightStream = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String,String,Long>>(Time.milliseconds(0)) {
@Override
public long extractTimestamp(Tuple3<String, String, Long> t) {
return t.f2;
}
});
DataStream<Tuple5<String, String, Long, String, Long>> joinStream = leftStream
.join(rightStream)
// key selector for the left stream (here the timestamp field f2)
.where(new KeySelector<Tuple3<String, String, Long>, Long>() {
@Override
public Long getKey(Tuple3<String, String, Long> t) throws Exception {
return t.f2;
}
})
// key selector for the right stream
.equalTo(new KeySelector<Tuple3<String, String, Long>, Long>() {
@Override
public Long getKey(Tuple3<String, String, Long> t) throws Exception {
return t.f2;
}
})
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new JoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, Tuple5<String, String, Long,String, Long>>() {
@Override
public Tuple5<String, String, Long,String, Long> join(Tuple3<String, String, Long> t1, Tuple3<String, String, Long> t2) throws Exception {
return Tuple5.of(t1.f0,t1.f1,t1.f2,t2.f1,t2.f2);
}
});
joinStream.print();
env.execute();
}
}
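To see what this job would emit for the three sample records, the join can be simulated in plain Java with no Flink dependency (a sketch; the class and helper names are illustrative): group each side's records by (key, window start) and emit the cross product per group. As in the job above, both sides carry the same records and the join key is the timestamp field f2:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowJoinSimulation {
    // A record (f0, f1, f2), like the Tuple3 in the Flink job.
    record Rec(String f0, String f1, long f2) {}

    // Inner join of two record lists on (key = f2, tumbling window of `size`).
    static List<String> windowJoin(List<Rec> left, List<Rec> right, long size) {
        Map<String, List<Rec>> rightIndex = new HashMap<>();
        for (Rec r : right) {
            String bucket = r.f2() + "@" + (r.f2() - r.f2() % size); // key + window start
            rightIndex.computeIfAbsent(bucket, k -> new ArrayList<>()).add(r);
        }
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            String bucket = l.f2() + "@" + (l.f2() - l.f2() % size);
            for (Rec r : rightIndex.getOrDefault(bucket, List.of())) {
                // Same shape as the Tuple5 emitted by the JoinFunction above.
                out.add("(" + l.f0() + "," + l.f1() + "," + l.f2() + ","
                        + r.f1() + "," + r.f2() + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> recs = List.of(
                new Rec("a", "ac", 1606810559L),
                new Rec("b", "bf", 1606810565L),
                new Rec("c", "af", 1606811064L));
        // Both sides are the same stream, as in the job above, so every record
        // pairs exactly with itself: identical key, hence identical window.
        windowJoin(recs, recs, 10).forEach(System.out::println);
    }
}
```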
interval join
For a KeyedStream, Flink provides an interval join. It joins the elements of two streams (call them input1 and input2) on a common key, pairing elements whose timestamps lie within a relative time interval of each other.
An interval join does not rely on Flink's WindowAssigner; instead, the interval is defined by a lower bound and an upper bound. When input1 and input2 are interval-joined, an element input1.element1 with timestamp input1.element1.ts defines the interval [input1.element1.ts + lower bound, input1.element1.ts + upper bound], and every element of input2 whose timestamp falls into this interval is paired with input1.element1. In other words, every pair of elements satisfying
input1.element1.ts + lower bound <= input2.elementx.ts <= input1.element1.ts + upper bound
is combined with INNER JOIN semantics. The bounds may be positive or negative.
Both bounds are inclusive by default; they can be made exclusive with .lowerBoundExclusive() and .upperBoundExclusive().
Internally, the interval join buffers elements from both streams in state, so take care that this buffer does not grow too large and put excessive pressure on memory.
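The bound check itself is simple arithmetic. A minimal plain-Java sketch of the inclusive/exclusive matching rule (independent of the Flink API; the class and parameter names are illustrative):

```java
public class IntervalJoinPredicate {
    // Does a right-side element with timestamp tsRight fall into the interval
    // [tsLeft + lower, tsLeft + upper] around a left-side element?
    static boolean matches(long tsLeft, long tsRight, long lower, long upper,
                           boolean lowerExclusive, boolean upperExclusive) {
        long lo = tsLeft + lower;
        long hi = tsLeft + upper;
        boolean aboveLo = lowerExclusive ? tsRight > lo : tsRight >= lo;
        boolean belowHi = upperExclusive ? tsRight < hi : tsRight <= hi;
        return aboveLo && belowHi;
    }

    public static void main(String[] args) {
        // Bounds of -5 and +10 with both ends inclusive, as with
        // between(Time.seconds(-5), Time.seconds(10)) in the job below:
        System.out.println(matches(100, 95, -5, 10, false, false));  // exactly on the lower bound
        System.out.println(matches(100, 110, -5, 10, false, false)); // exactly on the upper bound
        System.out.println(matches(100, 95, -5, 10, true, false));   // excluded by lowerBoundExclusive()
    }
}
```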
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class TransformationsIntervalJoin
{
public static void main(String [] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/* sample input (key, value, epoch-second timestamp):
a ac 1606810559
b bf 1606810565
c af 1606811064
*/
DataStream<String> text = env.socketTextStream("192.168.23.210",9000);
DataStream<Tuple3<String, String, Long>> stream = text.map(new MapFunction<String, Tuple3<String, String, Long>>() {
@Override
public Tuple3<String,String,Long > map(String s) throws Exception {
String[] words=s.split(" ");
return Tuple3.of(words[0],words[1],Long.parseLong(words[2]));
}
});
// Left stream: assign timestamps and watermarks (allowed lateness of 0 ms)
DataStream<Tuple3<String,String,Long>> leftStream = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String,String,Long>>(Time.milliseconds(0)) {
@Override
public long extractTimestamp(Tuple3<String, String, Long> t) {
return t.f2;
}
});
// Right stream: assign timestamps and watermarks (here both sides read the same source)
DataStream<Tuple3<String,String,Long>> rightStream = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String,String,Long>>(Time.milliseconds(0)) {
@Override
public long extractTimestamp(Tuple3<String, String, Long> t) {
return t.f2;
}
});
// Key both streams on f0, then pair elements whose timestamps lie within
// [-5 s, +10 s] of each other
DataStream<Tuple5<String, String, Long, String, Long>> joinStream = leftStream.keyBy(0).intervalJoin(rightStream.keyBy(0))
.between(Time.seconds(-5), Time.seconds(10))
.process(new ProcessJoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, Tuple5<String, String,Long,String, Long> >() {
@Override
public void processElement(Tuple3<String, String, Long> t1, Tuple3<String, String, Long> t2, Context context, Collector<Tuple5<String, String,Long,String, Long> > collector) throws Exception {
Tuple5<String, String,Long,String, Long> t3=Tuple5.of(t1.f0,t1.f1,t1.f2,t2.f1,t2.f2);
collector.collect(t3);
}
});
joinStream.print();
env.execute();
}
}
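As with the window join, the interval join on the sample records can be checked in plain Java (a sketch with illustrative names, not the Flink API): key both sides on f0 and pair elements whose timestamp difference lies in [-5, 10]. Since both sides read the same source here and the keys a, b, c are distinct, each record pairs exactly with itself:

```java
import java.util.ArrayList;
import java.util.List;

public class IntervalJoinSimulation {
    // A record (f0, f1, f2), like the Tuple3 in the Flink job.
    record Rec(String f0, String f1, long f2) {}

    // Inner interval join on key f0 with inclusive bounds [lower, upper],
    // in the same time unit as f2.
    static List<String> intervalJoin(List<Rec> left, List<Rec> right, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                long diff = r.f2() - l.f2();
                if (l.f0().equals(r.f0()) && diff >= lower && diff <= upper) {
                    // Same Tuple5 shape as the ProcessJoinFunction emits.
                    out.add("(" + l.f0() + "," + l.f1() + "," + l.f2() + ","
                            + r.f1() + "," + r.f2() + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> recs = List.of(
                new Rec("a", "ac", 1606810559L),
                new Rec("b", "bf", 1606810565L),
                new Rec("c", "af", 1606811064L));
        intervalJoin(recs, recs, -5, 10).forEach(System.out::println);
    }
}
```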