25.大数据学习之旅——Spark集群模式安装&Spark架构原理

版权声明:版权归零零天所有 https://blog.csdn.net/qq_39188039/article/details/86467502

实现步骤:
1)上传解压spark安装包
2)进入spark安装目录的conf目录
3)配置spark-env.sh文件
配置示例:
#本机ip地址
SPARK_LOCAL_IP=hadoop01
#spark的shuffle中间过程会产生一些临时文件,此项指定的是其存放目录,不配置默认是在
/tmp目录下
SPARK_LOCAL_DIRS=/home/software/spark/tmp
export JAVA_HOME=/home/software/jdk1.8
4)在conf目录下,编辑slaves文件
配置示例:
hadoop01
hadoop02
hadoop03
5)配置完后,将spark目录发送至其他节点,并更改对应的 SPARK_LOCAL_IP配置
启动集群
1)如果你想让01 虚拟机变为master节点,则进入01 的spark安装目录的sbin目录
执行: sh start-all.sh
2)通过jps查看各机器进程,
01:Master +Worker
02:Worker
03:Worker
3)通过浏览器访问管理界面
http://192.168.234.11:8080
在这里插入图片描述
4)通过spark shell 连接spark集群
进入spark的bin目录
执行:sh spark-shell.sh --master spark://192.168.234.11:7077
6)在集群中读取文件:
sc.textFile("/root/work/words.txt")
默认读取本机数据 这种方式需要在集群的每台机器上的对应位置上都一份该文件 浪费磁盘
7)所以应该通过hdfs存储数据
sc.textFile(“hdfs://hadoop01:9000/mydata/words.txt”);
注:可以在spark-env.sh 中配置选项 HADOOP_CONF_DIR 配置为hadoop的etc/hadoop的地址 使默认
访问的是hdfs的路径
注:如果修改默认地址是hdfs地址 则如果想要访问文件系统中的文件 需要指明协议为file 例如
sc.text(“file:///xxx/xx”)

集群模式运行WordCount
实现步骤
1)创建spark的项目
在scala中创建项目 导入spark相关的jar包
在这里插入图片描述
2)开发spark相关代码
代码示例:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WordCountDriver {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster(“spark://hadoop01:7077”).setAppName(“wordcount”)
val sc=new SparkContext(conf)
val data=sc.textFile(“hdfs://hadoop01:9000/words.txt”, 2)
val result=data.flatMap { x => x.split(" ") }.map { x => (x,1) }.reduceByKey(+)
result.saveAsTextFile(“hdfs://hadoop01:9000/wcresult”)
}
}
3)将写好的项目打成jar,上传到服务器,进入bin目录
执行:spark-submit --class cn.tedu.WordCountDriver /home/software/spark/conf/wc.jar

Spark架构
概述
为了更好地理解调度,我们先来鸟瞰一下集群模式下的Spark程序运行架构图。
在这里插入图片描述

  1. Driver Program
    用户编写的Spark程序称为Driver Program。每个Driver程序包含一个代表集群环境的
    SparkContext对象,程序的执行从Driver程序开始,所有操作执行结束后回到Driver程序
    中,在Driver程序中结束。如果你是用spark shell,那么当你启动 Spark shell的时候,系统
    后台自启了一个 Spark 驱动器程序,就是在Spark shell 中预加载的一个叫作 sc 的
    SparkContext 对象。如果驱动器程序终止,那么Spark 应用也就结束了。
  2. SparkContext对象
    每个Driver Program里都有一个SparkContext对象,职责如下:
    1)SparkContext对象联系 cluster manager(集群管理器),让 cluster manager 为
    Worker Node分配CPU、内存等资源。此外, cluster manager会在 Worker Node 上启动
    一个执行器(专属于本驱动程序)。
    2)和Executor进程交互,负责任务的调度分配。
  3. cluster manager 集群管理器
    它对应的是Master进程。集群管理器负责集群的资源调度,比如为Worker Node分配CPU、
    内存等资源。并实时监控Worker的资源使用情况。一个Worker Node默认情况下分配一个
    Executor(进程)。
    从图中可以看到sc和Executor之间画了一根线条,这表明:程序运行时,sc是直接与
    Executor进行交互的。
    所以,cluster manager 只是负责资源的管理调度,而任务的分配和结果处理它不管。
    4.Worker Node
    Worker节点。集群上的计算节点,对应一台物理机器
    5.Worker进程
    它对应Worder进程,用于和Master进程交互,向Master注册和汇报自身节点的资源使用情
    况,并管理和启动Executor进程
    6.Executor
    负责运行Task计算任务,并将计算结果回传到Driver中。
    7.Task
    在执行器上执行的最小单元。比如RDD Transformation操作时对RDD内每个分区的计算都会对应一个Task。

