一、一些对象
1.Driver Program:包含程序的main()方法,RDDs的定义和操作,它管理很多节点,我们称之为executors
2.SparkContext:Driver Program通过SparkContext对象访问Spark,SparkContext对象代表和一个集群的连接
3.在shell中SparkContext对象自动创建好了,就是sc,可以在shell中直接使用sc
二、RDDs(Resilient distributed dataset——弹性分布式数据集)
1.RDDs的介绍:
1)并行的分布在集群中
2)RDDs是Spark分发数据和计算的基础抽象类
3)一个RDD是不可改变的分布式集合对象
4)Spark中,所有的计算都是通过RDDs的创建、转换等操作完成的
5)一个RDD内部由许多==partitions(分片)==组成
分片:
每个分片包括一部分数据,partitions可在集群不同节点上计算
分片是Spark并行处理的单元,Spark会顺序的、并行的处理分片
2.RDDs的创建方法:
1)把一个已存在的集合传给SparkContext的parallelize()方法,可用来测试
val rdd=sc.parallelize(Array(1,2,2,4),4) ----第一个参数:待并行化处理的集合
2)加载外部数据集:可以加载本地文件,也可以加载hadoop文件
三、RDD基本操作之Transformation
1.Transformation:从之前的RDD构建一个新的RDD,像map()和filter()……
2.逐元素Transformation:
1)map()——接收函数,把函数应用到RDD的每一个元素,返回新的RDD
2)filter()——接受一个函数,返回只包含,满足filter()函数的新RDD
3)flatMap()——对每个输入元素,输出多个输出元素,flatMap()将RDD中的元素压扁后返回一个新的RDD
3.集合运算:RDDs支持数学集合的运算,例如:并集、交集等
1)distinct()——去重方法
val c1=sc.parallelize(Array("coffe","coffe","tea","tea","coka"))
c1.foreach(println)
tea
coffe
tea
coka
coffe
c1.distinct.foreach(println)
coka
tea
coffe
2)union()——并集
val c2=sc.parallelize(Array("coffe","tea"))
c1.distinct.union(c2).foreach(println)
coffe
tea
coffe
coka
tea
3)intersection()——交集
c1.distinct.intersection(c2).foreach(println)
coffe
tea
4)subtract()
c1.distinct.subtract(c2).foreach(println)
coka
四、RDD基本操作之Action
1.Action:在RDD上计算出来一个结果,把结果返回给driver program或保存在文件系统,像count(),save……
2.collect():遍历整个RDD,向driver program返回RDD的内容,可以用此查看小数据;大数据的时候,使用savaAsTextFile()这一action
3.reduce():接收一个函数,作用在RDD两个类型相同的元素上,返回新元素;可以实现:RDD中元素的累加,计数,和其他类型的聚集操作
4.take(n):返回RDD的n个元素,返回结果是无序的,一般是测试使用
5.top():排序
6.foreach():计算RDD中的每个元素,但不返回到本地,可以配合println友好的打印出数据
五、RDDs的特性
1.RDDs的血统关系图:
Spark维护着RDDs之间的依赖关系和创建关系,叫做血统关系图
Spark使用血统关系图来计算每个RDD的需求和恢复丢失的数据
2.延迟计算:
Spark对RDDs的计算是,他们第一次使用action操作的时候
这种方式在处理大数据的时候特别有用,可以减少数据的传输
Spark内部记录metadat,表明transformation操作已经被响应了
加载数据也是延迟计算,数据只有在必要的时候,才会被加载进去
3.RDD.persist():
默认每次在RDDs上面进行action操作时,Spark都重新计算RDDs
如果想重复利用一个RDD,可以使用RDD.persist()
unpersist()方法是从缓存中移除
六、KeyValue对
1.创建KeyValue对RDDs:使用map()函数,返回KeyValue对
2.reduceByKey():把相同的key结合
val rdd1=sc.parallelize(Array((1,2),(3,4),(3,6)))
val rdd2=rdd1.reduceByKey(_+_)
rdd2.foreach(println)
(1,2)
(3,10)
3.groupByKey():把相同key的values分组
val rdd3=rdd1.groupByKey()
rdd3.foreach(println)
(1,CompactBuffer(2))
(3,CompactBuffer(4, 6))
4.mapValues():函数作用于pairRDD的每个元素,key不变
val rdd4=rdd1.mapValues(x=>x+1)
rdd4.foreach(println)
(1,3)
(3,5)
(3,7)
5.flatMapValues():符号化的时候使用
val rdd5=rdd1.flatMapValues(x=>x to 5)
rdd5.foreach(println)
(3,4)
(3,5)
(1,2)
(1,3)
(1,4)
(1,5)
6.keys:仅返回keys
val rdd6=rdd1.keys
rdd6.foreach(println)
1
3
3
7.values:仅返回values
8.sortByKey():按照key排序的RDD
9.combineByKey():!!!
参数(createCombiner,mergeValue,mergeCombiners,partitioner)
最常用的基于key的聚合函数,返回的类型可以与输入类型不一样,许多基于key的聚合函数都用到了它,像groupByKey()……
元素的key,要么是之前见过的,要么是新的
遍历partition中的如果是新元素,使用我们提供的createCombiner()函数
如果是这个partition中已经存在的key,就会使用mergeValue()函数
合计每个partition的结果的时候,使用mergeCombiners()函数元素
//求平均成绩
//初始化
val scores=sc.parrallelize(Array(("Nina",80),("Nina",90),("Nina",100),("Jack",100),("Jack",100),("Jack",100)))
scores.foreach(println)
(Jack,100)
(Jack,100)
(Jack,100)
(Nina,80)
(Nina,90)
(Nina,100)
//使用combineByKey()函数,求每个人的总科目数和总成绩
val result=scores.combineByKey(score=>(1,score),(c1:(Int,Int),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Int),c2:(Int,Int))=>(c1._1+c2._1,c1._2+c2._2))
result.foreach(println)
(Jack,(3,300))
(Nina,(3,270))
//求平均成绩
val average=result.map{case(name,(num,score))=>(name,score/num)}
average.foreach(println)
(Nina,90)
(Jack,100)