2. RDD编程

2.1 编程模型 

      在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。经过一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换

      要使用Spark,开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,如下图所示。Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务

      

      

2.2 创建RDD

      在Spark中创建RDD的创建方式大概可以分为三种:

      1、从集合中创建RDD;

      2、从外部存储创建RDD;

      3、从其他RDD创建

      1) 由一个已经存在的Scala集合创建,集合并行化

        val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

        而从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD。我们可以先看看这两个函数的声明:

扫描二维码关注公众号,回复: 6702567 查看本文章
def parallelize[T: ClassTag](
  seq: Seq[T],
  numSlices: Int = defaultParallelism): RDD[T]

def makeRDD[T: ClassTag](
  seq: Seq[T],
  numSlices: Int = defaultParallelism): RDD[T]
  

def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]

      从上面可以看出makeRDD有两种实现,而且第一个makeRDD函数接收的参数和parallelize完全一致。其实第一种makeRDD函数实现是依赖了parallelize函数的实现,来看看在Spark中是怎么实现这个makeRDD函数的:

def makeRDD[T: ClassTag]( 
	seq: Seq[T], 
	numSlices: Int = defaultParallelism): RDD[T] = withScope { 
	    parallelize(seq, numSlices) 
} 

      我们可以看出,这个makeRDD函数完全和parallelize函数一致。但是我们得看看第二种makeRDD函数的函数实现,它接收的参数类型是Seq[(T,Seq[String])],Spark文档的说明是: 

      Distribute a local Scala collection to form an RDD, with one or more location preferences (hostnames of Spark nodes) for each object. Create a new partition for each collection item. 

      这个函数还为数据提供了位置信息,看看如何使用:

scala> val boke01= sc.parallelize(List(1,2,3))
boke01: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:21 

scala> val boke02 = sc.makeRDD(List(1,2,3))
boke02: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at makeRDD at <console>:21 

scala> val seq = List((1, List("slave01")), | (2, List("slave02"))) 
seq: List[(Int, List[String])] = List((1,List(slave01)), (2,List(slave02))) 

scala> val boke03 = sc.makeRDD(seq)
boke03: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at makeRDD at <console>:23 

scala> boke03.preferredLocations(boke03.partitions(1)) res26: Seq[String] = List(slave02) 

scala> boke03.preferredLocations(boke03.partitions(0)) res27: Seq[String] = List(slave01) 

scala> boke01.preferredLocations(boke01.partitions(0)) res28: Seq[String] = List() 

      可以看到,makeRDD函数有两种实现,第一种实现其实完全和parallelize一致;而第二种实现可以为数据提供位置信息,而除此之外的实现和parallelize函数也是一致的,如下:

def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()

  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
} 

def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
  assertNotStopped()
  
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
  new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
}

      都是ParallelCollectionRDD,而且这个makeRDD的实现不可以自己指定分区的数量,而是固定为seq参数的size大小

      2) 由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等

scala> val demo01 = sc.textFile("hdfs://master01:9000/RELEASE") 
demo01: org.apache.spark.rdd.RDD[String] = hdfs://master01:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24

