Window Join
窗口join将共享相同key并位于同一窗口中的两个流的元素连接在一起。可以试用WindowAssigner定义这些窗口,并根据两个流的元素对其进行评估。然后将双方的元素传递到用户定义的JoinFunction或FlatJoinFunction,在此用户可以发出满足连接条件的结果。代码如下
streamA.join(streamB)
.where(<KeySelector>) //streamA某个字段
.equalTo(<KeySelector>) //streamB某个字段
.window(<WindowAssigner>) //指定窗口分配器
.apply(<JoinFunction>) //运用join Function
Note
- 创建两个流的元素的成对组合的行为就像一个内部链接,这意味着如果一个流中的元素没有与另一个流中要连接的元素对应的元素,则不会发出该元素。
- 那些确定加入的元素将以最大的时间戳作为时间戳。例如,以[5,10)为边界的窗口将导致连接的元素具有9作为其时间戳。
Tumbling Window Join
当执行滚动窗口连接时,所有具有公共key和公共滚动窗口的元素都按成对组合连接,并传递到JoinFunction或FlatJoinFunction。因为它的行为就像一个内部连接,所以在其滚动窗口中不发射一个流中没有其他流元素的元素。
object FlinkTumblingWindowJoin {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//默认Flink用的是处理时间,必须设置EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)//并行度设置为1,方便测试和观察
//001 zhangsan 时间戳
var user= env.socketTextStream("CentOS", 9999)
.map(line=>line.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new UserAssignerWithPunctuatedWatermarks(2000))
//apple 001 时间戳
var order= env.socketTextStream("CentOS", 8888)
.map(line=>line.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new OrderAssignerWithPunctuatedWatermarks(2000))
user.join(order)
.where(t=>t._1)
.equalTo(t=>t._2)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(2))
.apply((v1,v2)=>(v1._1,v1._2,v2._1))
.print("连接结果")
env.execute("FlinkTumblingWindowJoin")
}
}
Sliding Window Join
执行滑动窗口连接时,所有具有公共键和公共滑动窗口的元素都按成对组合进行连接,并传递给JoinFunction或FlatJoinFunction。在当前滑动窗口中,一个流中没有其他流元素的元素不会被发出。请注意,某些元素可能在一个滑动窗口中连接,但不能在另一个窗口中连接。
object FlinkSlidingWindowJoin {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//默认Flink用的是处理时间,必须设置EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)//并行度设置为1,方便测试和观察
//001 zhangsan 时间戳
var user= env.socketTextStream("CentOS", 9999)
.map(line=>line.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new UserAssignerWithPunctuatedWatermarks(2000))
//apple 001 时间戳
var order= env.socketTextStream("CentOS", 8888)
.map(line=>line.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new OrderAssignerWithPunctuatedWatermarks(2000))
user.join(order)
.where(t=>t._1)
.equalTo(t=>t._2)
.window(SlidingEventTimeWindows.of(Time.seconds(4),Time.seconds(2)))
.allowedLateness(Time.seconds(2))
.apply((v1,v2)=>(v1._1,v1._2,v2._1))
.print("连接结果")
env.execute("FlinkSlidingWindowJoin")
}
}
Interval Join
Interval Join使用公共key连接两个流(将他们称为A和B)的元素,并且流B的元素具有与流A的元素时间戳相对时间间隔的时间戳。
b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]
或
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
其中a和b是A和B的元素,它们共享一个公共key。只要lowerBound始终小于或等于upperBound,则lowerBound和upperBound都可以为负或正。Interval Join当前仅执行内部连接。将一对元素传递给ProcessJoinFunction时,将为它们分配两个元素的较大时间戳。
class UserDefineProcessJoinFunction extends ProcessJoinFunction[(String,String,Long),(String,String,Long),String]{
val sdf:SimpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
override def processElement(left: (String, String, Long),
right: (String, String, Long),
ctx: ProcessJoinFunction[(String, String, Long), (String, String, Long), String]#Context,
out: Collector[String]): Unit = {
val leftTimestamp = ctx.getLeftTimestamp
val rightTimestamp = ctx.getRightTimestamp
val timestamp=ctx.getTimestamp
println(s"left:${sdf.format(leftTimestamp)},right:${sdf.format(rightTimestamp)} time:${sdf.format(timestamp)}")
out.collect(s"${left._1} ${left._2} ${right._1}")
}
}
object FlinkIntervalJoin {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//默认Flink用的是处理时间,必须设置EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)//并行度设置为1,方便测试和观察
//001 zhangsan 时间戳
var user= env.socketTextStream("CentOS", 9999)
.map(line=>line.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new UserAssignerWithPunctuatedWatermarks(2000))
.keyBy(t=>t._1)
//apple 001 时间戳
var order= env.socketTextStream("CentOS", 8888)
.map(line=>line.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new OrderAssignerWithPunctuatedWatermarks(2000))
.keyBy(t=>t._2)
user.intervalJoin(order)
.between(Time.seconds(0), Time.seconds(4))
.process(new UserDefineProcessJoinFunction)
.print("输出")
env.execute("FlinkIntervalJoin")
}
}