1. Spark RDD Overview:
The core of Spark is the RDD (Resilient Distributed Dataset), proposed by the AMPLab. It is the dataset abstraction of a distributed in-memory system. RDDs interoperate with other systems and can import datasets from external storage such as HDFS, HBase, and other Hadoop data sources.
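For example, a minimal sketch of loading an external dataset in the spark-shell (the HDFS path below is a placeholder, not a real address):

// Read a text file from HDFS into an RDD[String]; each element is one line.
val textRDD = sc.textFile("hdfs://namenode:9000/data/input.txt")

// Nothing is read from HDFS until an action such as take() runs.
textRDD.take(3).foreach(println)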
2. RDD Characteristics:
RDD operation type | Description
Transformation | A "transformation" applied to an RDD produces a new RDD, but nothing is executed immediately; execution is deferred until an "action" runs (lazy evaluation).
Action | An "action" produces an actual result (a value, an array, or data written to a file system). It executes immediately, together with all of the preceding "transformations".
Persistence | Frequently reused RDDs can be persisted in memory first, to improve execution efficiency.
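A minimal sketch of persistence in the spark-shell (the data is made up for illustration):

import org.apache.spark.storage.StorageLevel

val squares = sc.parallelize(1 to 10000).map(x => x * x)

// Keep the RDD in memory after it is first computed;
// persist(StorageLevel.MEMORY_ONLY) is equivalent to cache().
squares.persist(StorageLevel.MEMORY_ONLY)

squares.count()   // first action: computes the RDD and caches it
squares.sum()     // second action: served from the in-memory copy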
3. Fault Tolerance:
Lineage mechanism: Spark records the relationship between each RDD and its parent RDD(s), including which operation turns the parent RDD into this RDD.
Immutability: RDDs are immutable. If a node fails and the RDD partitions stored on it are lost, Spark re-executes the recorded series of "transformations" to produce the RDD again, so a single node failure does not bring down the whole system.
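The lineage that Spark records can be inspected with toDebugString (a sketch; the exact output depends on your Spark version):

// Build a small chain of transformations, then print its lineage:
// the chain of parent RDDs and the operations linking them is the
// same information Spark replays to rebuild lost partitions.
val derived = sc.parallelize(List(6, 2, 4, 9, 9)).map(_ + 1).filter(_ > 5)
println(derived.toDebugString)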
4. RDD "Transformations":
(1) Create intRDD:
scala> val intRDD=sc.parallelize(List(6,2,4,9,9))
intRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
****** Spark is lazy, so this is not executed immediately ******
scala> intRDD.collect()
res0: Array[Int] = Array(6, 2, 4, 9, 9)
****** collect() is an action that returns the RDD's contents as an Array; Spark only executes when it encounters an action ******
(2) Create stringRDD:
scala> val stringRDD=sc.parallelize(List("iphone","xiaomi","oppo","huawei"))
stringRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> stringRDD.collect()
res1: Array[String] = Array(iphone, xiaomi, oppo, huawei)
(3) map:
With a named function:
scala> def addOne(x:Int):Int={
| return (x+1)
| }
addOne: (x: Int)Int
scala> intRDD.map(addOne).collect()
res4: Array[Int] = Array(7, 3, 5, 10, 10)
With an anonymous function:
scala> intRDD.map(x=>x+1).collect()
res6: Array[Int] = Array(7, 3, 5, 10, 10)
With an anonymous function using the placeholder parameter (_):
scala> intRDD.map(_+1).collect()
res7: Array[Int] = Array(7, 3, 5, 10, 10)
map over the string RDD:
scala> stringRDD.map(x=>"name:"+x).collect()
res3: Array[String] = Array(name:iphone, name:xiaomi, name:oppo, name:huawei)
(4) filter:
scala> intRDD.filter(x=>x<7).collect()
res9: Array[Int] = Array(6, 2, 4)
filter on strings: select the words containing "o":
scala> stringRDD.filter(x=>x.contains("o")).collect()
res10: Array[String] = Array(iphone, xiaomi, oppo)
(5) distinct: remove duplicate elements:
scala> intRDD.distinct().collect()
res17: Array[Int] = Array(4, 6, 9, 2)
(6) randomSplit(): randomly split one RDD into several by the given weights (here roughly 4:6); the exact element counts vary from run to run:
scala> val sRDD=intRDD.randomSplit(Array(0.4,0.6))
sRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[18] at randomSplit at <console>:26, MapPartitionsRDD[19] at randomSplit at <console>:26)
scala> sRDD(0).collect()
res21: Array[Int] = Array(6, 4, 9, 9)
scala> sRDD(1).collect()
res22: Array[Int] = Array(2)
(7) groupBy(): group elements by the result of a function (here: even vs. odd):
scala> val gRDD=intRDD.groupBy(
| x=>{if(x%2==0)"even" else "odd"}
| ).collect()
gRDD: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(6, 2, 4)), (odd,CompactBuffer(9, 9)))
scala> gRDD(0)
res23: (String, Iterable[Int]) = (even,CompactBuffer(6, 2, 4))
scala> gRDD(1)
res25: (String, Iterable[Int]) = (odd,CompactBuffer(9, 9))
5. "Transformations" on Multiple RDDs:
(1) Create three RDDs:
scala> val intRDD1=sc.parallelize(List(6,2,5,9,9))
intRDD1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val intRDD2=sc.parallelize(List(4,9))
intRDD2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val intRDD3=sc.parallelize(List(2,8))
intRDD3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
(2) union (set union; the ++ operator is equivalent):
scala> intRDD1.union(intRDD2).union(intRDD3).collect()
res0: Array[Int] = Array(6, 2, 5, 9, 9, 4, 9, 2, 8)
scala> (intRDD1++intRDD2++intRDD3).collect()
res3: Array[Int] = Array(6, 2, 5, 9, 9, 4, 9, 2, 8)
(3) intersection (set intersection):
scala> intRDD1.intersection(intRDD2).collect()
res5: Array[Int] = Array(9)
(4) subtract (set difference):
scala> intRDD1.subtract(intRDD2).collect()
res6: Array[Int] = Array(2, 5, 6)
(5) cartesian (Cartesian product):
scala> intRDD1.cartesian(intRDD2).collect()
res7: Array[(Int, Int)] = Array((6,4), (6,9), (2,4), (2,9), (5,4), (5,9), (9,4), (9,9), (9,4), (9,9))
6. Basic RDD "Actions":
(1) Read the first element:
scala> intRDD1.first
res9: Int = 6
(2) Read the first n records:
scala> intRDD1.take(2)
res10: Array[Int] = Array(6, 2)
(3) Read the smallest n records in ascending order:
scala> intRDD1.takeOrdered(4)
res12: Array[Int] = Array(2, 5, 6, 9)
(4) Summary statistics:
scala> intRDD1.stats
res13: org.apache.spark.util.StatCounter = (count: 5, mean: 6.200000, stdev: 2.638181, max: 9.000000, min: 2.000000)
(5) min and max:
scala> intRDD1.min
res14: Int = 2
scala> intRDD1.max
res15: Int = 9
(6) Standard deviation:
scala> intRDD1.stdev
res16: Double = 2.638181191654584
(7) count, sum, and mean:
scala> intRDD1.count
res17: Long = 5
scala> intRDD1.sum
res18: Double = 31.0
scala> intRDD1.mean
res19: Double = 6.2
7. Basic Key-Value RDD "Transformations":
(1) Create a key-value RDD:
scala> val kvRDD1=sc.parallelize(List((3,4),(3,6),(8,6),(1,3)) )
kvRDD1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[26] at parallelize at <console>:24
(2) List all keys:
scala> kvRDD1.keys.collect()
res20: Array[Int] = Array(3, 3, 8, 1)
(3) List all values:
scala> kvRDD1.values.collect()
res21: Array[Int] = Array(4, 6, 6, 3)
(4) filter by key or by value:
scala> kvRDD1.filter{case(key,value)=>key<5}.collect()
res22: Array[(Int, Int)] = Array((3,4), (3,6), (1,3))
scala> kvRDD1.filter{case(key,value)=>value<6}.collect()
res23: Array[(Int, Int)] = Array((3,4), (1,3))
(5) mapValues: apply a function to each value without changing the keys:
scala> kvRDD1.mapValues(x=>x*x).collect
res24: Array[(Int, Int)] = Array((3,16), (3,36), (8,36), (1,9))
scala> kvRDD1.mapValues(x=>x+10).collect
res25: Array[(Int, Int)] = Array((3,14), (3,16), (8,16), (1,13))
(6) sortByKey: sort by key, ascending by default (pass false for descending):
scala> kvRDD1.sortByKey(true).collect
res26: Array[(Int, Int)] = Array((1,3), (3,4), (3,6), (8,6))
scala> kvRDD1.sortByKey().collect
res27: Array[(Int, Int)] = Array((1,3), (3,4), (3,6), (8,6))
scala> kvRDD1.sortByKey(false).collect
res28: Array[(Int, Int)] = Array((8,6), (3,4), (3,6), (1,3))
(7) reduceByKey:
1: kvRDD1 holds (3,4),(3,6),(8,6),(1,3); the first field is the key, the second the value.
2: reduceByKey locates the pairs with the same key: (3,4),(3,6).
3: reduceByKey then merges their values → (3,4+6).
scala> kvRDD1.reduceByKey((x,y)=>x+y).collect
res29: Array[(Int, Int)] = Array((1,3), (3,10), (8,6))
scala> kvRDD1.reduceByKey(_+_).collect
res30: Array[(Int, Int)] = Array((1,3), (3,10), (8,6))
8. Key-Value "Transformations" on Multiple RDDs (join):
(1) Create two key-value RDDs (both sides of a join must be key-value RDDs):
scala> val kvRDD1=sc.parallelize(List((3,4),(3,6),(8,6),(1,2)))
kvRDD1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val kvRDD2=sc.parallelize(List((3,8)))
kvRDD2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
(2) join (inner join on key; only key 3 appears in both RDDs):
scala> kvRDD1.join(kvRDD2).foreach(println)
(3,(4,8))
(3,(6,8))
(3) leftOuterJoin (left outer join): keep every key from kvRDD1; values missing on the right become None:
scala> kvRDD1.leftOuterJoin(kvRDD2).foreach(println)
(1,(2,None))
(3,(4,Some(8)))
(3,(6,Some(8)))
(8,(6,None))
(4) rightOuterJoin (right outer join): keep every key from kvRDD2:
scala> kvRDD1.rightOuterJoin(kvRDD2).foreach(println)
(3,(Some(4),8))
(3,(Some(6),8))
(5) subtractByKey: remove the pairs whose key also appears in kvRDD2:
scala> kvRDD1.subtractByKey(kvRDD2).collect
res3: Array[(Int, Int)] = Array((1,2), (8,6))
9. Key-Value "Actions":
(1) Read records:
scala> kvRDD1.first
res5: (Int, Int) = (3,4)
scala> kvRDD1.take(2)
res6: Array[(Int, Int)] = Array((3,4), (3,6))
(2) Access the key and value of a tuple:
scala> val kvFirst=kvRDD1.first
kvFirst: (Int, Int) = (3,4)
scala> kvFirst._1
res3: Int = 3
scala> kvFirst._2
res4: Int = 4
(3) countByKey: count the number of values for each key:
scala> kvRDD1.countByKey
res5: scala.collection.Map[Int,Long] = Map(1 -> 1, 3 -> 2, 8 -> 1)
(4)collectAsMap:
scala> var kv=kvRDD1.collectAsMap
kv: scala.collection.Map[Int,Int] = Map(8 -> 6, 1 -> 2, 3 -> 6)
****** If a key has more than one value, collectAsMap keeps only one of them (here (3,6) wins over (3,4)) ******
(5) Look up a value by key in the resulting Map:
scala> kv(3)
res6: Int = 6
scala> kv(1)
res7: Int = 2
(6) lookup: return all the values for a given key as a Seq:
scala> kvRDD1.lookup(3)
res8: Seq[Int] = WrappedArray(4, 6)
scala> kvRDD1.lookup(8)
res9: Seq[Int] = WrappedArray(6)