2.3 RDD编程  

      RDD一般分为数值RDD和键值对RDD,这里先不进行具体区分,先统一来看,下一章会对键值对RDD做专门说明

  2.3.1 Transformation 

      RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行 

      常用的Transformation:

      1) map(func),含义:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

      2) filter(func),含义:返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

      3) flatMap(func),含义:类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

      4) mapPartitions(func),含义:类似于map,但独立的在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区

      5) mapPartitionsWithIndex(func),含义:类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此,在类型为T的RDD上运行时,func的函数类型必须是(Int,Iterator[T]) => Iterator[U]

      6) sample(withReplacement, fraction, seed),含义:以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。例如从RDD中随机且有放回的抽出50%的数据,随机种子值为3(即可能以1 2 3的其中一个起始值) 

      7) takeSample,含义:和Sample的区别是:takeSample返回的是最终的结果集合

      8) union(otherDataset),含义:对源RDD和参数RDD求并集合返回一个新的RDD

      9) intersection(otherDataset),含义:对源RDD和参数RDD求交集后返回一个新的RDD

      10) distinct([numTasks]),含义:对源RDD进行去重后返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它

      11) partitionBy,含义:对RDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区,否则会生成ShuffleRDD

      12) reduceByKey(func, [numTasks]),含义:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置

      13) groupByKey,含义:groupByKey也是对每个key进行操作,但只生成一个sequence

      14) combineByKey[C](

        createCombiner: V => C,

        mergeValue: (C, V) => C,

        mergeCombiners: (C, C) => C)
        含义:对相同K,把V合并成一个集合。createCombiner:combineByKey()会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫做createCombiner()的函数来创建那个键对应的累加器的初始值。mergeValue:如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。mergeCombiners:由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。

      15) aggregateByKey(zeroValue: U, [partitioner: Partitioner])(seqOp: (U, V) => U, combOp: (U, U) => U),

        含义:在kv对的RDD中,按key将value进行分组合并,合并时,将每个 value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value 传递给combine函数进行计算(先将前两个value进行计算,将返回结 果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。seqOp函数用于在每一个分区中用初始值逐步迭代value,combOp函数用于合并每个分区中的结果

      16) foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)],含义:aggregateByKey的简化操作,seqop和combop相同

      17) sortByKey([ascending], [numTasks]),含义:在一个(K, V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K, V)的RDD

      18) sortBy(func, [ascending], [numTasks]),含义:与sortByKey类似,但是更灵活,可以用func先对数据进行处理,按照处理后的数据比较结果排序

      19) join(otherDataset, [numTasks]),含义:在类型为(K, V)和(K, W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K, (V, W))的RDD

      20) cogroup(otherDataset, [numTasks]),含义:返回类型为(K, V)和(K, W)的RDD上调用,返回一个(K, (Iterable<V>, Iterable<W>))类型的RDD

      21) cartesian(otherDataset),含义:笛卡尔积

      22) pipe(command, [envVars]),含义:对于每个分区,都执行一个perl或者shell脚本,返回输出的RDD。注意:shell脚本需要集群中的所有节点都能访问到

      23) coalesce(numPartitions),含义:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率

      24) repartition(numPartitions),含义:根据分区数,从新通过网络随机洗牌所有数据

      25) repartitionAndSortWithinPartitions(partitioner),含义:repartitionAndSortWithinPartitions函数是repartition函数的变种,与repartition函数不同的是,repartitionAndSortWithinPartitions在给定的partitioner内部进行排序,性能比repartition要高

      26) glom,含义:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]

      27) mapValues,含义:针对于(K, V)形式的类型只对V进行操作

      28) subtract,含义:计算差的一种函数去除两个RDD中相同的元素,不同的RDD将保留下来

  2.3.2 Action

      常用的Action:

      1) reduce(func),含义:通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的

      2) collect(),含义:在驱动程序中,以数组的形式返回数据集的所有元素

      3) count(),含义:返回RDD的元素个数

      4) first(),含义:返回RDD的第一个元素(类似于take(1))

      5) take(n),含义:返回一个由数据集的前n个元素组成的数组

      6) takeSample(withReplacement, num, [seed]),含义:返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子

      7) takeOrdered(n),含义:返回前几个的排序

      8) aggregate(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U),含义:aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致

      9) fold(num)(func),含义:折叠操作,aggregate的简化操作,seqop和combop一样

      10) saveAsTextFile(path),含义:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它转换为文件中的文本

      11) saveAsSequenceFile(path),含义:将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统

      12) saveAsObjectFile(path),含义:用于将RDD中的元素序列化成对象,存储到文件中

      13) countByKey(),含义:针对(K, V)类型的RDD,返回一个(K, Int)的map,表示每一个key对应的元素个数

      14) foreach(func),含义:在数据集的每一个元素上,运行函数func进行更新

  2.3.3 数值RDD的统计操作 

      Spark对包含数值数据的RDD提供了一些描述性的统计操作。Spark的数值操作是通过流式算法实现的,允许以每次一个元素的方式构建出模型。这些统计数据都会在调用stats()时通过一次遍历数据计算出来,并以StatsCounter对象返回

      

  2.3.4 向RDD操作传递函数注意 

      Spark的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算。在Scala中,我们可以把定义的内联函数、方法的引用或静态方法传递给Spark,就像Scala的其他函数式API一样。还要考虑其他一些细节,比如所传递的函数及其引用的数据需要是可序列化的(实现了Java的Serializable接口)。传递一个对象的方法或者字段时,会包含对整个对象的引用

