Spark学习记录(三)核心API模块介绍

spark
-------------
基于hadoop的mr,扩展MR模型高效使用MR模型,内存型集群计算,提高app处理速度。

spark特点
-------------
速度:在内存中存储中间结果。
支持多种语言。Scala、Java、Python
内置了80+的算子.
高级分析:MR,SQL/ Streamming /mllib / graph

RDD:
----------------
是spark的基本数据结构,是不可变数据集。RDD中的数据集进行逻辑分区,每个分区可以单独在集群节点
进行计算。可以包含任何java,scala,python和自定义类型。

RDD是只读的记录分区集合。RDD具有容错机制。

创建RDD方式,一、并行化一个现有集合。

hadoop 花费90%时间用户rw。、

内存处理计算。在job间进行数据共享。内存的IO速率高于网络和disk的10 ~ 100之间。

内部包含5个主要属性
-----------------------
1.分区列表
2.针对每个split的计算函数。
3.对其他rdd的依赖列表
4.可选,如果是KeyValueRDD的话,可以带分区类。
5.可选,首选块位置列表(hdfs block location);

RDD变换

rdd的变换方法都是lazy执行的
------------------
返回指向新rdd的指针,在rdd之间创建依赖关系。每个rdd都有计算函数和指向父RDD的指针。


map() //对每个元素进行变换,应用变换函数
//(T)=>V

mapPartitions() //对每个分区进行应用变换,输入的Iterator,返回新的迭代器,可以对分区进行函数处理。

//针对每个数据分区进行操作,入参是分区数据的Iterator,map() 针对分区中的每个元素进行操作。

mapPartitions()  //Iterator<T> => Iterator<U>

注:最好设置每个分区都对应有一个线程。

filter() //过滤器,(T)=>Boolean
flatMap() //压扁,T => TraversableOnce[U]

//同mapPartitions方法一样都是针对分区处理,只不过这个方法可以获取到分区索引

mapPartitionsWithIndex(func)  //(Int, Iterator<T>) => Iterator<U>

//采样返回采样的RDD子集。
//withReplacement 元素是否可以多次采样.
//fraction : 期望采样数量.[0,1]
//表示一个种子,根据这个seed随机抽取,一般都只用到前两个参数
sample(withReplacement, fraction, seed)

作用:在数据倾斜的时候,我们那么多数据如果想知道那个key倾斜了,就需要我们采样获取这些key,出现次数陊的key就是导致数据倾斜的key。如果这些key数据不是很重要的话,可以过滤掉,这样就解决了数据倾斜。

union() //类似于mysql union操作。

intersection //交集,提取两个rdd中都含有的元素。


distinct([numTasks])) //去重,去除重复的元素。

groupByKey() //(K,V) => (K,Iterable<V>)  使用前需要构造出对偶的RDD

reduceByKey(*) //按key聚合。注意他是一个RDD变换方法,不是action

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])//按照key进行聚合,这个函数逻辑较为复杂请看aggregateByKey函数的专题

sortByKey //根据映射的Key进行排序,但是只能根据Key排序

sortBy //比sortByKey更加灵活强大的排序,可根据元组中任意字段排序

join(otherDataset, [numTasks]) //横向连接,有两种数据(K,V)和(K,W),链接后返回(K,(V,W)),两个元组一一对应的

cogroup //协分组,(K,V)和(K,W)分组后返回(K,(V,W)),注意协分组不是一一对应的分组后需要(此处注意与join的区别)



cartesian(otherDataset) //笛卡尔积,RR[(A,B)] RDD[(1,2)] => RDD[(A,1),(A,2),(B,1),(B,2)]

pipe //将rdd的元素传递给脚本或者命令,执行结果返回形成新的RDD


coalesce(numPartitions) //减少分区


repartition //再分区


repartitionAndSortWithinPartitions(partitioner)//再分区并在分区内进行排序

RDD Action

Spack的中的方法都是懒的,,只有遇到了action类型的方法才会真正的执行
------------------
collect() //收集rdd元素形成数组.
count() //统计rdd元素的个数
reduce() //聚合,返回一个值。
first //取出第一个元素take(1)
take //
takeSample (withReplacement,num, [seed])
takeOrdered(n, [ordering])

saveAsTextFile(path) //保存到文件
saveAsSequenceFile(path) //保存成序列文件  sc.sequenceFile读取序列文件

saveAsObjectFile(path) (Java and Scala)

countByKey()                       //按照key统计有几个value 

数据倾斜

------------------------------

由于大量相同的Key,在reduce合并计算的过程中,大量相同的Key被分配到了同一个集群节点,导致集群中这个节点计算压力非常大。

本例采用的解决方案是,在map截断将Key先接上一个随机数打散,然后在reduce计算后,再次map还原key,然后进行最终reduce。


Spark WebUI 上面代码运行的DAG 有效无环图,我们可以清楚地看到每一次的reduce聚合都会重新划分阶段

猜你喜欢

转载自my.oschina.net/u/3687664/blog/2876019