SparkStreaming知识总结

一、流式计算的概述

1.1 什么是流式计算

1. 数据流与静态数据的区别
	-- 数据流指的就是不断产生的数据,是源源不断,不会停止。
	-- 静态数据指的就是存储在磁盘中的固定的数据
2. 流式计算的概念
      就是对数据流进行计算,由于数据是炼苗不断的产生的,所以这个计算也是一直再计算,不会停止。
3. 流式计算的数据流有什么特点:
   - 数据是无界的(unbounded)
   - 数据是动态的
   - 计算速度是非常快的(是不断计算的,每次计算都是微小的批量数据,因此速度快,而且还是基于内存的)
   - 计算不止一次 
   - 计算不能终止
4. 离线计算的特点:
	- 数据是有界的(unbounded)
	- 数据是静态的
	- 计算速度通常较慢   
	- 计算只执行一次
	- 计算终会终止

1.2 常见的离线和流式计算框架

1. 离线计算框架
	-- mapreduce
	-- hive
	-- sparkcore
	-- sparksql
	-- flink-dataset
2. 流式计算框架
   -- storm
   -- sparkStreaming
   -- flink-datastream(blink)

1.3 SparkStreaming简介

1.3.1 简介

1. SparkStreaming也是Spark生态栈中的一个重要模块,是一个流式计算框架
2. SparkStreaming属于准实时计算框架
3. SparkStreaming是SparkCore的api的一种扩展,使用DStream(离散流)作为数据模型。 本质就是一个时间序列上的RDD。

DStream,本质上是RDD的序列。SparkStreaming的处理流程可以归纳为下图:
在这里插入图片描述

流式计算框架从延迟的角度来分类:

1. 纯实时流式计算: 毫秒级别的延迟,或者没有延迟的计算。
2. 准实时流式计算: 亚秒级别,秒级别,分钟级别的计算

流式计算框架从处理的记录条数来分类

1. 纯实时流式计算: 来一条记录,就计算一条记录。
2. 准实时流式计算: 微小的批处理,还是多条记录一起计算。

1.3.2 原理

DStream数据流模型

1. SparkStreaming 会实时的接受输入的数据
2. SparkStreaming 会按照固定长度的时间段将源源不断进来的数据划分成batch
3. SparkStreming 会每一个batch进行一次计算,计算是不停止的
4. 每次的计算结果也是一个batch,因此结果集就是多个batch的构成
5. SparkStreaming,将数据流抽象成DStream.  称之为离散流的数据模型。本质就是一个时间序列上的RDD。
6. 在整个数据流作业中,会有多个DStream。


参考下图:    rdd1 就是一个时间序列上的 DStream
				rdd2 就是一个时间序列上的 DStream
				rdd3 就是一个时间序列上的 DStream
				rdd4 就是一个时间序列上的 DStream
             

      8:00:00      hello world  hello java hello c++
      
      rdd1 = sc.textFile("....")
      rdd2 = rdd1.flatMap(_.split(" "))
      rdd3 = rdd2.map((_,1))
      rdd4 = rdd3.reduceByKey(_+_)
      
      针对于rdd1来说:
             8:00:00      hello world  hello java hello c++
             8:00:10      no zuo no die
             8:00:20      you are best
             8:00:30:     hello you are best
      针对于rdd2来说:
             8:00:00      [hello, world,hello,java,hello,c++]
             8:00:10      [no,zuo,no,die]
             8:00:20      [you,are,best]
             8:00:30:     [hello,you,are,best]
      针对于rdd3来说:
             8:00:00      (hello,1), (world,1),(hello,1),(java,1),(hello,1),(c++,1)
             8:00:10      (no,1),(zuo,1),(no,1),(die,1)
             8:00:20      (you,1),(are,1),(best,1)
             8:00:30:     (hello,1),(you,1),(are,1),(best,1)

参考下图: 一个DStream是由不同时间段上的同一个RDD构成的
在这里插入图片描述

参考下图:如果算子的返回值是DStream,则不管是哪一个时间段上的数据,只要调用了同一个算子,则返回的都同一个DStream
在这里插入图片描述

1.3.3 Storm VS SparkStreaming VS Flink

在这里插入图片描述

1.4 怎样选择流式处理框架

何时选择storm
	--需要纯实时,不能忍受1秒以上延迟的场景
	--实时计算的功能中,要求可靠的事务机制和可靠性机制,即数据的处理完全精准,一条也不能多,一条也不能少
   --针对高峰低峰时间段,动态调整实时计算程序的并行度,以最大限度利用集群资源(通常是在小型公司,集群资源紧张的情况)
何时选择Spark Streaming	
	--不满足上述3点要求的话,我们可以考虑使用Spark Streaming来进行实时计算
	--如果一个项目除了实时计算之外,还包括了离线批处理、交互式查询、图计算和MLIB机器学习等业务功能,而且实时计算中,
	   可能还会牵扯到高延迟批处理、交互式查询等功能,,那么就应该首选Spark生态,用Spark Core开发离线批处理,
	   用Spark SQL开发交互式查询,用Spark Streaming开发实时计算,三者可以无缝整合,给系统提供非常高的可扩展性。
