sparkstreaming的reduceByKeyAndWindow窗口函数的用法

版权声明: https://blog.csdn.net/xianpanjia4616/article/details/82315954

今天我们主要来说一下spark中reduceByKeyAndWindow窗口函数的使用方法;

先看一下官网的图片吧:

这个是sparkstreaming提供的窗口计算,允许你在一个滑动的窗口中进行计算,所有这些窗口操作都需要两个参数 - windowLength和slideInterval。(窗口长度 - 窗口的持续时间,滑动间隔 - 执行窗口操作的间隔)
比如说我们现在要每隔2秒,统计前3秒内每一个单词出现的次数,这个时候就需要用这个窗口函数了;
一般我们可以这么写:reduceByKeyAndWindow(_+_,Seconds(3), Seconds(2)),每隔2秒(后面的2秒),统计前3秒的数据(前面的3秒),但是这个时候会有一个问题,当slideInterval>windowLength的时候,从图中可以看到time3会被计算2次,也就是说两个统计的部分会有重复,那这个怎么解决呢?不用急, 我们可以用reduceByKeyAndWindow的另一个重载的方法reduceByKeyAndWindow(_+_,_-_,Seconds(3s),seconds(2)).这个方法的意思,我们可以不用重新获取或者计算,而是通过获取旧信息来更新新的信息,这样即节省了空间又节省了内容,并且效率也大幅提升.下面我们看一下该方法的源码;

那么上图中的计算就变成了:
win1 = time1 + time2 + time3     win2 = win1 + time4 + time5 - time1 - time2 

我们再来看一下reduceByKeyAndWindow的源码:可以看出它需要两个函数,一个是计算新产生数据,一个是计算过时的数据

 def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration = self.slideDuration,
      numPartitions: Int = ssc.sc.defaultParallelism,
      filterFunc: ((K, V)) => Boolean = null
    ): DStream[(K, V)] = ssc.withScope {
    reduceByKeyAndWindow(
      reduceFunc, invReduceFunc, windowDuration,
      slideDuration, defaultPartitioner(numPartitions), filterFunc
    )
  }

_+_是对新产生的时间分片(time4,time5内RDD)进行统计,而_-_是对上一个窗口中,过时的时间分片(time1,time2) 进行统计

下面看一下我写的一个demo:  

package spark

import kafka.PropertiesScalaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

/**
  * sparkstreaming reduceByKeyAndWindow 窗口的使用;
  */
object windowFunction {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("Spark Streaming Jason")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    @transient
    val scc = new StreamingContext(conf, Seconds(1))
    scc.checkpoint("/home/jason/test")
    val topic = PropertiesScalaUtils.loadProperties("topic_combine")
    val topicSet = Set(topic)
    val kafkaParams = Map[String, Object](
      "auto.offset.reset" -> "latest",   //latest;earliest
      "value.deserializer" -> classOf[StringDeserializer]
      , "key.deserializer" -> classOf[StringDeserializer]
      , "bootstrap.servers" -> PropertiesScalaUtils.loadProperties("broker")
      , "group.id" -> PropertiesScalaUtils.loadProperties("groupId")
      , "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val kafkaStreams = KafkaUtils.createDirectStream[String, String](
      scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)).map(_.value())
    val word = kafkaStreams.flatMap(_.split(" ")).map((_,1))
    val window_word = word.reduceByKeyAndWindow(_+_,_-_ ,Seconds(3),Seconds(2))
    window_word.foreachRDD(rdd=>{
      if(!rdd.isEmpty()){
        rdd.foreachPartition(partition=>{
          partition.foreach(pair=>{
            println(pair._1 + "----------" + pair._2)
          })
        })
      }
    })
    scc.start()
    scc.awaitTermination()
  }
}

如果有写的不对的地方,欢迎大家指正,如果有什么疑问,可以加QQ群:340297350,谢谢

猜你喜欢

转载自blog.csdn.net/xianpanjia4616/article/details/82315954