class SearchFunctions(val query: String) extends java.io.Serializable {

  def ismatch(s: String): Boolean = {
    s.contains(query)
  }

  def getmatchesFunctionReference(rdd: org.apache.spark.rdd.RDD[String]):
  org.apache.spark.rdd.RDD[String] = {
    // 问题:"isMatch"表示"this.isMatch",因此我们要传递整个"this"
    rdd.filter(ismatch)
  }

  def getMatchesFieldReference(rdd: org.apache.spark.rdd.RDD[String]):
  org.apache.spark.rdd.RDD[String] = {
    // 问题:"query"表示"this.query",因此我们要传递整个"this"
    rdd.filter(x => x.contains(query))
  }

  def getMatchesNoReference(rdd: org.apache.spark.rdd.RDD[String]):
  org.apache.spark.rdd.RDD[String] = {
    // 安全:只把我们需要的字段拿出来放入局部变量中
    val query_ = this.query
    rdd.filter(x => x.contains(query_))
  }

}

      如果在Scala中出现了NotSerializableException,通常问题就在于我们传递了一个不可序列化的类中的函数或字段

  2.3.5 在不同RDD类型间转换  

      有些函数只能用于特定类型的RDD,比如mean()和variance()只能用在数值RDD上,而join()只能用在键值对RDD上。在Scala和Java中,这些函数都没有定义在标准的RDD类中,所以要访问这些附加功能,必须要确保获得了正确的专用RDD类

      在Scala中,将RDD转为有特定函数的RDD(比如在RDD[Double]上进行数值操作)是由隐式转换来自动处理的

      

2.4 RDD持久化 

  2.4.1 RDD的缓存 

      Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存个数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。如果一个有持久化数据的节点发生故障,Spark会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的情况下不会拖累我们的执行速度,也可以把数据备份到多个节点上

  2.4.2 RDD缓存方式 

      RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下persist()会把数据以序列化的形式缓存在JVM的堆空间中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用

      

      通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在objectStorageLevel中定义的

      

      缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部的Partition。

      注意:使用Tachyon可以实现堆外缓存

2.5 RDD检查点机制 

      Spark中对于数据的保存除了持久化操作之外,还提供了一种检查点的机制,检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能

      cache和checkpoint是有显著区别的,缓存把RDD计算出来然后放在内存中,但是RDD的依赖链(相当于数据库中的redo日志),也不能丢掉,当某个点某个executor宕了,上面cache的RDD就会丢掉,需要通过依赖链重放计算出来,不同的是,checkpoint是把RDD保存在HDFS中,是多副本可靠存储,所以依赖链就可以丢掉了,就斩断了依赖链,是通过复制实现的高容错

      如果存在以下场景,则比较适合使用检查点机制:

      1) DAG中的Lineage过长,如果重算,则开销太大(如在PageRank中)

      2) 在宽依赖上做Checkpoint获得的收益更大

      为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移出。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发

  2.5.1 checkpoint写流程 

      RDD checkpoint过程中会经过以下几个状态:

      [Initialized -> marked for checkpoint -> checkpointing in progress -> checkpointed ]

      转换流程如下:

      

      1) data.checkpoint 这个函数调用中,设置的目录中,所有依赖的RDD都会被删除,函数必须在job运行之前调用执行,强烈建议RDD缓存在内存中(又提到一次,千万要注意),否则保存到文件的时候需要从头计算,初始化RDD的checkpointData变量为ReliableRDDCheckpointData。这时候标记为Initialized状态

      2) 在所有job action的时候,runJob方法中都会调用rdd.doCheckpoint,这个会向前递归调用所有的依赖的RDD,看看需不需要checkpoint。如果需要checkpoint,然后调用checkpointData.get.checkpoint(),里面标记状态为CheckpointingInProgress,里面调用具体实现类的ReliableRDDCheckpointData的doCheckpoint方法

      3) doCheckpoint -> writeRDDToCheckpointDirectory,注意这里会把job再运行一次,如果已经cache了,就可以直接使用缓存中的RDD了,就不需要从头计算一遍了,这时候直接把RDD,输出到hdfs,每个分区一个文件,会先写到一个临时文件,如果全部输出完,进行rename,如果输出失败,就回滚delete

      4) 标记状态为Checkpointed,markCheckpointed方法中清除所有的依赖,怎么清除依赖的呢,就是把RDD变量的强引用设置为null,垃圾回收了,会触发ContextCleaner里面监听清除实际BlockManager缓存中的数据

  2.5.2 checkpoint读流程

      如果一个RDD我们已经checkpoint了那么是什么时候用呢,checkpoint将RDD持久化到HDFS或本地文件夹,如果不被手动remove掉,是一直存在的,也就是说可以被下一个driver program使用。比如spark streaming挂掉了,重启后就可以使用之前checkpoint的数据进行recover,当然在同一个driver program也可以使用。讲下在同一个driver program中是怎么使用checkpoint数据的

      如果一个RDD被checkpoint了,如果这个RDD上有action操作的时候,或者回溯的这个RDD的时候,这个RDD进行计算的时候,里面判断如果已经checkpoint过,对分区和依赖的处理都是使用的RDD内部的checkpointRDD变量

      具体细节如下,

      如果一个RDD被checkpoint了,那么这个RDD中对分区和依赖的处理都是使用的RDD内部的checkpointRDD变量,具体实现是ReliableCheckpointRDD类型。这个是在checkpoint写流程中创建的。依赖和获取分区方法中先判断是否已经checkpoint,如果已经checkpoint,就斩断依赖,使用ReliableCheckpointRDD,来处理依赖和获取分区

      如果没有,才往前回溯依赖。依赖就是没有依赖,因为已经斩断了依赖,获取分区数据就是读取checkpoint到hdfs目录中不同分区保存下来的文件

  2.6 RDD的依赖关系 

      RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)

      

  2.6.1 窄依赖 

      窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用

      总结:窄依赖我们形象的比喻为独生子女

  2.6.2 宽依赖 

      宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffe

      总结:宽依赖我们形象的比喻为超生

  2.6.3 Lineage 

      RDD只支持粗粒度转换,即在大量纪录上执行的单个操作,将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区

      