何时选择Flink
	支持高吞吐、低延迟、高性能的流处理
	支持带有事件时间的窗口(Window)操作
	支持有状态计算的Exactly-once语义
	支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
	支持具有Backpressure功能的持续流模型
	支持基于轻量级分布式快照(Snapshot)实现的容错
	一个运行时同时支持Batch on Streaming处理和Streaming处理
	Flink在JVM内部实现了自己的内存管理
	支持迭代计算
	支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存

二、SparkStreaming的入门编程

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>redis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- sparkstreaming的核心包 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.3</version>
        </dependency>
        <!-- sparkstreaming与kafka的整合包 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.3</version>
        </dependency>
         <!-- redis的整合包 -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.0.0</version>
        </dependency>
          <!-- sparksql的核心包 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.3</version>
        </dependency>
    </dependencies>
</project>

2.1 wordcount案例演示

package com.qf.sparkstreaming.day01

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{
    
    DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{
    
    Seconds, StreamingContext}

/**
 *  sparkCore的入门api:    SparkContext
 *  sparkSql的入门:        SparkSession
 *  sparkStreaming的入门API: StreamingContext
 *
 *
 *  注意:
 *     1. 要先使用nc指令 开启qianfeng01和10086端口,否则sparkStreaming会提前报错
 *          在qianfeng01上运行指令: nc -lp 10086
 *                          -l  表示监听
 *                          -p  表示端口
 */
object Streaming_01_WordCount {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")

        /**
         * 构造器:StreamingContext(conf:SparkConf, batchDuration:Duration)
         * 第一个参数:配置对象
         * 第二个参数:用于指定SparkStreaming的流式计算的batch的时间间隔,即时间片段
         *           Durations.milliseconds(milliseconds: Long)    毫秒级别
         *           Durations.seconds(seconds: Long)      秒级别
         *           Durations.minutes(minutes: Long)      分钟级别
         *           Milliseconds(milliseconds: Long)    毫秒级别
         *           Seconds(seconds: Long)   秒级别
         *           Minutes(minutes: Long)  分钟级别
         */
        val context = new StreamingContext(conf, Seconds(10))

        /**
         * 利用TCP协议的套接字,实时的监听一个端口,如果有数据,就采集,并计算。
         * socketTextStream(hostname: String,port: Int,......)
         * T:  泛型
         * hostname: 要监听的主机名
         * port:要监听的端口号
         *
         */
        val dStream: ReceiverInputDStream[String] = context.socketTextStream("qianfeng01", 10086)
        // 打印数据流中的数据,默认打印10条记录
        //dStream.print()

        // 按照空格切分成各个单词,  返回的是一个新的DStream
        val wordDStream: DStream[String] = dStream.flatMap(_.split(" "))

        //构建成元组,返回一个新的DStream
        val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_, 1))

        //进行统计每个单词的数量,返回的是一个新的DStream
        val wordCountDStream: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_ + _)

        //打印,默认打印10条
        wordCountDStream.print()



        //启动程序
        context.start()

        /**
         * 因为main方法一旦结束,整个程序就结束,因此需要让main方法处于等待状态
         */
        context.awaitTermination()

    }
}

2.2 从内存中的Queue中获取数据

package com.qf.sparkstreaming.day01

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{
    
    Durations, StreamingContext}

import scala.collection.mutable

/**
 * 从内存中的Queue中获取数据
 */
object Streaming_02_FromQueue {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("FromQueue")
        val ssc:StreamingContext = new StreamingContext(conf,Durations.seconds(10))

        /**
         * queueStream[T: ClassTag]( queue: Queue[RDD[T]],oneAtATime: Boolean = true)
         * 从一个RDD队列中获取一个或多个RDD数据,进行处理。
         * queue:RDD队列
         * oneAtATime: 是否一次处理一个RDD,默认值是true。   false表示队列中有多少,就一次性处理多少。 注意:从队列中获取数据时,队列中就没有该数据了。
         */
        val queue = new mutable.Queue[RDD[Int]]()
        val dStream: InputDStream[Int] = ssc.queueStream(queue,true)

        //直接打印,默认打印10行
        dStream.print()

        //开启数据流作业
        ssc.start()

        /**
         * 利用main线程,向队列中源源不断的添加RDD。
         */
        val rdd: RDD[Int] = ssc.sparkContext.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8))
        for(i<- 1 to 300){
    
    
            queue.enqueue(rdd)//将rdd填入队列中
            Thread.sleep(1000)
            // println(queue.size)  //如果将oneAtATime改为false,则可证明队列中的数据每10秒都会被清空。
        }

        // 该方法的作用就是阻塞main方法,不让其结束。因为main方法已结束,就会停止数据流作业
        ssc.awaitTermination()
    }
}

2.3 自定义接收器

package com.qf.sparkstreaming.day01

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{
    
    Durations, StreamingContext}

object Streaming_03_CustomReceiver {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("FromQueue")
        val ssc:StreamingContext = new StreamingContext(conf,Durations.seconds(10))

        //从采集器中获取DStream

        val dStream: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
        dStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print(20)

