// 接下来讲讲 Flink 中的广播状态（Broadcast State）
/*
 * Example: matching per-user behaviour sequences with Flink broadcast state.
 * The job checks whether a user performs a configured pair of consecutive actions,
 * e.g. "login" followed by "buy", or "login" followed by "pay".
 */

/** One user-behaviour event: user `userid` performed `action`. */
case class Action(userid: String, action: String)

/** A behaviour rule: action `act1` followed by action `act2` for the same user. */
case class Pattern1(act1: String, act2: String)
object BroadcastStateExample {

  /**
   * Builds and runs the job: a keyed stream of user actions is connected to a broadcast
   * stream of behaviour patterns, and [[PatternFilter]] prints every detected pattern.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Real-time user-behaviour events.
    val actions = env.fromElements(
      Action("zhangsan", "login"),
      Action("zhangsan", "buy"),
      Action("lisi", "login"),
      Action("lisi", "pay")
    )

    // Behaviour patterns (the rules every user's actions are checked against).
    val patterns = env.fromElements(
      Pattern1("login", "buy"),
      Pattern1("login", "pay")
    )

    // Broadcast state is always map-shaped, so it is described by a MapStateDescriptor.
    // The key type matters here: PatternFilter stores and looks up each pattern under a
    // String key built from its two actions, so the key must stay String.
    val patternStateDescriptor =
      new MapStateDescriptor[String, Pattern1]("pattern", classOf[String], classOf[Pattern1])

    // Turn the pattern stream into a broadcast stream backed by that state.
    val broadcastPatterns = patterns.broadcast(patternStateDescriptor)

    // Key actions by user, join them with the broadcast patterns and print every match.
    actions
      .keyBy(_.userid)
      .connect(broadcastPatterns)
      .process(new PatternFilter)
      .print()

    env.execute()
  }
}
/**
 * Emits `(userid, pattern)` whenever a user's previous action and current action form a
 * pattern held in the broadcast state.
 *
 * Patterns are stored in broadcast state under the key `"<act1>_<act2>"`. Both the writer
 * (processBroadcastElement) and the reader (processElement) build keys with `patternKey`,
 * so the key format cannot drift between them.
 *
 * NOTE: Flink does not guarantee that broadcast-side elements are processed before
 * keyed-side elements. An action that arrives before its pattern has been broadcast is
 * simply not matched.
 */
class PatternFilter extends KeyedBroadcastProcessFunction[String, Action, Pattern1, (String, Pattern1)] {

  /**
   * Descriptor for the broadcast pattern state. It uses the name "pattern" with String keys
   * and Pattern1 values; it is expected to match the descriptor the job passes to
   * `broadcast(...)`.
   */
  private val patternDescriptor =
    new MapStateDescriptor[String, Pattern1]("pattern", classOf[String], classOf[Pattern1])

  /** Keyed (per-user) state holding the user's previous action; null while none is stored. */
  private lazy val preAct: ValueState[String] =
    getRuntimeContext.getState(new ValueStateDescriptor[String]("pre_action", classOf[String]))

  /** Builds the broadcast-state key for the ordered action pair (first, second). */
  private def patternKey(first: String, second: String): String = s"${first}_${second}"

  /**
   * Checks whether (previous action, current action) is a known pattern, emits a match if so,
   * and then records the current action as the user's previous action.
   */
  override def processElement(in1: Action,
                              readOnlyContext: KeyedBroadcastProcessFunction[String, Action, Pattern1, (String, Pattern1)]#ReadOnlyContext,
                              collector: Collector[(String, Pattern1)]): Unit = {
    val patterns = readOnlyContext.getBroadcastState(patternDescriptor)

    // On a user's first action there is no predecessor (value() is null). Skip matching
    // instead of looking up a bogus "null_<action>" key. A single get() replaces the
    // previous contains()+get() pair; get() yields null when no pattern is stored.
    Option(preAct.value())
      .flatMap(prev => Option(patterns.get(patternKey(prev, in1.action))))
      .foreach(matched => collector.collect((in1.userid, matched)))

    // Always remember the current action as the predecessor of the next one. The old
    // preAct.clear() after a match had no effect, because this update immediately
    // overwrote it, so it has been dropped.
    preAct.update(in1.action)
  }

  /**
   * Stores an incoming pattern in broadcast state under `"<act1>_<act2>"`.
   * Not guaranteed to run before processElement (see the class note).
   */
  override def processBroadcastElement(in2: Pattern1,
                                       context: KeyedBroadcastProcessFunction[String, Action, Pattern1, (String, Pattern1)]#Context,
                                       collector: Collector[(String, Pattern1)]): Unit =
    context.getBroadcastState(patternDescriptor).put(patternKey(in2.act1, in2.act2), in2)
}