2.7 DAG的生成 

      DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有shuffe的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据

      

2.8 RDD相关概念关系

      

      输入可能以多个文件的形式存储在HDFS上,每个File都包含了很多块,称为Block

      当Spark读取这些文件作为输入时,会根据具体数据格式对应的InputFormat进行解析,一般是将若干个Block合并成一个输入分片,称为InputSplit,注意InputSplit不能跨文件。随后将为这些输入分片生成具体的Task。inputSplit与Task是一一对应关系

      随后这些具体的Task每个都会被分配到集群上的某个节点的某个Executor去执行

        1) 每个节点可以起一个或多个Executor

        2) 每个Executor由若干core组成,每个Executor的每个core一次只能执行一个Task

        3) 每个Task执行的结果就是生成了目标RDD的一个partition

      注意:这里的core是虚拟的core而不是机器的物理CPU核,可以理解为就是Executor的一个工作线程  

      而Task被执行的并发度 = Executor数目 * 每个Executor核数

      至于partition的数目:

        1) 对于数据读入阶段,例如sc.textFile,输入文件被划分为多少InputSplit就会需要多少初始Task

        2) 在Map阶段partition数目保持不变

        3) 在Reduce阶段,RDD的聚合会触发shuffe操作,聚合后的RDD的partition数目跟具体操作有关,例如repartition操作会聚合成指定分区数,还有一些算子是可配置的

      RDD在计算的时候,每个分区都会起一个task,所以rdd的分区数目决定了总的task数目

      申请的计算节点(Executor)数目和每个计算节点核数,决定了你同一时刻并行执行的task

      比如的RDD有100个分区,那么计算的时候就会生成100个task,你的资源配置为10个计算节点,每个2个核,同一时刻可以并行的task数目为20,计算这个RDD就需要5次轮次

      如果计算资源不变,你有101个task的话,就需要6个轮次,在最后一轮中,只有一个task在执行,其余核都在空转

      如果资源不变,你的RDD只有两个分区,那么同一时刻只有2个task运行,其余18个核空转,造成资源浪费。这就是在spark调优中,增大RDD分区数目,增大RDD分区数目,增大任务并行度的做法

      

猜你喜欢

转载自www.cnblogs.com/zhanghuicheng/p/11080864.html