Spark调度模块
概述
在这里插入图片描述
之前我们提到:Driver 的sc负责和Executor交互,完成任务的分配和调度,在底层,任务调
度模块主要包含两大部分:
1)DAGScheduler
2)TaskScheduler
它们负责将用户提交的计算任务按照DAG划分为不同的阶段并且将不同阶段的计算任务提交
到集群进行最终的计算。整个过程可以使用下图表示
在这里插入图片描述
RDD Objects可以理解为用户实际代码中创建的RDD,这些代码逻辑上组成了一个DAG。
DAGScheduler主要负责分析依赖关系,然后将DAG划分为不同的Stage(阶段),其中每个
Stage由可以并发执行的一组Task构成,这些Task的执行逻辑完全相同,只是作用于不同的数
据。
在DAGScheduler将这组Task划分完成后,会将这组Task提交到
TaskScheduler。TaskScheduler通过Cluster Manager申请计算资源,比如在集群中的某个
Worker Node上启动专属的Executor,并分配CPU、内存等资源。接下来,就是在Executor
中运行Task任务,如果缓存中没有计算结果,那么就需要开始计算,同时,计算的结果会回
传到Driver或者保存在本地。

Scheduler的实现概述
在这里插入图片描述
任务调度模块涉及的最重要的三个类是:
1)org.apache.spark.scheduler.DAGScheduler 前面提到的DAGScheduler的实现。
将一个DAG划分为一个一个的Stage阶段(每个Stage是一组Task的集合)
然后把Task Set 交给TaskScheduler模块。
2)org.apache.spark.scheduler.TaskScheduler
它的作用是为创建它的SparkContext调度任务,即从DAGScheduler接收不同Stage的任
务。向Cluster Manager 申请资源。然后Cluster Manager收到资源请求之后,在Worker为
其启动进程
3)org.apache.spark.scheduler.SchedulerBackend

是一个trait,作用是分配当前可用的资源,具体就是向当前等待分配计算资源的Task分配计
算资源(即Executor),并且在分配的Executor上启动Task,完成计算的调度过程。
4)AKKA是一个网络通信框架,类似于Netty,此框架在Spark1.8之后已全部替换成Netty
任务调度流程图
在这里插入图片描述

Spark Shuffle详解
概述
Shuffle,翻译成中文就是洗牌。之所以需要Shuffle,还是因为具有某种共同特征的一类数据需要最终汇聚
(aggregate)到一个计算节点上进行计算。这些数据分布在各个存储节点上并且由不同节点的计算单元处理。
以最简单的Word Count为例,其中数据保存在Node1、Node2和Node3;
经过处理后,这些数据最终会汇聚到Nodea、Nodeb处理,如下图所示。
在这里插入图片描述
这个数据重新打乱然后汇聚到不同节点的过程就是Shuffle。但是实际上,Shuffle过程可能会非常复杂:
1)数据量会很大,比如单位为TB或PB的数据分散到几百甚至数千、数万台机器上。
2)为了将这个数据汇聚到正确的节点,需要将这些数据放入正确的Partition,因为数据大小已经大于节点的内
存,因此这个过程中可能会发生多次硬盘续写。
3)为了节省带宽,这个数据可能需要压缩,如何在压缩率和压缩解压时间中间
做一个比较好的选择?
4)数据需要通过网络传输,因此数据的序列化和反序列化也变得相对复杂。
一般来说,每个Task处理的数据可以完全载入内存(如果不能,可以减小每个Partition的大小),因此Task可以
做到在内存中计算。但是对于Shuffle来说,如果不持久化这个中间结果,一旦数据丢失,就需要重新计算依赖的
全部RDD,因此有必要持久化这个中间结果。所以这就是为什么Shuffle过程会产生文件的原因。
如果Shuffle过程不落地,①可能会造成内存溢出 ②当某分区丢失时,会重新计算所有父分区数据

