import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

/**
 * Demonstrates Flink keyed aggregation operators: `keyBy` + `maxBy` (commented
 * examples) and `reduce` (running aggregation to find the most active user).
 *
 * DATE: 2022/10/4 13:22
 * AUTHOR: GX
 */
object TransformationAggTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // ClickSource is a project-local source; presumably it emits Event(user, url, timestamp)
    // records — NOTE(review): confirm against its definition elsewhere in the project.
    val stream = env.addSource(new ClickSource)

    // 1.1 Records with the same key always land in the same partition; different
    //     keys may share a partition.
    //     keyBy: DataStream -> KeyedStream. Aggregation operators are stateful,
    //     so use them only on streams with a bounded key space — an unbounded
    //     number of keys will eventually exhaust memory.
    // stream.keyBy(_.user)
    //   .maxBy("timestamp")
    //   .print()

    // 1.2 Same as 1.1, but with an explicit KeySelector implementation.
    // stream.keyBy(new MyKeySelector)
    //   .maxBy("timestamp")
    //   .print()

    // 2. reduce aggregation: extract the currently most active user.
    stream.map(x => (x.user, 1L))
      .keyBy(_._1)
      .reduce(new MyReduce)                           // running click count per user
      .keyBy(x => true)                               // route all records to one group so users can be compared
      // .maxBy("_2")
      .reduce((acc, x) => if (x._2 >= acc._2) x else acc) // keep the most active user seen so far
      .print()

    env.execute()
  }

  /** Extracts the user name as the key of an [[Event]]. */
  class MyKeySelector extends KeySelector[Event, String] {
    override def getKey(value: Event): String = value.user
  }

  /** Sums the click counts of two (user, count) pairs for the same key. */
  class MyReduce extends ReduceFunction[(String, Long)] {
    override def reduce(value1: (String, Long), value2: (String, Long)): (String, Long) =
      (value1._1, value1._2 + value2._2)
  }
}
// Topic: Aggregation transformation operators (reduce aggregation)
// Adapted from: blog.csdn.net/GX_0824/article/details/127161921