        ssc.start()
        ssc.awaitTermination()
    }

    /**
     * 自定义一个采集器:
     * 1. 继承采集器抽象类Receiver,   指定泛型,指定存储级别
     * 2. 重写两个抽象方法
     */
    class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){
    
    
        var flag = true
        /**
         * 开启采集数据的方法,此方法是框架主动调用
         */
        override def onStart(): Unit = {
    
    
            var t = new Thread(){
    
    
                override def run(){
    
    
                   while(flag){
    
    
                       val list = List("hello world hello java hello java","hello world welcome to china hello","hello world hello java c++")
                       val element: String = list(math.floor(math.random * 3).toInt)
                       //利用采集器的存储方法,存储数据
                       store(element)

                       Thread.sleep(200)  //让产生的速度降低,否则容易造成机器的cpu压力过大
                   }
                }
            }
            //开启线程
            t.start()
        }

        /**
         * 到采集的结束时间,就会调用该方法,终止采集数据
         */
        override def onStop(): Unit = {
    
    
            flag = false
        }
    }
}

2.4 读取本地文件

不能读取已经存在的文件,只能读取IO产生的新文件,只能读取一次。

package com.qf.sparkstreaming.day01

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{
    
    DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{
    
    Durations, StreamingContext}

/**
 * 不能读取已经存在的文件,只能读取IO产生的新文件,只能读取一次。
 *
 * 必须将文件从同一文件系统中的另一个位置“移动”到被监视的目录中
 */
object Streaming_04_ReadLocalFile {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("FromQueue")
        val ssc:StreamingContext = new StreamingContext(conf,Durations.seconds(10))

        /**
         * textFileStream(directory: String)
         */
        val dStream: DStream[String] = ssc.textFileStream("D:/data")
        dStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print(200)

        ssc.start()
        ssc.awaitTermination()
    }
}

使用java程序,利用IO产生一个新文件

package com.qf.sparkStreaming.day01;

import java.io.*;

public class IOTest {
    
    
    public static void main(String[] args) throws Exception {
    
    
        FileOutputStream fis = new FileOutputStream("D:/data/newFile3.txt");
        PrintWriter pw = new PrintWriter(new OutputStreamWriter(fis,"utf-8"));
        String context = "hello world hello The world puts off its mask of vastness to its lover It becomes small as one song, as one kiss of the eternal " +
                "It is the tears of the earth that keep here smiles in bloom The mighty desert is burning for the love of a blade of grass who shakes her head and laughs and flies away";
        pw.println(context);
        pw.println(context);
        /**
         * 加了一个睡眠,sparkstreaming就读不到数据了,为什么?
         * 
         *  默认情况,close才会触发缓存中的数据flush到磁盘。
         *     所以睡眠时,文件名产生了,但是文件里没有数据。
         *     所以,sparkstreaming读一次的时候没有读到数据。
         */
        Thread.sleep(11000);
        pw.println(context);
        pw.close();

    }
}

2.5 读取HDFS文件

package com.qf.sparkstreaming.day01

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{
    
    Durations, StreamingContext}

/**
 *
 * 读取hdfs上的文件:  必须将文件从同一文件系统中的另一个位置“移动”到被监视的目录中
 *
 *     [root@qianfeng01 ~]# hdfs dfs -put emp.txt /input/emp1.txt
 *     [root@qianfeng01 ~]# hdfs dfs -copyFromLocal emp.txt /input/emp2.txt
 *     [root@qianfeng01 ~]# hdfs dfs -moveFromLocal emp.txt /input/emp3.txt
 */
object Streaming_05_readHDFSFile {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("FromQueue")
        val ssc:StreamingContext = new StreamingContext(conf,Durations.seconds(10))

        /**
         * textFileStream(directory: String)
         */
        val dStream: DStream[String] = ssc.textFileStream("hdfs://qianfeng01:8020/input")
        dStream.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).print(200)

        ssc.start()
        ssc.awaitTermination()
    }
}

三、SparkStreaming与Kafka的整合

3.1 简介

在实际生产环境中,kafka用的比较多,用于消息缓存,而SparkStreaming是一个准实时计算框架,所以两者的结合在企业中的用的相对较多。

两者的整合有两个版本,一个是0-8(低版本),一个是0-10(新版本)

注意区别就是下面的三个SSL、Offset Commit API Dynamic Topic Subscription

在这里插入图片描述

3.2 两个版本的原理图解析

1) 0-8的原理解析图

在这里插入图片描述

2)0-10的原理解析图

在这里插入图片描述

3.3 SparkStreaming消费Kafka

package com.qf.sparkstreaming.day01