Shuffle Write
Shuffle Write,即数据是如何持久化到文件中,以使得下游的Task可以获取到其需要处理的数据的(即
Shuffle Read)。在Spark 0.8之前,Shuffle Write是持久化到缓存的,但后来发现实际应用中,shuffle过程带
来的数据通常是巨量的,所以经常会发生内存溢出的情况,所以在Spark 0.8以后,Shuffle Write会将数据持久化
到硬盘,再之后Shuffle Write不断进行演进优化,但是数据落地到本地文件系统的实现并没有改变。
1)Hash Based Shuffle Write
在Spark 1.0以前,Spark只支持Hash Based Shuffle。因为在很多运算场景中并不需要排序,因此多余的排序只
能使性能变差,比如Hadoop的Map Reduce就是这么实现的,也就是Reducer拿到的数据都是已经排好序的。
实际上Spark的实现很简单:每个Shuffle Map Task根据key的哈希值,计算出每个key需要写入的Partition然后
将数据单独写入一个文件,这个Partition实际上就对应了下游的一个Shuffle Map Task或者Result Task。因此
下游的Task在计算时会通过网络(如果该Task与上游的Shuffle Map Task运行在同一个节点上,那么此时就是一
个本地的硬盘读写)读取这个文件并进行计算。
在这里插入图片描述
Hash Based Shuffle Write存在的问题
由于每个Shuffle Map Task需要为每个下游的Task创建一个单独的文件,因此文件的数量就是:
number(shuffle_map_task)number(result_task)。
如果Shuffle Map Task是1000,下游的Task是500,那么理论上会产生500000个文件(对于size为0的文件
Spark有特殊的处理)。生产环境中Task的数量实际上会更多,因此这个简单的实现会带来以下问题:
1)每个节点可能会同时打开多个文件,每次打开文件都会占用一定内存。假设每个Write Handler的默认需要
100KB的内存,那么同时打开这些文件需要50GB的内存,对于一个集群来说,还是有一定的压力的。尤其是如果
Shuffle Map Task和下游的Task同时增大10倍,那么整体的内存就增长到5TB。
2)从整体的角度来看,打开多个文件对于系统来说意味着随机读,尤其是每个文件比较小但是数量非常多的情
况。而现在机械硬盘在随机读方面的性能特别差,非常容易成为性能的瓶颈。如果集群依赖的是固态硬盘,也许
情况会改善很多,但是随机写的性能肯定不如顺序写的。
2)Sort Based Shuffle Write
在Spark 1.2.0中,Spark Core的一个重要的升级就是将默认的Hash Based Shuffle换成了Sort Based Shuffle,
即spark.shuffle.manager从Hash换成了Sort,对应的实现类分别是
org.apache.spark.shuffle.hash.HashShuffleManager和
org.apache.spark.shuffle.sort.SortShuffleManager。
那么Sort Based Shuffle“取代”Hash Based Shuffle作为默认选项的原因是什么?
正如前面提到的,Hash Based Shuffle的每个Mapper都需要为每个Reducer写一个文件,供Reducer读取,即
需要产生M
R个数量的文件,如果Mapper和Reducer的数量比较大,产生的文件数会非常多。
而Sort Based Shuffle的模式是:每个Shuffle Map Task不会为每个Reducer生成一个单独的文件;相反,它会
将所有的结果写到一个文件里,同时会生成一个Index文件,
在这里插入图片描述
Reducer可以通过这个Index文件取得它需要处理的数据。避免产生大量文件的直接收益就是节省了内存的使用和
顺序Disk IO带来的低延时。节省内存的使用可以减少GC的风险和频率。而减少文件的数量可以避免同时写多个
文件给系统带来的压力。
Sort Based Write实现详解
Shuffle Map Task会按照key相对应的Partition ID进行Sort,其中属于同一个Partition的key不会Sort。因为
对于不需要Sort的操作来说,这个Sort是负收益的;要知道之前Spark刚开始使用Hash Based的Shuffle而不是
Sort Based就是为了避免Hadoop Map Reduce对于所有计算都会Sort的性能损耗。对于那些需要Sort的运算,
比如sortByKey,这个Sort在Spark 1.2.0里还是由Reducer完成的。
①答出shuffle的定义
②spark shuffle的特点
③spark shuffle的目的
④spark shuffel的实现类,即对应优缺点

