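// 将数据封装成样例类,开启滑动窗口,将每个页面的状态预缓存起来,统计后输出,再根据时间分类,求出 topN
// (Wrap each record in a case class, apply a sliding window, buffer the per-item counts in state, emit them, then group by window time and compute the top N.)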
package com.yan
import java.util.Properties

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer
/**
 * One user-behaviour record parsed from a comma-separated input line.
 *
 * @param userId     first CSV column
 * @param itemId     second CSV column; the job counts views per this id
 * @param categoryId third CSV column
 * @param behavior   fourth CSV column; only records equal to `"pv"` are counted
 * @param timestamp  fifth CSV column; multiplied by 1000 before being used as event time
 */
case class UserBehavior(
    userId: Long,
    itemId: Long,
    categoryId: Long,
    behavior: String,
    timestamp: Long
)
/**
 * View count of one item within one window, as produced by the window function.
 *
 * @param itemId    the item the count belongs to (the window's key)
 * @param timestamp end timestamp of the window the count was computed for
 * @param count     number of events counted for the item in that window
 */
case class itemViewCount(
    itemId: Long,
    timestamp: Long,
    count: Long
)
object hot_item {

  /**
   * Builds and runs the hot-items job.
   *
   * Consumes comma-separated user-behaviour records from the Kafka topic `hot_Item`, counts
   * `"pv"` events per item over a 1-hour window sliding every 5 minutes, and prints the
   * top 3 items of each window.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "node01:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val dataStream = env
      .addSource(new FlinkKafkaConsumer[String]("hot_Item", new SimpleStringSchema(), properties))
      .map(line => parseUserBehavior(line))
      // The record timestamp is scaled by 1000, i.e. treated as seconds and converted to
      // milliseconds. NOTE(review): assumes records arrive in timestamp order — confirm.
      .assignAscendingTimestamps(_.timestamp * 1000L)

    val processStream = dataStream
      .filter(_.behavior == "pv")
      .keyBy(_.itemId)
      .timeWindow(Time.hours(1), Time.minutes(5))
      // Count incrementally, then tag each count with its item id and window end.
      .aggregate(new myAgg(), new windowResult())
      // Gather all counts belonging to the same window (identified by its end timestamp).
      .keyBy(_.timestamp)
      .process(new TopN_items(3))

    processStream.print()
    env.execute()
  }

  /**
   * Parses one `userId,itemId,categoryId,behavior,timestamp` line into a [[UserBehavior]].
   *
   * Numeric fields are parsed directly as `Long` (matching the case-class field types), so
   * ids outside the `Int` range are accepted instead of failing.
   *
   * @param line raw comma-separated record
   * @return the parsed record; every field is trimmed of surrounding whitespace
   * @throws NumberFormatException if a numeric field is not a valid long
   * @throws ArrayIndexOutOfBoundsException if the line has fewer than five fields
   */
  private def parseUserBehavior(line: String): UserBehavior = {
    val fields = line.split(",")
    UserBehavior(
      fields(0).trim.toLong,
      fields(1).trim.toLong,
      fields(2).trim.toLong,
      fields(3).trim,
      fields(4).trim.toLong)
  }
}
/**
 * Incremental counter: every incoming [[UserBehavior]] adds one to the running total.
 * The content of the event itself is not inspected.
 */
class myAgg() extends AggregateFunction[UserBehavior, Long, Long] {

  /** Counting starts from zero. */
  override def createAccumulator(): Long = 0L

  /** Counts one more event. */
  override def add(value: UserBehavior, accumulator: Long): Long = 1L + accumulator

  /** Combines two partial counts by summing them. */
  override def merge(a: Long, b: Long): Long = b + a

  /** The accumulated count is returned unchanged. */
  override def getResult(accumulator: Long): Long = accumulator
}
/**
 * Computes the mean of the `timestamp` field of the incoming [[UserBehavior]] events.
 *
 * The accumulator is `(sum of timestamps, number of events)`. This aggregate is not used
 * by `hot_item.main`.
 */
class avr_Agg() extends AggregateFunction[UserBehavior, (Long, Long), Double] {

  /** Empty accumulator: zero sum over zero events. Both components are `Long`. */
  override def createAccumulator(): (Long, Long) = (0L, 0L)

  /** Adds the event's timestamp to the sum and bumps the event count. */
  override def add(value: UserBehavior, accumulator: (Long, Long)): (Long, Long) =
    (accumulator._1 + value.timestamp, accumulator._2 + 1L)

  /**
   * Returns the mean timestamp.
   *
   * Uses floating-point division so the mean is not truncated. An empty accumulator
   * yields 0.0 rather than failing with a division by zero.
   */
  override def getResult(accumulator: (Long, Long)): Double = {
    val (sum, count) = accumulator
    if (count == 0L) 0.0 else sum.toDouble / count
  }

  /** Combines two partial (sum, count) pairs component-wise. */
  override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) =
    (a._1 + b._1, a._2 + b._2)
}
/**
 * Wraps a window's pre-aggregated count into an [[itemViewCount]].
 *
 * The result is tagged with the item id (the window key) and the window's end timestamp.
 */
class windowResult() extends WindowFunction[Long, itemViewCount, Long, TimeWindow] {
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[itemViewCount]): Unit = {
    // Paired with an AggregateFunction in `aggregate(...)`, so `input` carries the single aggregated count.
    val viewCount = input.head
    val windowEnd = window.getEnd
    out.collect(itemViewCount(itemId = key, timestamp = windowEnd, count = viewCount))
  }
}
/**
 * Ranks the items of each window and emits a formatted top-N report.
 *
 * The stream is keyed by window-end timestamp. The function buffers every [[itemViewCount]]
 * of a window in keyed state. An event-time timer fires shortly after the window end; the
 * function then emits the `topSize` most-viewed items and clears the buffer.
 *
 * @param topSize number of items to include in each ranking
 */
class TopN_items(topSize: Int) extends KeyedProcessFunction[Long, itemViewCount, String] {

  /** Offset (in event-time ms) after the window end at which the ranking timer fires. */
  private val TimerDelayMs: Long = 10L

  /** Per-key (i.e. per window end) buffer of the counts received so far. */
  private var itemState: ListState[itemViewCount] = _

  override def open(parameters: Configuration): Unit = {
    itemState = getRuntimeContext.getListState(
      new ListStateDescriptor[itemViewCount]("itemState", classOf[itemViewCount]))
  }

  override def processElement(
      value: itemViewCount,
      ctx: KeyedProcessFunction[Long, itemViewCount, String]#Context,
      out: Collector[String]): Unit = {
    itemState.add(value)
    // Every element of a window carries the same timestamp, so this registers the same
    // timer repeatedly. Flink keeps one timer per key and timestamp.
    ctx.timerService().registerEventTimeTimer(value.timestamp + TimerDelayMs)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, itemViewCount, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    import scala.collection.JavaConverters._

    // Materialize the buffered counts before clearing the state they come from.
    val topItems = itemState.get().asScala.toList
      .sortBy(_.count)(Ordering.Long.reverse)
      .take(topSize)
    itemState.clear()

    val result = new StringBuilder()
    result.append("time : ").append(timestamp - TimerDelayMs).append("\n")
    topItems.zipWithIndex.foreach { case (item, i) =>
      result.append("NO: ").append(i + 1).append(" 商品ID为").append(item.itemId)
        .append("浏览量为: ").append(item.count).append("\n")
    }
    result.append("======================")
    out.collect(result.toString())
  }
}