spark core
spark core中最重要的部分:RDD(弹性分布式数据集,是spark计算的基石,为用户隐蔽了底层对数据的复杂抽象和处理,为用户提供了一组方便的数据转换与求值方法)
val prdd = sc.parallelize(1 to 10)
prdd.collect
val lrdd = sc.parallelize(List("a","b","c"))
lrdd.collect
val mrdd = sc.makeRDD(0 to 10)
mrdd.collect
val aa = sc.makeRDD(List((1,List("a","b","c")),(2,List("d","e","f"))))
aa.
aa.partitions.size
aa.preferredLocations(aa.partitions(1))
aa.preferredLocations(aa.partitions(0))
1、RDD是整个Spark的计算基石。是分布式数据的抽象,为用户屏蔽了底层复杂的计算和映射环境
1、RDD是不可变的,如果需要在一个RDD上进行转换操作,则会生成一个新的RDD
2、RDD是分区的,RDD里面的具体数据是分布在多台机器上的Executor里面的。堆内内存和堆外内存 + 磁盘。
3、RDD是弹性的。
1、存储:Spark会根据用户的配置或者当前Spark的应用运行情况去自动将RDD的数据缓存到内存或者磁盘。他是一个对用户不可见的封装的功能。
2、容错:当你的RDD数据被删除或者丢失的时候,可以通过血统或者检查点机制恢复数据。这个用户透明的。
3、计算:计算是分层的,有应用->JOb->Stage->TaskSet-Task 每一层都有对应的计算的保障与重复机制。保障你的计算不会由于一些突发因素而终止。
4、分片:你可以根据业务需求或者一些算子来重新调整RDD中的数据分布。
2、Spark Core干了什么东西,其实就是在操作RDD
RDD的创建--》RDD的转换--》RDD的缓存--》RDD的行动--》RDD的输出。
3、RDD怎么创建?
创建RDD有三种方式:
1、可以从一个Scala集合里面创建
1、sc.parallelize(seq) 把seq这个数据并行化分片到节点
2、sc.makeRDD(seq) 把seq这个数据并行化分片到节点,他的实现就是parallelize
3、sc.makeRDD(seq[(T,seq)] 这种方式可以指定RDD的存放位置
2、从外部存储来创建,比如sc.textFile("path")
3、从另外一个RDD转换过来。
RDD转换
val sourceRdd = sc.makeRdd(1 to 10)
sourceRdd.
sourceRdd.map(_ * 2)
res0.collect
val filter = sc.makeRDD(Array("aa","bb","cc","dd"))
filter.
filter.filter(_.)
filter.filter(_.startsWith("aa")).collect
val ff = sc.textFile("abc.text")
ff.collect(出错,因为RDD是懒执行的)
//********************** 转换操作 *********************
1、def map[U: ClassTag](f: T => U): RDD[U] 将函数应用于RDD的每一元素,并返回一个新的RDD
2、def filter(f: T => Boolean): RDD[T] 通过提供的产生boolean条件的表达式来返回符合结果为True新的RDD
3、def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] 将函数应用于RDD中的每一项,对于每一项都产生一个集合,并将集合中的元素压扁成一个集合。
4、def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] 将函数应用于RDD的每一个分区,每一个分区运行一次,函数需要能够接受Iterator类型,然后返回Iterator。
5、def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] 将函数应用于RDD中的每一个分区,每一个分区运行一次,函数能够接受 一个分区的索引值 和一个代表分区内所有数据的Iterator类型,需要返回Iterator类型。
6、def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] 在RDD中移seed为种子返回大致上有fraction比例个数据样本RDD,withReplacement表示是否采用放回式抽样。
7、def union(other: RDD[T]): RDD[T] 将两个RDD中的元素进行合并,返回一个新的RDD
8、def intersection(other: RDD[T]): RDD[T] 将两个RDD做交集,返回一个新的RDD
9、def distinct(): RDD[T] 将当前RDD进行去重后,返回一个新的RDD
10、def partitionBy(partitioner: Partitioner): RDD[(K, V)] 根据设置的分区器重新将RDD进行分区,返回新的RDD。
11、def reduceByKey(func: (V, V) => V): RDD[(K, V)] 根据Key值将相同Key的元组的值用func进行计算,返回新的RDD
12、def groupByKey(): RDD[(K, Iterable[V])] 将相同Key的值进行聚集,输出一个(K, Iterable[V])类型的RDD
13、def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] 根据key分别使用CreateCombiner和mergeValue进行相同key的数值聚集,通过mergeCombiners将各个分区最终的结果进行聚集。
14、def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] 通过seqOp函数将每一个分区里面的数据和初始值迭代带入函数返回最终值,comOp将每一个分区返回的最终值根据key进行合并操作。