Shuffle 相关参数配置
概述
Shuffle是Spark Core比较复杂的模块,它也是非常影响性能的操作之一。因此,在这里整理
了会影响Shuffle性能的各项配置。
1)spark.shuffle.manager
Spark 1.2.0官方版本支持两种方式的Shuffle,即Hash Based Shuffle和
Sort Based Shuffle。其中在Spark 1.0之前仅支持Hash Based Shuffle。Spark 1.1引入了
Sort Based Shuffle。Spark 1.2的默认Shuffle机制从Hash变成了Sort。如果需要
Hash Based Shuffle,只需将spark.shuffle.manager设置成“hash”即可。
配置方式:
①进入spark安装目录的conf目录
②cp spark-defaults.conf.template spark-defaults.conf
③spark.shuffle.manager=hash
应用场景:当产生的临时文件不是很多时,性能可能会比sort shuffle要好。
如果对性能有比较苛刻的要求,那么就要理解这两种不同的Shuffle机制的原理,结合具体的
应用场景进行选择。
对于不需要进行排序且Shuffle产生的文件数量不是特别多时,Hash Based Shuffle可能是更
好的选择;因为Sort Based Shuffle会按照Reducer的Partition进行排序。
而Sort Based Shuffle的优势就在于可扩展性,它的出现实际上很大程度上是解决
Hash Based Shuffle的可扩展性的问题。由于Sort Based Shuffle还在不断地演进中,因此它
的性能会得到不断改善。
对于选择哪种Shuffle,如果性能要求苛刻,最好还是通过实际测试后再做决定。不过选择默
认的Sort,可以满足大部分的场景需要。
2)spark.shuffle.spill
这个参数的默认值是true,用于指定Shuffle过程中如果内存中的数据超过阈值(参考
spark.shuffle.memoryFraction的设置)时是否需要将部分数据临时写入外部存储。如果设
置为false,那么这个过程就会一直使用内存,会有内存溢出的风险。因此只有在确定内存足
够使用时,
才可以将这个选项设置为false。
3)spark.shuffle.memoryFraction
在启用spark.shuffle.spill的情况下,spark.shuffle.memoryFraction决定了当Shuffle过程中
使用的内存达到总内存多少比例的时候开始spill。在Spark 1.2.0里,这个值是0.2。
此参数可以适当调大,可以控制在0.4~0.6。
通过这个参数可以设置Shuffle过程占用内存的大小,它直接影响了写入到外部存储的频率和
垃圾回收的频率。可以适当调大此值,可以减少磁盘I/O次数。
4)spark.shuffle.blockTransferService
在Spark 1.2.0中这个配置的默认值是netty,而在之前的版本中是nio。它主要是用于在各个
Executor之间传输Shuffle数据。netty的实现更加简洁,但实际上用户不用太关心这个选项。
除非有特殊需求,否则采用默认配置即可。
5)spark.shuffle.consolidateFiles
这个配置的默认值是false。主要是为了解决在Hash Based Shuffle过程中产生过多文件的问
题。
如果配置选项为true,那么对于同一个Core上运行的Shuffle Map Task
不会产生一个新的Shuffle文件而是重用原来的。
此参数开启后,服务器有几核,就会生成几个shuffle临时文件,和MapTask和ResultTask无

