一、RDD
1.1 什么是RDD?
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark计算过程的核心,是spark计算过程中的瞬时结果,下一个RDD依赖于上一个RDD。它代表一个不可变、可分区、里面的元素可并行计算的集合。
数据集就是由许多数据组成的集合了
RDD本身并不是分布式的,里面的数据是分布式的。
那么,弹性是什么意思呢?弹性有哪些表现呢?
先看一下resilient这个单词的英文解释:adj. recovering readily from adversity, depression, or the like 容易地从逆境或类似的情况中恢复过来。《Learning Spark:Lightning-fast Data Analysis》一书中解释“弹性”是指在任何时候都能进行重算。这样当集群中的一台机器挂掉而导致存储在其上的RDD丢失后,Spark还可以重新计算出这部分的分区的数据。但用户感觉不到这部分的内容丢失过。这样RDD数据集就像块带有弹性的海绵一样,不管怎样挤压(分区遭到破坏)都是完整的。
RDD的弹性体现在:
1.自动进行内存和磁盘切换
2.基于lineage的高效容错
3.task如果失败会特定次数的重试
4.stage如果失败会自动进行特定次数的重试,而且只会只计算失败的分片
5.checkpoint【每次对RDD操作都会产生新的RDD,如果链条比较长,计算比较笨重,就把数据放在硬盘中】和persist 【内存或磁盘中对数据进行复用】(检查点、持久化)
6.数据调度弹性:DAG TASK 和资源管理无关
7.数据分片的高度弹性repartion
1.2 五大特性
可以看参考一下spark源码中的注释:
/**
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
* partitioned collection of elements that can be operated on in parallel. This class contains the
* basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
* [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
* pairs, such as `groupByKey` and `join`;
* [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
* Doubles; and
* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
* can be saved as SequenceFiles.
* All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])
* through implicit.
*
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
*
* All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
* to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
* reading data from a new storage system) by overriding these functions. Please refer to the
* <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
* for more details on RDD internals.
*/
1.一组分区的集合
2.一个函数作用于每个分区
3.依赖于其他RDDs
4.可以重新分区
5.数据本地性:每个分区的数据优先在本地计算,但并不代表不能在别的机器上计算。
二、算子
算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。
2.1 算子分类
● 转换(transformation)算子 :这种变换并不触发提交作业,完成作业中间过程处理。Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
转换算子又可以分为创建算子和缓存算子
● 行动(action)算子 :这类算子会触发 SparkContext 提交 Job 作业。Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。
2.2 常用算子
转换算子:
cartesian:两个RDD进行笛卡尔积合并
coalesce:对RDD重新进行分区,第一个参数为分区数;第二个参数为是否进行shuffle操作,默认值为false,当shuffle=false时,不能增加分区数,但不会报错,分区个数还是原来的
cogroup:对多个RDD中的Key-Value元素按key值进行合并,每个RDD中相同key中的元素分别聚合成一个集合。
distinct:将原始RDD中重复出现的元素进行过滤,返回一个新生成的RDD。即原RDD中每个元素在新生成的RDD中只出现一次
filter:对元素进行过滤,对每个元素应用f函数,返回值为true的元素在RDD中保留,返回为false的将被过滤掉
map:和集合中的map作用相同
flatMap:和集合中的flatMap作用相同
flatMapValues:同基本转换操作中的flatMap,只不过是针对[K,V]中的V值进行flatMap操作
groupByKey:将Key-Value型RDD中的元素按照Key值进行汇聚,Key值相同的Value值会合并为一个序列。和cogroup区别在于cogroup可以合并多个RDD,而groupByKey则针对一个RDD
intersection:返回两个RDD的交集
keys:对Key-Value型RDD获取所有的key
mapValues:同基本转换操作中的map,针对[K,V]中的V值进行map操作
reduceByKey:将Key-Value型RDD按照key值进行聚合操作,生成新的RDD。
sample:对RDD进行抽样,其中参数withReplacement为true时表示抽样之后还放回,可以被多次抽样,false表示不放回;fraction表示抽样比例;seed为随机数种子,比如当前时间戳
sortByKey:对Key-Value型RDD按照key值进行升序或降序排列
subtract:差集,即返回在oneRDD中出现,并且不在otherRDD中出现的元素,不去重
subtractByKey:对Key-Value型RDD按照key值做差集,即返回key在oneRDD中出现,并且不在otherRDD中出现的元素,不去重。
union:合并两个RDD,不去重,要求两个RDD中的元素类型一致
values:对Key-Value型RDD获取所有的value
zip:用于将两个RDD组合成Key-Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
行动算子:
collectAsMap:如果RDD中同一个Key中存在多个Value,那么后面的Value将会把前面的Value覆盖,最终得到的结果就是Key唯一,而且对应一个Value
count:返回RDD中的元素数量
countByKey:用于统计Key-Value型RDD中每个Key的数量
countByValue:用于统计Key-Value型RDD中每个元素的个数
first:返回RDD中的第一个元素,不排序
lookUp:用于Key-Value类型的RDD,指定K值,返回RDD中该Key对应的所有Value值
reduce:根据映射函数f对RDD中的元素进行二元计算,返回计算结果
saveAsTextFile:将数据集的元素,以textfile的形式保存到本地文件系统hdfs或者任何其他hadoop支持的文件系统,spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本
saveAsObjectFile:使用java的序列化方法保存到本地文件,可以被sparkContext.objectFile()加载
take:获取RDD中从0到num-1下标的元素,不排序(即抽取前num个元素)
takeSample:对RDD进行抽样,其中参数withReplacement为true时表示抽样之后还放回,可以被多次抽样,false表示不放回;fraction表示抽样个数(注意这个和sample的区别);seed为随机数种子。
top:用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素
takeOrdered:takeOrdered和top类似,只不过以和top相反的顺序返回元素
三、基于血统的容错机制
RDD 默认采用任务失败重新计算的容错机制,这种容错机制本身效率并不高,但由于 RDD 为不可变数据集,并且每个 RDD 都会记录各自的运算过程图,所以当某个子运算产生 RDD 的过程中发生异常,会依次向上一级RDD找数据,直到找到数据,并根据此结果重新计算(这需要父级RDD持久化),最糟糕的情况就是找到HDFS整体重新计算。
如果长时间没有结果,可以开辟一个位置从HDFS上读取数据重新计算,哪个先计算出结果就保留哪个。