import org.apache.kafka.clients.consumer.{
    
    ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{
    
    DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{
    
    ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{
    
    Durations, StreamingContext}

/**
 *  使用0-10版本的整合包里的api进行读取数据
 */
object Streaming_06_FromKafka {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("FromQueue")
        val ssc:StreamingContext = new StreamingContext(conf,Durations.seconds(10))


        var params = Map[String,String](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG->"qianfeng01:9092,qianfeng02:9092,qianfen03:9092",
            ConsumerConfig.GROUP_ID_CONFIG->"g1",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG->"earliest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->"org.apache.kafka.common.serialization.StringDeserializer",
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->"org.apache.kafka.common.serialization.StringDeserializer"
        )

        /**
         * 使用0-10的整合API   KafkaUtils.createDirectStream(
         * sc:StreamingContext,    :   上下文对象
         * locationStrategy:LocationStrategy,  : 位置策略,经常使用的是LocationStrategies.PrePreferConsistent   该策略指的是spark的RDD的一个分区对应kafka的一个分区
         * consumerStrategy: ConsumerStrategy[K, V],  : 消费者策略,用于订阅主题的等
         * .....
         * )
         */
        val dStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](List("pet"), params))

        dStream.map(_.value()).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print(200)

        ssc.start()
        ssc.awaitTermination()
    }
}

3.4 维护offset到zookeeper上

package com.qf.sparkstreaming.day02

import com.qf.sparkstreaming.day01.MyKafkaUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{
    
    ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{
    
    Durations, StreamingContext}

/**
 *   1. sparkStreaming作为消费者,取消费Kafka中的某些主题的分区中的数据。
 *   2. 对于消费后的偏移量,我们维护到zookeeper上。
 *
 *   注意:消费者第一次消费数据时,zk上应该没有offset的维护
 *        第一次消费后,要讲读取到的偏移量维护到zk上,方便下次消费。
 */
object Streaming_01_offsetToZookeeper {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val conf = new SparkConf().setMaster("local[*]").setAppName("offsetToZookeeper")
        val ssc = new StreamingContext(conf,Durations.seconds(10))

        //获取消费者连接kafka的各种参数
        val params = MyKafkaUtils.getParamToMap()
        //sparkstreaming要消费的主题
        val topics = Array("pet")
        /**
         *  从zk上获取offset
         *
         *  如何在zk上维护offset?
         *      维护的znode路径如 :/kafka/offsets/groupid/topic/partition
         *      offset保存到partition的里面。
         *
         *  因此,在读取zk上维护的offset时,要指定相应的参数,比如 groupid,
         *
         */
        val offsets:Map[TopicPartition,Long] = MyZkUtils.getOffset(params.getOrElse("group.id","g1"),topics)
        print(offsets.size)
        var dStream:InputDStream[ConsumerRecord[String, String]] = null
        if(offsets.size>0) {
    
    
            //不是第一次读取Kafka上的数据,而是其他时候,比如宕机并恢复后,应该从zk上保存的offset开始读取
            dStream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent,
                ConsumerStrategies.Subscribe[String, String](topics, params,offsets)
            )
        }else{
    
    
            //第一次读取Kafka上的数据
            dStream= KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent,
                ConsumerStrategies.Subscribe[String, String](topics, params)
            )
        }

        /**
         *  消费数据的打印, 消费完数据后,要讲offset维护到zk上
         *  RDD的泛型:RDD[ConsumerRecord[String,String]]
         *      说明RDD里存储了一堆消息记录
         *
         *
         *
         *  OffsetRange :  该对象记录的是 消费的某一个分区的偏移量的范围,有以下属性
         *        val topic: String
         *        val partition: Int,
         *        val fromOffset: Long,   刚消费时的该分区的偏移量
         *        val untilOffset: Long   消费完后的偏移量+1的数字,表示下一次消费从untilOffset值开始消费
         */

        dStream.foreachRDD(rdd=>{
    
    
            rdd.foreach(println)
            //将当前数据流中的最后一条记录的offset维护到zk上
            val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            //将每个分区的untilOffset保存到zk上
            MyZkUtils.updateOffset(params.getOrElse("group.id","g1"),ranges)
        })

        ssc.start()
        ssc.awaitTermination()
    }
}

MyZkUtils的编写

package com.qf.sparkstreaming.day02

import org.apache.curator.RetryPolicy
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

import java.util

object MyZkUtils {
    
    
    /**
     * 将消费后的偏移量保存到zk上
     * @param groupid
     * @param ranges
     *
     * OffsetRange对象上的属性:topic,partition,fromOffset,untilOffset
     */
    def updateOffset(groupid: String, ranges: Array[OffsetRange]): Unit = {
    
    
        for(range <- ranges){
    
    
            val untilOffset: Long = range.untilOffset
            val partition =range.partition
            val topic = range.topic
            //保存到相应的路径里: /kafka/offsets/groudid/topic/partition
            checkPath(s"$basePath/$groupid/$topic/$partition")
            zkClient.setData().forPath(s"$basePath/$groupid/$topic/$partition",untilOffset.toString.getBytes())
        }
    }


    val zkClient = {
    
    
        //获取连接zk的客户端api
        val zkClient = CuratorFrameworkFactory
          .builder()
          .connectString("qianfeng01:2181,qianfeng02:2181,qianfeng03:2181")
          .retryPolicy(new ExponentialBackoffRetry(5000,6))
          .build()
        zkClient.start()
        zkClient
    }
    val basePath = "/kafka/offsets"