当时官方给出建议:不太稳定,不建议使用
6)spark.shuffle.compress和spark.shuffle.spill.compress
这两个参数的默认配置都是true。spark.shuffle.compress和spark.shuffle.spill.compress都
是用来设置Shuffle过程中是否对Shuffle数据进行压缩。其中,前者针对最终写入本地文件系
统的输出文件;后者针对在处理过程需要写入到外部存储的中间数据,即针对最终的shuffle
输出文件。

  1. 设置spark.shuffle.compress
    需要评估压缩解压时间带来的时间消耗和因为数据压缩带来的时间节省。如果网络成为瓶颈,
    比如集群普遍使用的是千兆网络,那么将这个选项设置为true可能更合理;如果计算是CPU密
    集型的,那么将这个选项设置为false可能更好。
  2. 设置spark.shuffle.spill.compress
    如果设置为true,代表处理的中间结果在spill到本地硬盘时都会进行压缩,在将中间结果取回
    进行merge的时候,要进行解压。因此要综合考虑CPU由于引入压缩、解压的消耗时间和
    Disk IO因为压缩带来的节省时间的比较。在Disk IO成为瓶颈的场景下,设置为true可能比较
    合适;如果本地硬盘是SSD,那么设置为false可能比较合适。
    7)spark.reducer.maxMbInFlight
    这个参数用于限制一个Result Task向其他的Executor请求Shuffle数据时所占用的最大内存
    数,默认是64MB。尤其是如果网卡是千兆和千兆以下的网卡时。默认值是 设置这个值需要
    综合考虑网卡带宽和内存。
    RDD容错机制
    概述
    分布式系统通常在一个机器集群上运行,同时运行的几百台机器中某些出问题的概率大大增加,所以容错设计是
    分布式系统的一个重要能力。
    Spark以前的集群容错处理模型,像MapReduce,将计算转换为一个有向无环图(DAG)的任务集合,这样可以
    通过重复执行DAG里的一部分任务来完成容错恢复。但是由于主要的数据存储在分布式文件系统中,没有提供其
    他存储的概念,容错过程需要在网络上进行数据复制,从而增加了大量的消耗。所以,分布式编程中经常需要做
    检查点,即将某个时机的中间数据写到存储(通常是分布式文件系统)中。
    RDD也是一个DAG,每一个RDD都会记住创建该数据集需要哪些操作,跟踪记录RDD的继承关系,这个关系在
    Spark里面叫lineage(血缘关系)。当一个RDD的某个分区丢失时,RDD是有足够的信息记录其如何通过其他
    RDD进行计算,且只需重新计算该分区,这是Spark的一个创新。
    RDD的缓存
    概述
    相比Hadoop MapReduce来说,Spark计算具有巨大的性能优势,其中很大一部分原因是
    Spark对于内存的充分利用,以及提供的缓存机制。
    RDD持久化(缓存)
    持久化在早期被称作缓存(cache),但缓存一般指将内容放在内存中。虽然持久化操作在绝
    大部分情况下都是将RDD缓存在内存中,但一般都会在内存不够时用磁盘顶上去(比操作系
    统默认的磁盘交换性能高很多)。当然,也可以选择不使用内存,而是仅仅保存到磁盘中。所
    以,现在Spark使用持久化(persistence)这一更广泛的名称。
    如果一个RDD不止一次被用到,那么就可以持久化它,这样可以大幅提升程序的性能,甚至
    达10倍以上。
    默认情况下,RDD只使用一次,用完即扔,再次使用时需要重新计算得到,而持久化操作避
    免了这里的重复计算,实际测试也显示持久化对性能提升明显,这也是Spark刚出现时被人称
    为内存计算框架的原因。
    假设首先进行了RDD0→RDD1→RDD2的计算作业,那么计算结束时,RDD1就已经缓存在系统中
    了。在进行RDD0→RDD1→RDD3的计算作业时,由于RDD1已经缓存在系统中,因此RDD0
    →RDD1的转换不会重复进行,计算作业只须进行RDD1→RDD3的计算就可以了,因此计算速度
    可以得到很大提升。
    在这里插入图片描述
    持久化的方法是调用persist()函数,除了持久化至内存中,还可以在persist()中指定
    storage level参数使用其他的类型,具体如下:
    1)MEMORY_ONLY : 将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中. 如果内存空
    间不够,部分数据分区将不会被缓存,在每次需要用到这些数据时重新进行计算. 这是默认的
    级别。
    cache()方法对应的级别就是MEMORY_ONLY级别
    2)MEMORY_AND_DISK:将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中。
    如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
    3)MEMORY_ONLY_SER :将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一
    个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast
    serialize时会节省更多的空间,但是在读取时会使得 CPU 的 read 变得更加密集。如果内存
    空间不够,部分数据分区将不会被缓存,在每次需要用到这些数据时重新进行计算。
    4)MEMORY_AND_DISK_SER :类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储
    到磁盘,而不是在用到它们时重新计算。如果内存空间不够,将未缓存的数据分区存储到磁
    盘,在需要使用这些分区时从磁盘读取。
    5)DISK_ONLY:只在磁盘上缓存 RDD。
    6)MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. :与上面的级别功能相同,
    只不过每个分区在集群中两个节点上建立副本。
    7)OFF_HEAP 将数据存储在 off-heap memory 中。使用堆外内存,这是Java虚拟机里面
    的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操
    作系统管理(而不是虚拟机)。使用堆外内存的好处:可能会利用到更大的内存存储空间。但
    是对于数据的垃圾回收会有影响,需要程序员来处理
    注意,可能带来一些GC回收问题。
    Spark 也会自动持久化一些在 shuffle 操作过程中产生的临时数据(比如 reduceByKey),
    即便是用户并没有调用持久化的方法。这样做可以避免当 shuffle 阶段时如果一个节点挂掉了
    就得重新计算整个数据的问题。如果用户打算多次重复使用这些数据,我们仍然建议用户自己
    调用持久化方法对数据进行持久化。
    使用缓存
    scala> import org.apache.spark.storage._
    scala> val rdd1=sc.makeRDD(1 to 5)
    scala> rdd1.cache //cache只有一种默认的缓存级别,即MEMORY_ONLY
    scala> rdd1.persist(StorageLevel.MEMORY_ONLY)
    缓存数据的清除
    Spark 会自动监控每个节点上的缓存数据,然后使用 least-recently-used (LRU) 机制来处理
    旧的缓存数据。如果你想手动清理这些缓存的 RDD 数据而不是去等待它们被自动清理掉,
    可以使用 RDD.unpersist( ) 方法。

