map and flatMap
scala> val nums = sc.parallelize(List(1,2,3,4,5,6))
nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> val squares = nums.map(x => x * x)
squares: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at map at <console>:25
scala> squares.collect
res5: Array[Int] = Array(1, 4, 9, 16, 25, 36)
scala> nums.flatMap(x =>1 to x).collect
res6: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6)
scala> nums.map(x =>1 to x).collect
res7: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(1), Range(1, 2), Range(1, 2, 3), Range(1, 2, 3, 4), Range(1, 2, 3, 4, 5), Range(1, 2, 3, 4, 5, 6))
scala> nums.map(x =>1 to x).flatMap(x => x).collect
res9: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6)
join
//The pair RDDs a and b used below were created along these lines (reconstructed from the collect output):
scala> val a = sc.parallelize(List(("a","a1"),("b","b1"),("c","c1"),("d","d1"),("f","f1"),("f","f2")))
scala> val b = sc.parallelize(List(("a","a2"),("c","c2"),("c","c3"),("e","e1")))
scala> a.collect
res10: Array[(String, String)] = Array((a,a1), (b,b1), (c,c1), (d,d1), (f,f1), (f,f2))
scala> b.collect
res11: Array[(String, String)] = Array((a,a2), (c,c2), (c,c3), (e,e1))
//inner join
scala> a.join(b).collect
res14: Array[(String, (String, String))] = Array((a,(a1,a2)), (c,(c1,c2)), (c,(c1,c3)))
//left join
scala> a.leftOuterJoin(b).collect
res15: Array[(String, (String, Option[String]))] = Array((d,(d1,None)), (b,(b1,None)), (f,(f1,None)), (f,(f2,None)), (a,(a1,Some(a2))), (c,(c1,Some(c2))), (c,(c1,Some(c3))))
//right join
scala> a.rightOuterJoin(b).collect
res16: Array[(String, (Option[String], String))] = Array((e,(None,e1)), (a,(Some(a1),a2)), (c,(Some(c1),c2)), (c,(Some(c1),c3)))
//full join
scala> a.fullOuterJoin(b).collect
res17: Array[(String, (Option[String], Option[String]))] = Array((d,(Some(d1),None)), (b,(Some(b1),None)), (f,(Some(f1),None)), (f,(Some(f2),None)), (e,(None,Some(e1))), (a,(Some(a1),Some(a2))), (c,(Some(c1),Some(c2))), (c,(Some(c1),Some(c3))))
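The Option results of the outer joins can be understood with a minimal plain-Scala sketch (no Spark involved; leftOuterJoin here is a hypothetical helper written for illustration, not Spark's implementation):

```scala
// For each (k, v) on the left, pair it with Some(rightValue) for every match,
// or with None when the key is absent on the right -- mirroring RDD.leftOuterJoin.
def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] =
  left.flatMap { case (k, v) =>
    val matches = right.collect { case (`k`, w) => w }
    if (matches.isEmpty) Seq((k, (v, None)))
    else matches.map(w => (k, (v, Some(w))))
  }

val a = Seq(("a", "a1"), ("b", "b1"))
val b = Seq(("a", "a2"), ("c", "c2"))
println(leftOuterJoin(a, b)) // List((a,(a1,Some(a2))), (b,(b1,None)))
```

Keys that only appear on one side survive an outer join, so the missing side has to be represented as None; that is why the value types become Option[String] in the outputs above.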
Word count with Spark Core
Source file:
[hadoop@hadoop001 data]$ cat wordcount.txt
world world hello
China hello
people person
love
scala> val wc = sc.textFile("file:///home/hadoop/data/wordcount.txt")
wc: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/wordcount.txt MapPartitionsRDD[28] at textFile at <console>:24
scala> wc.collect
res18: Array[String] = Array(world world hello, China hello, people person, love)
//Each element of this RDD is a String
//i.e. "world world hello" is one element, "China hello" is another, and so on
scala> val splits = wc.flatMap(x => x.split(" "))
splits: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[46] at flatMap at <console>:25
//flatMap applies the function to each element (splitting each line on spaces), then flattens the per-line results into a single RDD of words: (world, world, hello, China, hello, people, person, love)
scala> splits.collect
res31: Array[String] = Array(world, world, hello, China, hello, people, person, love)
//Each element of the RDD is a String
scala> val wordone = splits.map(x => (x,1))
wordone: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[47] at map at <console>:25
//Map each element x to the pair (x, 1)
//Each word in splits becomes a two-element tuple: the word itself, and the count of this single occurrence, fixed at 1
scala> wordone.collect
res32: Array[(String, Int)] = Array((world,1), (world,1), (hello,1), (China,1), (hello,1), (people,1), (person,1), (love,1))
//The RDD's elements are now of type (String, Int), i.e. (key, value) pairs
scala> val result = wordone.reduceByKey(_+_)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[49] at reduceByKey at <console>:25
//This involves a shuffle: records with the same key are sent to the same reducer, and the values for each key are summed
scala> result.collect
res34: Array[(String, Int)] = Array((love,1), (hello,2), (world,2), (people,1), (China,1), (person,1))
//The result type is (String, Int)
//Several save methods are available (shown by tab completion):
scala> result.saveAs
saveAsHadoopDataset saveAsNewAPIHadoopDataset saveAsObjectFile saveAsTextFile
saveAsHadoopFile saveAsNewAPIHadoopFile saveAsSequenceFile
scala> result.saveAsTextFile("/data/wcresult.txt")
//Save the result locally as a text file
map applies a function to every element of the RDD; flatMap applies the function to every element and then flattens the resulting collections into a single RDD.
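The same distinction holds for plain Scala collections, which makes it easy to check outside Spark:

```scala
// map keeps the nesting produced by the function; flatMap flattens it away.
val lines = List("world world hello", "China hello")

val mapped = lines.map(_.split(" ").toList)     // one inner list per line
val flat   = lines.flatMap(_.split(" ").toList) // a single flat word list

println(mapped) // List(List(world, world, hello), List(China, hello))
println(flat)   // List(world, world, hello, China, hello)
```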
Building on the above, sort the words by their counts in ascending/descending order.
scala> val result = wc.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey(_+_)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[52] at reduceByKey at <console>:25
scala> result.collect
res35: Array[(String, Int)] = Array((love,1), (hello,2), (world,2), (people,1), (China,1), (person,1))
//Ascending by word count (using sortByKey)
//sortByKey sorts by key, so swap key and value first (e.g. (world,2) becomes (2,world)), sort, then swap back
//sortByKey sorts by the key; it is ascending by default
scala> result.map(x => (x._2,x._1)).sortByKey().map(x => (x._2,x._1)).collect
res2: Array[(String, Int)] = Array((love,1), (people,1), (China,1), (person,1), (hello,2), (world,2))
//map(x => (x._2,x._1)) swaps the first and second element of each tuple
//which is what makes the sort work
The above can be combined into a single chain:
scala> wc.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey().map(x => (x._2,x._1)).collect
res3: Array[(String, Int)] = Array((love,1), (people,1), (China,1), (person,1), (hello,2), (world,2))
//Descending by word count:
//sortByKey is ascending by default; for descending, pass false: sortByKey(false)
//Everything else is the same as the ascending case
scala> wc.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).collect
res4: Array[(String, Int)] = Array((hello,2), (world,2), (love,1), (people,1), (China,1), (person,1))
//top can also be used
//top is a curried function (it takes an implicit Ordering); top(n) returns the n largest elements, already sorted in descending order, as the output shows
scala> wc.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey(_+_).map(x => (x._2,x._1)).top(6).map(x => (x._2,x._1))
res9: Array[(String, Int)] = Array((world,2), (hello,2), (person,1), (people,1), (love,1), (China,1))
//Ascending by word count (using sortBy)
//The sorting above used sortByKey; sortBy is another way
sortBy(f, ascending): the first argument is a function extracting the sort key from each element (e.g. _._2 for the second field of a tuple); the second is a Boolean, true for ascending (the default, may be omitted), false for descending (must be written out)
//Ascending by value
//Sort directly by the second element of each tuple in the RDD; ascending by default
scala> result.sortBy(_._2).collect
res43: Array[(String, Int)] = Array((love,1), (people,1), (China,1), (person,1), (hello,2), (world,2))
scala> result.sortBy(x => x._2).collect //same as above
res46: Array[(String, Int)] = Array((love,1), (people,1), (China,1), (person,1), (hello,2), (world,2))
//It can all be written as one chain (shown here descending):
scala> wc.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res15: Array[(String, Int)] = Array((hello,2), (world,2), (love,1), (people,1), (China,1), (person,1))
//Descending by word count (using sortBy):
scala> result.sortBy(_._2,false).collect
res14: Array[(String, Int)] = Array((hello,2), (world,2), (love,1), (people,1), (China,1), (person,1))
Printing with foreach also works (note that foreach runs on the executors, so the output order is not guaranteed):
scala> result.sortBy(_._2).foreach(println)
(hello,2)
(world,2)
(love,1)
(people,1)
(China,1)
(person,1)
subtract: set difference
a.subtract(b):
Returns the elements that are in RDD a but not in RDD b
scala> val a = sc.parallelize(1.to(5))
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[62] at parallelize at <console>:24
scala> val b = sc.parallelize(2.to(3))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[63] at parallelize at <console>:24
scala> a.collect
res18: Array[Int] = Array(1, 2, 3, 4, 5)
scala> b.collect
res19: Array[Int] = Array(2, 3)
scala> a.subtract(b).collect
res20: Array[Int] = Array(4, 1, 5)
intersection: set intersection
scala> a.collect
res18: Array[Int] = Array(1, 2, 3, 4, 5)
scala> b.collect
res19: Array[Int] = Array(2, 3)
scala> a.intersection(b).collect
res27: Array[Int] = Array(2, 3)
cartesian: Cartesian product
scala> a.collect
res18: Array[Int] = Array(1, 2, 3, 4, 5)
scala> b.collect
res19: Array[Int] = Array(2, 3)
//The result is an RDD of pairs: every element of a paired with every element of b
scala> a.cartesian(b)
res29: org.apache.spark.rdd.RDD[(Int, Int)] = CartesianRDD[108] at cartesian at <console>:28
scala> a.cartesian(b).collect
res28: Array[(Int, Int)] = Array((1,2), (2,2), (1,3), (2,3), (3,2), (4,2), (5,2), (3,3), (4,3), (5,3))
takeOrdered
takeOrdered(n, [ordering])
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
Returns the first num elements of the RDD, sorted by the implicit Ordering[T]
i.e. the smallest n elements in natural order, or by a custom comparator
scala> val a = sc.parallelize(List(2,1,3,5,4,8,6,7))
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[112] at parallelize at <console>:24
scala> a.collect
res34: Array[Int] = Array(2, 1, 3, 5, 4, 8, 6, 7)
scala> a.takeOrdered(5)
res35: Array[Int] = Array(1, 2, 3, 4, 5)
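On plain Scala collections, the result of takeOrdered(n) matches sorting and taking the first n (Spark avoids the full sort internally by using a bounded priority queue, but the output is the same):

```scala
val a = List(2, 1, 3, 5, 4, 8, 6, 7)

// Same result as the RDD's a.takeOrdered(5) above.
val smallest5 = a.sorted.take(5)
println(smallest5) // List(1, 2, 3, 4, 5)
```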
takeSample
takeSample(withReplacement, num, [seed])
def takeSample(
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong): Array[T]
Returns an array containing a random sample of num elements of the dataset, sampled with or without replacement, optionally with a pre-specified random-number-generator seed.
false samples without replacement (no element is drawn twice); true samples with replacement (the same element may be drawn multiple times)
scala> val a = sc.parallelize(1 to 10)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[109] at parallelize at <console>:24
scala> a.collect
res51: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> a.takeSample(true,5)
res54: Array[Int] = Array(9, 3, 3, 1, 8)
scala> a.takeSample(true,5)
res55: Array[Int] = Array(5, 6, 5, 5, 9)
//With true (with replacement), duplicates may appear, as above; with false (without replacement), there are none:
scala> a.takeSample(false,5)
res56: Array[Int] = Array(1, 10, 8, 2, 6)
scala> a.takeSample(false,5)
res57: Array[Int] = Array(2, 9, 6, 4, 5)
sample
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
Samples the data with or without replacement, using the given random-number-generator seed.
Sampling modes:
- without replacement: fraction is each element's probability of being selected; must be in [0, 1]
- with replacement: fraction is the expected number of times each element is selected; must be >= 0
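There is no sample example above; its without-replacement mode can be sketched in plain Scala (sampleWithoutReplacement is a hypothetical helper for illustration, not Spark's actual sampler):

```scala
import scala.util.Random

// Without replacement: keep each element independently with probability `fraction`.
// Note the result size is only *expected* to be fraction * data.size, not exact.
def sampleWithoutReplacement[T](data: Seq[T], fraction: Double, seed: Long): Seq[T] = {
  require(fraction >= 0.0 && fraction <= 1.0, "fraction must be in [0, 1]")
  val rng = new Random(seed)
  data.filter(_ => rng.nextDouble() < fraction)
}

val xs = 1 to 10
println(sampleWithoutReplacement(xs, 0.5, seed = 42L))
```

Fixing the seed makes the sample reproducible, which is what the optional seed parameter of sample and takeSample is for.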
countByKey
Counts the elements of a pair RDD per key
scala> val data = sc.parallelize(List((1,3),(1,2),(5,4),(1, 4),(2,3),(2,4)),3)
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[135] at parallelize at <console>:24
scala> data.collect
res58: Array[(Int, Int)] = Array((1,3), (1,2), (5,4), (1,4), (2,3), (2,4))
scala> data.countByKey()
res59: scala.collection.Map[Int,Long] = Map(1 -> 3, 5 -> 1, 2 -> 2)
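countByKey amounts to grouping by key and counting each group, as this plain-Scala analogue shows:

```scala
val data = List((1, 3), (1, 2), (5, 4), (1, 4), (2, 3), (2, 4))

// Group the pairs by their key, then count the pairs in each group.
val counts: Map[Int, Int] = data.groupBy(_._1).map { case (k, vs) => (k, vs.size) }
println(counts)
```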
mapPartitions
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
Similar to map, but it runs once on each partition of the RDD, so func must have type Iterator[T] => Iterator[U]
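A plain-Scala sketch of the per-partition shape (the two Iterators stand in for two RDD partitions; f is a hypothetical per-partition function):

```scala
// mapPartitions calls f once per partition with that partition's Iterator,
// letting per-partition setup cost (e.g. a DB connection) be paid once, not per element.
def f(it: Iterator[Int]): Iterator[Int] = it.map(_ * 10)

val partitions = List(Iterator(1, 2, 3), Iterator(4, 5, 6)) // two "partitions"
val result = partitions.flatMap(f)
println(result) // List(10, 20, 30, 40, 50, 60)
```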
foreach
def foreach(f: T => Unit): Unit
Applies the function f to every element of the RDD
See also: https://blog.csdn.net/goldlone/article/details/83868822#t3