    /**
     * 检查路径是否存在,不存在就创建
     * @param path
     * @return
     */
    def checkPath(path: String) = {
    
    
        if(zkClient.checkExists().forPath(path)==null){
    
    
            //创建znode,递归创建
            zkClient.create().creatingParentsIfNeeded().forPath(path)
        }
    }

    /**
     * 获取zk上某一个消费者组下的某些主题的分区里的offset
     *
     * @param groupid     消费者组
     * @param topics      消费的主题集合
     * @return
     *
     *
     *  维护的znode路径如 :/kafka/offsets/groupid/topic/partition
     */
    def getOffset(groupid: String, topics: Array[String]): Map[TopicPartition, Long] = {
    
    
        //创建一个map对象,用于存储每个分区和相应的偏移量
        var offsets = Map[TopicPartition, Long]()

        //遍历每一个主题
        for(topic <- topics){
    
    
            val path = s"$basePath/$groupid/$topic"
            checkPath(path) //执行完这一步,路径一定存在
            //获取主题znode下的所有分区znode
            val partitionsZnodes: util.List[String] = zkClient.getChildren.forPath(path)
            //遍历每一个分区,  注意:如果主题下没有分区,表示还没有维护offset到zk上,循环进不去
            import  scala.collection.JavaConversions._
            for(partitionZnode <- partitionsZnodes){
    
    
                //获取partition里存储的offset
                val bytes: Array[Byte] = zkClient.getData.forPath(s"$path/$partitionZnode")
                val offset = new String(bytes).toLong
                offsets += (new TopicPartition(topic,partitionZnode.toInt)->offset)
            }
        }

        offsets
    }


    def main(args: Array[String]): Unit = {
    
    
        zkClient.create().forPath("/names")
    }
}

3.5 维护offset到redis上

package com.qf.sparkstreaming.day02

import com.qf.sparkstreaming.day01.MyKafkaUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{
    
    ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{
    
    Durations, StreamingContext}

/**
 * 将sparkStreaming作为消费者消费消息的offset保存到redis中
 *     思考:使用redis的哪一种类型来保存offset
 *           使用hash类型来保存这些数据:
 *              key            field         字段值
 *              groupid     topic#partition   offset
 */
object Streaming_02_offsetToRedis {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val conf = new SparkConf().setMaster("local[*]").setAppName("offsetToZookeeper")
        // 消费者的序列化:需要Kryo序列化机制,但是配置文件里可能不生效,那么就写入程序中,如下即可
        conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
        val ssc = new StreamingContext(conf,Durations.seconds(10))

        val topics = Array("pet")
        val params = MyKafkaUtils.getParamToMap()
        //从redis中获取偏移量
        val offsets:Map[TopicPartition,Long] = MyRedisUtils.getOffset(params.getOrElse("group.id","g1"),topics)

        var dStream:InputDStream[ConsumerRecord[String, String]] = null
        if(offsets.size>0) {
    
    
            //不是第一次读取Kafka上的数据,而是其他时候,比如宕机并恢复后,应该从zk上保存的offset开始读取
            dStream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent,
                ConsumerStrategies.Subscribe[String, String](topics, params,offsets)
            )
        }else{
    
    
            //第一次读取Kafka上的数据
            dStream= KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent,
                ConsumerStrategies.Subscribe[String, String](topics, params)
            )
        }

        dStream.foreachRDD(rdd => {
    
    
            rdd.foreach(println)
            //从rdd上获取每个分区的消费的偏移量的范围
            val arr: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            //更新到redis上  :  topic,parition,fromOffset,untilOffset
            MyRedisUtils.updateOffset(params.getOrElse("group.id","g1"),arr)
        })

        ssc.start()
        ssc.awaitTermination()
    }
}

MyRedisUtils的编写

package com.qf.sparkstreaming.day02

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{
    
    HostAndPort, JedisCluster}

import java.util

object MyRedisUtils {
    
    
    //将偏移量保存到redis对应的hash中
    def updateOffset(groupid: String, arr: Array[OffsetRange]): Unit = {
    
    
        for(offsetRange <- arr){
    
    
            //获取相应的属性
            val untiloffset = offsetRange.untilOffset
            val topic: String = offsetRange.topic
            val partition: Int = offsetRange.partition

            redisCluster.hset(groupid,topic+"#"+partition,untiloffset.toString)
        }
    }

    //获取redis集群客户端api
    val redisCluster = {
    
    
        val sets = new util.HashSet[HostAndPort]()
        //添加集群的master节点
        sets.add(new HostAndPort("qianfeng01",7001))
        sets.add(new HostAndPort("qianfeng01",7002))
        sets.add(new HostAndPort("qianfeng01",7003))
        val redisCluster = new JedisCluster(sets)
        redisCluster
    }

    /**
     * 读取偏移
     * @param groupid
     * @param topics
     * @return
     *              key            field         字段值
     *              groupid     topic#partition   offset
     */
    def getOffset(groupid: String, topics: Array[String]): Map[TopicPartition, Long] = {
    
    
        var offsets = Map[TopicPartition,Long]()
        //使用redis客户端读取hash类型
        val stringToString: util.Map[String, String] = redisCluster.hgetAll(groupid)
        //获取所有的字段
        val fields: util.Set[String] = stringToString.keySet()
        import  scala.collection.JavaConversions._
        for(field<-fields){
    
    
            //通过字段取偏移量
            val offset: Long = stringToString.get(field).toLong
            //解析字段的格式:  topic#partition
            val arr: Array[String] = field.split("#")
            offsets += (new TopicPartition(arr(0),arr(1).toInt)->offset)
        }
        offsets
    }