Checkpoint机制
概述
checkpoint的意思就是建立检查点,类似于快照,例如在spark计算里面 计算流程DAG特别长,服务
器需要将整个DAG计算完成得出结果,但是如果在这很长的计算流程中突然中间算出的数据丢失
了,spark又会根据RDD的依赖关系从头到尾计算一遍,这样子就很费性能,当然我们可以将中间的计
算结果通过cache或者persist放到内存或者磁盘中,但是这样也不能保证数据完全不会丢失,存储的
这个内存出问题了或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所以就有了checkpoint,
其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用
的地方
代码示例:
object Driver2 {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster(“local”).setAppName(“wordcount”)
val sc=new SparkContext(conf)
sc.setCheckpointDir(“hdfs://hadoop01:9000/check01”)
val data=sc.textFile(“d://data/word.txt”)
data.cache()
data.checkpoint()
val wordcount=data.flatMap {.split(" ")}.map {(,1)}.reduceByKey(+)
wordcount.cache()
wordcount.checkpoint()
wordcount.foreach{println}
}
}
总结:Spark的CheckPoint机制很重要,也很常用,尤其在机器学习中的一些迭代算法中很常
见。比如一个算法迭代10000次,如果不适用缓冲机制,如果某分区数据丢失,会导致整个计算

链重新计算,所以引入缓存机制。但是光引入缓存,也不完全可靠,比如缓存丢失或缓存存储不
下,也会导致重新计算,所以使用CheckPoint机制再做一层保证。
补充:检查目录的路径,一般都是设置到HDFS上
Spark懒执行的意义
Spark中,Transformation方法都是懒操作方法,比如map,flatMap,reduceByKey等。当触发某
个Action操作时才真正执行。
懒操作的意义:①不运行job就触发计算,避免了大量的无意义的计算,即避免了大量的无意义的
中间结果的产生,即避免产生无意义的磁盘I/O及网络传输
②更深层次的意义在于,执行运算时,看到之前的计算操作越多,执行优化的可能性就越高

扫描二维码关注公众号,回复: 5003691 查看本文章

Spark共享变量
概述
Spark程序的大部分操作都是RDD操作,通过传入函数给RDD操作函数来计算。这些函数在
不同的节点上并发执行,但每个内部的变量有不同的作用域,不能相互访问,所以有时会不太
方便,Spark提供了两类共享变量供编程使用——广播变量和计数器。

