目录
1、RDD简介
Spark对数据的一种核心抽象,Resilient Distributed Dataset,弹性分布式数据集,不可变,是val类型
RDD数据存储在内存中,采购服务器时,需选择内存较大的机器,计算能力强
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
存储型:硬盘大
扫描二维码关注公众号,回复:
4332571 查看本文章
计算型:内存大
弹性:服务闲置较多时,减少服务;服务访问压力过大时,增加服务
分布式:集群上多个节点,不一定某部分数据分给谁;
数据集:数据的容器,比如之前的map、list、set。
2、RDD创建
- 数据源在程序内部,练习用,实际开发一般不会是这种方式,默认分区数2
val data = "Though my daily life is extremely monotonous" sc.parallelize(data.split(" "))
- 外部数据源
sc.textFile("spark/src/test.txt")
- testFile:
/** * Read a text file from HDFS, a local file system (available on all nodes), or any * * * Hadoop-supported file system URI, and return it as an RDD of Strings. * * @path: * 比如local file:"src/test.txt" * Hadoop-supported:local,"file://test.txt" * hdfs, "hdfs://namenode:test.txt" */ def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) }
- path:本地文件路径,或者所有hadoop支持的文件系统uri(写法见代码中path举例)
- minPatitions:指定RDD的分区数(RDD一个分区,就是一个Task),默认是
defaultMinPartitions: Int = math.min(defaultParallelism, 2) // defaultParallelism是配置文件中指定的或程序中设定的参数
- 支持通配符*:比如textFile("/my/*.txt")、textFile("my/*")
- 支持读取压缩文件:textFile("/my/*.gz")
- 支持指定多个文件:textFile("/my/text1.txt","/my/text2.txt")
- testFile:
3、常用RDD算子
(1)Action RDD
- foreach:遍历每个元素,无返回值,一般用在将结果存储到数据库中使用
- saveAsTextFile存储到hdfs,RDD每个partition存到hdfs的一个block块
- saveAsObjectFile:存储到hdfs,将每个partition的数据序列化后,以sequenceFile(序列化)格式存到hdfs
- collect:将RDD转换为本地数据集合
- collectAsMap:PairRDD类型,可以转换为本地的map类型
- count:RDD集合中数据量
- countByValue:各个元素在RDD中出现的次数,比如结果为{(2,1), (3,2)} ,2出现1次,3出现2次
- countByKey():Returns a hashmap of (K, Int) pairs with the count of each key
- lookup(k: Key):(PairRDD类型)选出与k匹配元素,将结果以sequence形式返回本地集
- top(num:Int) / take(num:Int):取出最大(最小)的num个元素
- 可以结合count做分页
- 可以用来批量传输数据
- first:取第一个元素
- reduce(f: (T, T) => T): T
- 二元运算函数聚合rdd的元素,最后合成一个值
- 聚合,each partition先做聚合,再mergeResult将分区聚合结果再聚合
- fold(zeroValue: T)(op: (T, T) => T):设置了初始值的聚合,也是先对each partition内用op聚合,再将每个partition的计算结果用op再聚合
// 21,分区内聚合得11,再对所有分区结果再聚合得21 println(sc.parallelize(List(1), 1).fold(10)((x:Int,y:Int)=>x+y)))
// 33,两个分区结果分别是11,12,之后再聚合10+11,再+12,33 println(sc.parallelize(List(1,2), 2).fold(10)((x:Int,y:Int)=>x+y))
- 类似reduce,区别是多了初始值,各个分区算一次zeroValue,最后聚合分区结果时再算一次
- aggregate:先归并后聚合
- 类似fold:区别是聚合前后数据类型可以不一致,而reduce/fold聚合前后类型一致
- 计算:注意初始值会多次加入到结果中,分区数(M)+最后combine result次数=初始值参与M+1次
println(sc.parallelize(List(1, 2,3,4,5), 2).aggregate("s")( (str: String, num: Int) => { println(str + "\t\t\t" + num) s"${str}_${num}" }, (str1: String, str2: String) => { println(str1 + "\t" + str2) s"${str1}/${str2}" } )) 结果 s 1 s_1 2 s 3 s_3 4 s_3_4 5 s s_3_4_5 s/s_3_4_5 s_1_2 s/s_3_4_5/s_1_2 // s 初始值加入了3次
- seqOp:an operator used to accumulate results within a partition,初始值是zeroValue
- combOp:an associative operator used to combine results from different partitions,初始值是zeroValue
(2)单个RDD的 Transformation (惰性)
- filter:过滤结果集
- distinct:去重,返回新的RDD
- map:遍历元素执行操作函数,返回值类型由操作函数决定,一个数据建立一个连接
- mapPartition:
- 和map类似,都是遍历,区别是遍历的是分区,函数是作用于每个分区而不是每个元素
- 适用于需要频繁建立外部链接的情况
- 比如spark streaming消费kafka消息之后需要操作数据库时,需要建立数据库链接,此时用mapPartition而非map,一个分区建立一个链接,以减少连接数
- mapPartitionWithIndex:和mapPartition一样,只不过多提供了一个参数partition index,内部源码都是通过new MapPartitionsRDD
val source1 = sc.parallelize(List("apple", "apple", "pig", "apple", "pig"), 2) source1.mapPartitionsWithIndex{ (index, iter) => { var res = List[(String,Int)]() while(iter.hasNext) res = res.::(iter.next(), index) res.iterator } }.foreach(println) 结果: (apple,0) (apple,0) (pig,1) (apple,1) (pig,1)
- flatMap:压平,将map后每个元素的子元素取出来(比如map处理后每个元素是个list,list里每个元素提出来)
val sourceFlatMap = sc.parallelize(List(("apple,banana"), ("pig,cat,dog"))) println(sourceFlatMap.flatMap(_.split(",")).collect.mkString(" ")) 结果: apple banana pig cat dog
- 拉平元素
- 类似map,区别是可以把二维RDD转成一维,如果map后不是集合,最终结果和map没区别
- flatMapValues[U](f: V => TraversableOnce[U])
val source = sc.parallelize(List(("fruit", "apple,banana"), ("animal", "pig,cat,dog"))) println(source.flatMapValues(_.split(",")).collect.mkString(" * ")) 结果 (fruit,apple) * (fruit,banana) * (animal,pig) * (animal,cat) * (animal,dog)
- 拉平value
-
keyBy[K](f: T => K):将rdd转成 key-value元组数据结构的pair rdd,根据value(T)得出k
val source1 = sc.parallelize(List("apple", "banana", "pig", "cat", "dog")) println(source1.keyBy(_.length).collect.mkString(" ")) 结果: (5,apple) (6,banana) (3,pig) (3,cat) (3,dog)
- groupBy[K](f: T => K, p: Partitioner):
def groupBy[K](f: T => K, p: Partitioner) (implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])] = withScope { val cleanF = sc.clean(f) this.map(t => (cleanF(t), t)).groupByKey(p) }
- 函数f:用来指定key,将rdd中每个元素,转成K
- 根据K,做groupByKey
- combineByKey(createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope { combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null) }
- 下面的groupByKey、reduceByKey、aggregateByKey内部,都用了和combineByKey同样的combineByKeyWithClassTag
- createCombiner:分区内,遍历分区内元素,遇到一个新的key就用这个函数操作一次转成C类型,不是新的key就不操作了(不是新的key操作的是下面的mergeValue)
- mergeValue:分区内的,对上面结果C,又遇到同样的key,做mergeValue操作
- mergeCombiners:分区间函数,合并分区结果
- 比如求平均值
val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0)) val d1 = sc.parallelize(initialScores) type MVType = (Int, Double) //定义一个元组类型(科目计数器,分数) d1.combineByKey( score => (1, score), (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore), (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2) ).map { case (name, (num, socre)) => (name, socre / num) }.collect
res1: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333))
- 下面的groupByKey、reduceByKey、aggregateByKey内部,都用了和combineByKey同样的combineByKeyWithClassTag
- groupByKey():RDD[(K, Iterable[V])]:按key分组
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { // 1、V类型转成Buffer,不做计算 val createCombiner = (v: V) => CompactBuffer(v) // 2、merge V into Buffer,第一步计算,但其实没算,只是把值放到buffer中 val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v // 3、combine two CompactBuffer[V] into one CompactBuffer[V] val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 val bufs = combineByKeyWithClassTag[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false) bufs.asInstanceOf[RDD[(K, Iterable[V])]] }
- pair RDD的算子
- 较耗时,如果是希望做聚合,using `PairRDDFunctions.aggregateByKey`or `PairRDDFunctions.reduceByKey性能更好:
- 原因是:groupByKey不能在map端做combine,需要把所有数据都insert into compactBuffer中,然后combine,这样会造成 more objects in the old gen(jvm老生带会有大量没用的对象)
- 每次的结果,相同key对应的Iterable,元素顺序可能不同
- reduceByKey:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { // 比较groupByKey,这里func作用了两个地方,一个是merge,一个是combine // merge:相当于是在本地map端先汇总 // combine:之后到reduce端再次汇总 combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) }
- 类似与groupByKey,区别是先在map端做一次聚合,再到reduce再聚合
- map端做一次汇总,减少数据IO传输,性能比groupBykey好
- aggregateByKey:
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
- 类似与reduceByKey,也是分别在map和reduce端做聚合,区别是多了初始值的设置
- zeroValue:初始值,对每种key作用一次
- seqOp:在分区内,Aggregate the values of each key(聚合每个key的values),用zeroValue,结果类型是U
- comOp:a operation for merging two U's between partitions,在分区间运行
- sortByKey:
- 默认对Key做升序排序
- 如果是输出到文件,会写入多个 part-X 文件中
(3)多个RDD的Transformation
- union:两个RDD并行处理
- 没有shuffle,只是将两个rdd合并,可能涉及数据的移动
-
intersection:交集且去重 (比如用在物品归类)
-
cartesian:笛卡儿积
-
subtract(other: RDD[T]):作差(不去重)
-
Return an RDD with the elements from `this` that are not in `other`.
-
-
zip:两个rdd一对一关联处理元素
-
要求两个rdd的元素数量要相同,有相同的值
-
比如表垂直拆分之后合并就用zip
-
-
join:pair RDD的操作,通过key进行关联查询,内部通过cogroup实现
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues( pair => // pair的两个iterator都有值,才能到yield for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) ) }
-
返回数据形式为: (k, (v1, v2)) tuple
-
-
leftOuterJoin:pair RDD的操作,左关联查询,内部通过cogroup实现
-
rightOuterJoin:pair RDD的操作,右,内部通过cogroup实现
-
fullOuterJoin:pair RDD的操作,全连接,内部通过cogroup实现,左边没有,以右边为主,右边没有,以左边为主