    def main(args: Array[String]): Unit = {
    
    
        val str: String = redisCluster.get("k1001")
        println(str)
    }
}

3.6 总结

0-8与0-10的总结

1)简化的并行性

两个都不需要创建多个输入Kafka流,手动来合并。 在使用directStream,Spark Streaming将创建与使用Kafka分区一样多的RDD分区,这些分区将全部从Kafka并行读取数据。 所以在Kafka和RDD分区之间有一对一的映射关系。

2)效率

0-8方法中实现零数据丢失需要将数据存储在预写日志中,这会进一步复制数据。这实际上是效率低下的,因为数据被有效地复制了两次:一次是Kafka,另一次是由预先写入日志(WriteAhead Log)复制。
0-10这种方式消除了0-8的问题,因为没有接收器,因此不需要预先写入日志。只要Kafka数据保留时间足够长。

3)Exactly-once

0-8使用Kafka的高级API来在Zookeeper中存储消费的偏移量,也就是利用了低版本的Kafka消费数据的保存方式。虽然这种方法(结合提前写入日志)可以确保零数据丢失(即至少一次语义),但是在某些失败情况下,有一些记录可能会消费两次。发生这种情况是因为Spark Streaming可靠接收到的数据与Zookeeper跟踪的偏移之间的不一致。   也就是说0-8 没有保证Exactly-once

0-10 我们不再Zookeeper的简单Kafka API。在其检查点内,Spark Streaming跟踪偏移量。这消除了Spark Streaming和Zookeeper/Kafka之间的不一致,因此Spark Streaming每次记录都会在发生故障的情况下有效地接收到一次。为了实现输出结果的一次语义,将数据保存到外部数据存储区的输出操作必须是幂等的,或者是保证结果和偏移量的原子事务。

四、SparkStreaming的常用算子

4.1 算子的分类

# SparkStreaming的算子分为两类,
	-- 一类是Transformations算子,转换算子又可从状态上来分类:
		 --有状态算子:可以累加前n次的计算结果的算子
		 --无状态算子:普通的算子都是无状态算子
	--一类是Output Operations算子

# 注意:
	SparkStreaming的程序中,如果没有Output Operations算子,会报错

1) 常见的转换算子

转换算子:可以将上游的DStream转换成另一种DStream。 DStream的许多转换算子都是和RDD的转换算子一样。

在这里插入图片描述

常见算子的练习

package com.qf.sparkstreaming.day02

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{
    
    DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{
    
    Durations, StreamingContext}

object Streaming_03_tansfromitions {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val conf = new SparkConf().setMaster("local[*]").setAppName("offsetToZookeeper")
        val ssc = new StreamingContext(conf,Durations.seconds(10))

        val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)

        println("---------map: 对上游的DStream中每一个元素做映射处理,返回一个新的DStream---------")
        //dStream.map((_,1)).print()  //  将每一个元素 与1 映射成一个元组。

        println("---------flatMap: 将元素展开,并压平---------")
        //dStream.flatMap(_.split(" ")).print()

        println("---------filter: 对DStream中的数据进行筛选过滤,返回true的元素,构成一个新的DStream---------")
        //dStream.filter(elem => {elem.toInt> 10}).print()
        println("---------repartition: 对DStream进行重新分区,返回新的DStream---------")
        //dStream.repartition(4).print()
        println("---------count: 对DStream里的元素进行计算统计,返回一个新的只有一个元素的DStream---------")
        //dStream.count().print()
        println("---------union: 两个DStream进行联合---------")
        //dStream.union(dStream).print()
        println("---------reduce: DStream里的元素进行规约计算---------")
//        dStream.reduce((m,n)=>{
    
    
//            var sum = m.toInt
//            sum+=n.toInt
//            sum.toString
//        }).print()
        println("---------countByValue: 通过value进行分组,然后统计每种value的个数---------")
//        dStream.countByValue().print()
        println("---------reduceByKey: 通过key进行分组,然后对每一组中的value进行规约计算---------")
        //dStream.map((_,1)).reduceByKey((m,n)=>{ m+n}).print()
        println("---------join: 两个DStream进行关联,返回的是一个两个元素的元组,第二个元素还是一个元组---------")
//        val d2: DStream[(String, Int)] = dStream.map((_, 1))
//        d2.join(d2).print()
        println("---------cogroup: 两个DStream进行重组: 返回的是一个两个元素的元组,第二个元素还是一个元组,只不过第二个元组的每个元素是一个序列---------")
        val d2: DStream[(String, Int)] = dStream.map((_, 1))
        d2.cogroup(d2).print()

        ssc.start()
        ssc.awaitTermination()
    }
}

transform算子的练习