  1. 广播变量
    这是一个只读对象,在所有节点上都有一份缓存,创建方法是SparkContext.broadcast(),
    比如:
    scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
    broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
    scala> broadcastVar.value
    res0: Array[Int] = Array(1, 2, 3)
    注意,广播变量是只读的,所以创建之后再更新它的值是没有意义的,一般用val修饰符来定
    义广播变量。
  2. 计数器
    计数器只能增加,是共享变量,用于计数或求和。
    计数器变量的创建方法是SparkContext.accumulator(v, name),其中v是初始值,name是
    名称。
    示例如下:
    scala> val accum = sc.accumulator(0, “My Accumulator”)
    accum: org.apache.spark.Accumulator[Int] = 0
    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    scala> accum.value
    res1: Int = 10

spark解决数据倾斜问题
文件1
id name
1 tom
2 rose
文件2
id school sno
1
s1 211
2
s2 222
3
s3 233
4
s2 244
期望得到的数据 :
1 tom s1
2 rose s2
将少量的数据转化为Map进行广播,广播会将此 Map 发送到每个节点中,如果不进行广播,
每个task执行时都会去获取该Map数据,造成了性能浪费。
完整代码
import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer
object joinTest extends App{
val conf = new SparkConf().setMaster(“local[2]”).setAppName(“test”)
val sc = new SparkContext(conf)
/**

  • map-side-join
  • 取出小表中出现的用户与大表关联后取出所需要的信息
  • /
    val people_info = sc.parallelize(Array((“1”,“tom”),(“2”,“rose”))).collectAsMap()
    val student_all = sc.parallelize(Array((“1”,“s1”,“211”),
    (“1”,“s2”,“222”),
    (“1”,“s3”,“233”),
    (“1”,“s2”,“244”)))
    //将需要关联的小表进行广播
    val people_bc = sc.broadcast(people_info)
    /
    *
  • 使用mapPartition而不是用map,减少创建broadCastMap.value的空间消耗
  • 同时匹配不到的数据也不需要返回()
  • */
    val res = student_all.mapPartitions(iter =>{
    //获取小表的数据
    val stuMap = people_bc.value
    val arrayBuffer = ArrayBuffer(String,String,String)
    //做两表的操作
    iter.foreach{case (idCard,school,sno) =>{
    if(stuMap.contains(idCard)){
    arrayBuffer.+= ((idCard, stuMap.getOrElse(idCard,""),school))
    }
    }}
    arrayBuffer.iterator
    })

上一篇 24.大数据学习之旅——spark手把手带你入门

猜你喜欢

转载自blog.csdn.net/qq_39188039/article/details/86467502