package com.qf.sparkstreaming.day02

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{
    
    Durations, StreamingContext}
import org.apache.spark.streaming.dstream.{
    
    DStream, ReceiverInputDStream}

/**
 * transform算子的练习
 */
object Streaming_04_transform_transform {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val conf = new SparkConf().setMaster("local[*]").setAppName("offsetToZookeeper")
        val ssc = new StreamingContext(conf,Durations.seconds(10))

        val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)

        /**
         * transform的作用:就是将DStream中的数据转成RDD,进行操作,然后返回RDD,  sparkStream会再次将返回的RDD
         * 进行包装成DStream。
         *
         * 为什么有这样的一个操作?
         * 原因是:DStream中的算子较RDD少很多。有transform算子后,就可以直接操作RDD了。
         */
        println("@@@@@@@@")   // 该代码是在driver端执行的
//        dStream.transform(rdd=>{
    
    
//            println("########")  // 该代码是在driver端执行, 该处通常用于进行每次的微小的批次计算的初始化操作。
//            rdd.map(line=>{
    
    
//                println("-----------") // 该代码是在executor端执行
//                line
//            })
//
//        }).print()

        val d2: DStream[Int] = dStream.transform(rdd => {
    
    
            val result: Int =
                rdd.aggregate(10)(
                    (x, y) => {
    
    
                        math.max(x, y.toInt)
                    }, //分区内取较大值,与默认值10做比较
                    (m, n) => m + n) //分区间求和

            ssc.sparkContext.makeRDD(List(result) //包装成RDD

            )
        })
        d2.print()


        ssc.start()
        ssc.awaitTermination()
    }
}

updateStateByKey算子的练习

package com.qf.sparkstreaming.day02

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{
    
    DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{
    
    Durations, StreamingContext}

/**
 *  需求:
 *       准实时计算: 10秒一计算
 *            客户的需求时,求30秒之内的单词统计|热搜|排名榜
 */
object Streaming_05_transform_updateStateByKey {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val conf = new SparkConf().setMaster("local[*]").setAppName("offsetToZookeeper")
        val ssc = new StreamingContext(conf,Durations.seconds(10))
        ssc.checkpoint("data")

        val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)

        /**
         * updateStateByKey:   是一个有状态维护的转换算子。
         *
         * 参数: updateFunc: (Seq[V], Option[S]) => Option[S]
         *       函数的第一个参数:表示当前批次的同一个Key的所有value的集合    数据如右:hello,1, hello,1 hello,1  那么seq[v]就是List(1,1,1)
         *       函数的第二个参数:就是状态数据,也就是同一个key的之前的所有批次的计算结果。
         *
         *       假如这是第一个批次,那么第二个参数是没有数据的,因此sparkStreaming要求第二个参数类型是option,因为可能拿不到值。
         */
        dStream.map((_,1)).updateStateByKey((seq,ops:Option[Int])=>{
    
    
            //定义一个变量sum,用于统计当前匹配的同一个key的所有的1的和
            var sum = 0
            for(num <- seq){
    
    
                sum += num
            }
            //再将当前批次的计算结果,累加到之前的状态数据中
            val result = ops.getOrElse(0)+sum
            Option(result)
        }).print()

        ssc.start()
        ssc.awaitTermination()
    }
}

2)常见的输出算子

在sparkStreaming中,除了转换算子外,就是输出算子(不叫行动算子)。 输出算子的作用,就是将数据流中的数据保存到外部系统,比如数据库或者是文件系统,实际上底层,就是调用了rdd的行动算子。

在这里插入图片描述

package com.qf.sparkstreaming.day02

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{
    
    Durations, StreamingContext}

/**
 * sparkStreaming的输出算子:
 *    print()
 *    saveXXXXX()
 *    foreachRDD()
 */
object Streaming_06_outputop_foreachRDD {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val conf = new SparkConf().setMaster("local[*]").setAppName("offsetToZookeeper")
        val ssc = new StreamingContext(conf,Durations.seconds(10))
        ssc.checkpoint("data")

        val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)

        /**
         * 1. foreachRDD:是一个输出操作算子
         * 2. 作用:将DStream中的数据当成RDD进行遍历,此时的RDD的最终计算是要使用RDD的行动算子的。不能再返回RDD
         *    rdd的最终计算其实就是打印,保存等操作
         *
         *    foreachRDD算子中,没有时间戳。
         *
         *   注意:默认保存的时候,会将上一个批次保存的数据覆盖
         */
        dStream.foreachRDD(rdd=>{
    
    
            println("-----")
            rdd.saveAsTextFile("output1")
            }
           )

        ssc.start()
        ssc.awaitTermination()
    }
}

4.2 窗口算子

4.2.1 window的介绍

1. window操作就是窗口函数,作用就是计算窗口里所包含的所有数据。

Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,窗口对应的大小是3个RDD,3个RDD会被聚合起来进行处理,然后过了2个duration,窗口会继续滑动,又会对最近的3个RDD,也就是窗口所包含的数据进行计算。所以每个滑动窗口操作,都应该指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

在这里插入图片描述

在这里插入图片描述

4.2.2 窗口算子

在这里插入图片描述

4.2.3 案例演示

1)

package com.qf.sparkstreaming.day02

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{
    
    DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{
    
    Seconds, StreamingContext}

/**
 * 使用window算子,统计最近30秒内的单词频率
 */
object Streaming_07_WindowOperationDemo {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val  conf = new SparkConf().setAppName("Streaming_07_WindowOperationDemo").setMaster("local[*]")
        val  ssc = new StreamingContext(conf,Seconds(10))

        //从nc上读取数据
        val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)

        val mapDStream: DStream[(String, Int)] = dStream.flatMap(_.split(" ")).map((_, 1))

        /**
         * window(
         *  windowDuration:Duration,    //第一个参数:用于指定窗口的大小,即长度,必须是micro-batch处理时的时间整数倍
         *  slideDuration:Duration      //第二个参数:用于指定窗口滑动的周期,必须是micro-batch处理时的时间整数倍
         *  )
         *
         *  数据的情况:
         *        1. 刚启动时,窗口里一定没有数据,如果近30秒内没有数据流,则窗口里也没有数据
         *        2. 启动程序不久,窗口的数据的变化应该是由少变多
         *        3. 在处理数据过程中,窗口的数据可能会由多变少,甚至没有
         */
        val windowDStream: DStream[(String, Int)] = mapDStream.window(Seconds(30), Seconds(10))
        val resultDStream: DStream[(String, Int)] = windowDStream.reduceByKey(_ + _)
        resultDStream.print()

        ssc.start()
        ssc.awaitTermination()
    }
}

2)

package com.qf.sparkstreaming.day02

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{
    
    DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{
    
    Seconds, StreamingContext}

/**
 * 生产环境中的需求:统计最近1小时的所有产品的销量的排名top3
 *
 * 模拟:30秒内的数据
 *
 *  8:00      1001 毛衣 10
 *  8:00      1002 牙刷 1
 *  8:00      1003 手机 1
 *  8:00      1004 毛巾 8
 *  8:10      1001 毛衣 8
 *  8:10      1002 牙刷 10
 *  8:10      1003 手机 15
 *  8:10      1004 毛巾 1
 *  8:20      1001 毛衣 3
 *  8:20      1002 牙刷 2
 *  8:20      1003 手机 4
 *  8:20      1004 毛巾 10
 *  8:30      1001 毛衣 3
 *  8:30      1002 牙刷 22
 *  8:30      1003 手机 4
 *  8:30      1004 毛巾 10
 *  ........
 *
 *  计算时间片段:5秒一算
 *  窗口大小:30秒
 *  窗口滑动周期:5秒一滑动
 *
 *  分组统计销售量,降序,top3
 *
 *
 */
object Streaming_08_SalesRank {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val  conf = new SparkConf()
          .setAppName("_01FromKafkaCustomOffsetToRedis")
          .setMaster("local[*]")
        val  ssc = new StreamingContext(conf,Seconds(5))

        //从nc上读取数据
        val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)

        //得到的是每一个产品id和sum
        val mapDStream: DStream[(String, Int)] = dStream.map(line => {
    
    
            val arr: Array[String] = line.split(" ")
            (arr(0), arr(2).toInt)
        })
        //规定窗口大小和滑动周期
        val windowDStream: DStream[(String, Int)] = mapDStream.window(Seconds(30), Seconds(5))
        // foreachRDD 是输出算子
        windowDStream.reduceByKey(_ + _).foreachRDD(

            rdd => {
    
    
                println("-------------------------")
                rdd.sortBy(t => t._2, false).take(3).foreach(println)
            }
        )

        ssc.start()
        ssc.awaitTermination()
    }
}
/*机 4
 *  8:20      1004 毛巾 10
 *  8:30      1001 毛衣 3
 *  8:30      1002 牙刷 22
 *  8:30      1003 手机 4
 *  8:30      1004 毛巾 10
 *  ........
 *
 *  计算时间片段:5秒一算
 *  窗口大小:30秒
 *  窗口滑动周期:5秒一滑动
 *
 *  分组统计销售量,降序,top3
 *
 *
 */
object Streaming_08_SalesRank {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val  conf = new SparkConf()
          .setAppName("_01FromKafkaCustomOffsetToRedis")
          .setMaster("local[*]")
        val  ssc = new StreamingContext(conf,Seconds(5))

        //从nc上读取数据
        val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)

        //得到的是每一个产品id和sum
        val mapDStream: DStream[(String, Int)] = dStream.map(line => {
    
    
            val arr: Array[String] = line.split(" ")
            (arr(0), arr(2).toInt)
        })
        //规定窗口大小和滑动周期
        val windowDStream: DStream[(String, Int)] = mapDStream.window(Seconds(30), Seconds(5))
        // foreachRDD 是输出算子
        windowDStream.reduceByKey(_ + _).foreachRDD(

            rdd => {
    
    
                println("-------------------------")
                rdd.sortBy(t => t._2, false).take(3).foreach(println)
            }
        )

        ssc.start()
        ssc.awaitTermination()
    }
}

猜你喜欢

转载自blog.csdn.net/weixin_45682261/article/details/125144227