一.Action操作
1.first:返回rdd中的以一个元素
scala> var rdd = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
scala> rdd.first
//输出结果为:(A,1)
scala> var rdd = sc.makeRDD(Seq(10, 3, 1, 13, 6))
scala> rdd.first
//输出结果为:10
2.count:返回rdd中的元素数量
scala> var rdd = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
scala>rdd.count
//输出结果为:3
3.reduce():根据需求对rdd里的元素进行运算,返回结果
scala>var rdd=sc.makeRDD(Array(("A",2),("A",5),("B",2),("C",3))
scala>varrdd.reduce((x,y)=>{(x._1+y._1,x._2+y._2)})
//输出结果为(CBAA,12)
scala>var rdd1=sc.makeRDD(1 to 10,2)
scala>rdd1.reduce(_+_)
//输出结果为1到10相加的和
4.collect:将一个rdd转换成数组
scala> var rdd1 = sc.makeRDD(1 to 3,2)
scala> rdd1.collect
//结果为Array(1,2,3)
5.take(n):获得1到n之间的元素(不排序)
scala> var rdd = sc.makeRDD(Seq(10, 3, 1, 13, 6))
scala>rdd.take(3)
//输出结果为Array(10,3,1)
6.top (n):默认降序返回n个元素
takeOrdered(n):按照与top相反返回n个元素
scala> var rdd = sc.makeRDD(Seq(10, 3, 1, 13, 6))
scala>rdd.top(3)
//返回结果为Array(13,10,6)
scala>rdd.takeOrdered(3)
//此时输出结果为Array(1,3,6)
7.lookup:lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。
scala>var rdd=sc.makeRDD(Array(("A",2),("A",5),("B",2),("C",3))
rdd.lookup("A")
//输出为:(2,5)
8.countByKey:统计RDD[K,V]中每个K的数量
scala>var rdd=sc.makeRDD(Array(("A",2),("A",5),("B",2),("C",3))
scala>rdd.countByKey
//输出结果为scala.collection.Map[String,Long] = Map(A -> 2, B -> 1,C ->1)
9.sortby:根据给定的排序k函数将RDD中的元素进行排序。
scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2)
scala> rdd1.sortBy(x => x).collect
//输出结果为 Array(0, 1, 2, 3, 6, 7) //默认升序
scala> rdd1.sortBy(x => x,false).collect
//输出结果为:Array(7, 6, 3, 2, 1, 0) //降序
//RDD[K,V]类型
scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
scala> rdd1.sortBy(x => x).collect
//输出结果为Array((A,1), (A,2), (B,3), (B,6), (B,7))
scala> rdd1.sortBy(x => x._2,false).collect
//输出结果为:Array((B,7), (B,6), (B,3), (A,2), (A,1))
10.saveAsTextFile:以Text类型保存到指定路径
scala>var rdd1 = sc.makeRDD(1 to 10,2)
scala> rdd1.saveAsTextFile("hdfs://user/tmp/zhangjiaxin/")]//以text形式将文件保存到hdfs
scala>rdd1.saveAsTextFile("file:///user/tmp/zhangjiaxin/")//将文件保存到本地
scala> rdd1.saveAsTextFile("hdfs://user/tmp/zhangjiaxin/"),classOf[com.hadoop.compression.lzo.LzopCodec]////指定压缩格式保存
scala>rdd1.saveAsSequenceFile("hdfs://user/tmp/zhangjiaxin/")]//以Sequence形式将文件保存到hdfs
scala>rdd1.saveAssaveAsObjectFile("hdfs://user/tmp/zhangjiaxin/")]//以saveAsObject形式将文件保存到hdfs
11.saveAsHadoopDaraset将文件保存到HDFS中
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
var jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]])
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.set("mapred.output.dir","/tmp/zhang/")
rdd1.saveAsHadoopDataset(jobConf)
12.数据保存到HBase
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
var conf = HBaseConfiguration.create()
var jobConf = new JobConf(conf)
jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3")
jobConf.set("zookeeper.znode.parent","/hbase")
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"hyxy")
jobConf.setOutputFormat(classOf[TableOutputFormat])
var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
rdd1.map(x => {
var put = new Put(Bytes.toBytes(x._1))
put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
(new ImmutableBytesWritable,put)
}
).saveAsHadoopDataset(jobConf)
二.转换操作
1.map 有多少输入分区就有多少输出分区
hadoop fs -cat /user/tmp/a.txt
hello,world
hello,scala
hello,spark
//读取文件到RDD
var data=sc.textFile("/user/tmp/a.txt")
var result=data.map(line=>line.split(","))//使用map算子调用.split方法按照,进行切分
result.collect
//运行结果为Array[Array[String]] = Array(Array(hello, world), Array(hello, scala), Array(hello, spark))
2.flatMap 与map大致相同 只不过把输出结果合并为一个分区
hadoop fs -cat /user/tmp/a.txt
hello,world
hello,scala
hello,spark
//读取文件到RDDvar data=sc.textFile("/user/tmp/a.txt")var result=data.flatMap(line=>line.split(","))//使用flatMap算子调用.split方法按照,进行切分
result.collect
//运行结果为Array[String] = Array(hello, world, hello, scala, hello, spark)
result.distinct.collect //.distinct(去重)
3.coalesce(n)定义n个重分区 如果重分区数量大于之前的分区数量那么必须加true
hadoop fs -cat /user/tmp/a.txt
hello,world
hello,scala
hello,spark
var data=sc.textFile("/user/tmp/a.txt").replaceAll(","," ")//使用.replaceAll方法把,替换成空格
data.collect
//结果为Array[String] = Array(hello world, hello scala, hello spark)
data.partitions.size
//结果为Int=2 //默认俩分区
var rdd=data.coalesce(1) //将分区改为1
rdd.partitions.size //结果为Int=1
var rdd=data.coalesce(3)
rdd.partitions.size//结果为2 如果重分区的数目大于原来的分区数,那么必须指定shuffle参数为true,否则,分区数不便
var rdd=data.coalesce(3,true)
rdd.partitions.size//结果为Int=4
4.repartition:该函数其实就是coalesce函数第二个参数为true的实现
hadoop fs -cat /user/tmp/a.txt
hello,world
hello,scala
hello,spark
var data=sc.textFile("/user/tmp/a.txt").replaceAll(","," ")//使用.replaceAll方法把,替换成空格
data.collect
//结果为Array[String] = Array(hello world, hello scala, hello spark)
data.partitions.size
//结果为Int=2 //默认俩分区
var rdd=data.repartition(1) //将分区改为1
rdd.partitions.size //结果为Int=1
var rdd=data.repartition(3)
rdd.partitions.size//结果为3
5.glom 将每个分区中的元素放到一个数组中,这样,结果就变成了3个数组
var rdd= sc.makeRDD(1 to 10,3)
rdd.partitions.size //结果为3个分区
rdd.glom().collect//结果为Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
//glom将每个分区中的元素放到一个数组中,这样,结果就变成了3个数组
6.union 将两个RDD进行合并,不去重
var rdd1 = sc.makeRDD(1 to 2,1)
var rdd2 = sc.makeRDD(2 to 3,1)
rdd1.union(rdd2).collect
//结果为Array[Int] = Array(1, 2, 2, 3)
7.intersection 该函数返回两个RDD的交集,并且去重
var rdd1 = sc.makeRDD(1 to 2,1)
var rdd2 = sc.makeRDD(2 to 3,1)
rdd1.intersection(rdd2).collect//结果为3
8.subtract 该函数类似于intersection,但返回在RDD中出现,并且不在otherRDD中出现的元素,不去重。
var rdd1 = sc.makeRDD(Seq(1,2,2,3))
var rdd2 = sc.makeRDD(3 to 4)
rdd1.subtract(rdd2).collect
//结果为Array[Int] = Array(1, 2, 2)
9.mapPartitions 该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器。如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的多
var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有两个分区
scala> var rdd3 = rdd1.mapPartitions{ x => {
| var result = List[Int]()
| var i = 0
| while(x.hasNext){
| i += x.next()
| }
| result.::(i).iterator
| }}
//rdd3将rdd1中每个分区中的数值累加
scala> rdd3.collect
res65: Array[Int] = Array(3, 12)
scala> rdd3.partitions.size
res66: Int = 2
10.zip 函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量必须都相同,否则会抛出异常。
var rdd1 = sc.makeRDD(1 to 5,2)
var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd1.zip(rdd2).collect//结果为Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))
rdd2.zip(rdd1).collect//结果为Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))
11.zipPartitions zipPartitions函数将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求
var rdd1 = sc.makeRDD(1 to 5,2)
var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
scala> rdd1.mapPartitionsWithIndex{
| (x,iter) => {
| var result = List[String]()
| while(iter.hasNext){
| result ::= ("part_" + x + "|" + iter.next())
| }
| result.iterator
|
| }
| }.collect //结果为Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)
//rdd2两个分区中元素分布
rdd2.mapPartitionsWithIndex{
| (x,iter) => {
| var result = List[String]()
| while(iter.hasNext){
| result ::= ("part_" + x + "|" + iter.next())
| }
| result.iterator
|
| }
| }.collect /结果为Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)
//rdd1和rdd2做zipPartition
rdd1.zipPartitions(rdd2){
| (rdd1Iter,rdd2Iter) => {
| var result = List[String]()
| while(rdd1Iter.hasNext && rdd2Iter.hasNext) {
| result::=(rdd1Iter.next() + "_" + rdd2Iter.next())
| }
| result.iterator
| }
| }.collect //结果为Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)
12 zipWithIndex 该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。
var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
rdd2.zipWithIndex().collect //结果为Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))
13.zipWithUniqueId 与zipWithIndex相似只不过他是按照分区的索引
var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
rdd1.zipWithUniqueId().collect //结果为Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
//总分区数为2
//第一个分区第一个元素ID为0,第二个分区第一个元素ID为1
//第一个分区第二个元素ID为0+2=2,第一个分区第三个元素ID为2+2=4
//第二个分区第二个元素ID为1+2=3,第二个分区第三个元素ID为3+2=5
三.键值对转换操作
1.mapValues 同基本转换操作中的map,只不过mapValues是针对[K,V]中的V值进行map操作。
var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd1.mapValues(x => x + "_").collect //结果为Array[(Int, String)] = Array((1,A_), (2,B_), (3,C_), (4,D_))
2.flatMapValues 同基本转换操作中的flatMap,只不过flatMapValues是针对[K,V]中的V值进行flatMap操作
var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd1.flatMapValues(x => x + "_").collect //结果为Array((1,A), (1,_), (2,B), (2,_), (3,C), (3,_), (4,D), (4,_))
3.combineByKey 该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。
var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
scala> rdd1.combineByKey(
| (v : Int) => v + "_",
| (c : String, v : Int) => c + "@" + v,
| (c1 : String, c2 : String) => c1 + "$" + c2
| ).collect
//结果为Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))
//其中三个映射函数分别为:
//createCombiner: (V) => C (v : Int) => v + “_” //在每一个V值后面加上字符_,返回C类型(String)
//mergeValue: (C, V) => C (c : String, v : Int) => c + “@” + v //合并C类型和V类型,中间加字符@,返回C(String)
//mergeCombiners: (C, C) => C (c1 : String, c2 : String) => c1 + “$” + c2 //合并C类型和C类型,中间加$,返回C(String) 其他参数为默认值。
//最终,将RDD[String,Int]转换为RDD[String,String]。
rdd1.combineByKey(
(v : Int) => List(v),
(c : List[Int], v : Int) => v :: c,
(c1 : List[Int], c2 : List[Int]) => c1 ::: c2
).collect
//结果为 Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))
//最终将RDD[String,Int]转换为RDD[String,List[Int]]。
4.foldByKey 该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V
var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1.foldByKey(0)(_+_).collect
//结果:Array[(String, Int)] = Array((A,2), (B,3), (C,1))
//将rdd1中每个key对应的V进行累加,注意zeroValue=0,需要先初始化V,映射函数为+操
//作,比如("A",0), ("A",2),先将zeroValue应用于每个V,得到:("A",0+0), ("A",2+0),即:
//("A",0), ("A",2),再将映射函数应用于初始化后的V,最后得到(A,0+2),即(A,2)
rdd1.foldByKey(2)(_+_).collect
//结果Array[(String, Int)] = Array((A,6), (B,7), (C,3))
//先将zeroValue=2应用于每个V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再将映射函
//数应用于初始化后的V,最后得到:(A,2+4),即:(A,6)
5.groupByKey该函数用于将RDD[K,V]中每个K对应的V值,合并到一个集合Iterable[V]中
var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1.groupByKey().collec
//结果为:Array[(String, Iterable[Int])] = Array((A,CompactBuffer(0, 2)), (B,CompactBuffer(2, 1)), (C,CompactBuffer(1)))
6.reduceByKey该函数用于将RDD[K,V]中每个K对应的V值根据映射函数来运算。
var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
var rdd2 = rdd1.reduceByKey((x,y) => x + y)
rdd2.collect
//结果为Array[(String, Int)] = Array((A,2), (B,3), (C,1))
7.reduceByKeyLocally该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]
var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1.reduceByKeyLocally((x,y) => x + y)//res90: scala.collection.Map[String,Int] = Map(B -> 3, A -> 2, C -> 1)
8.cogroup 相当于SQL中的全外关联full outer join,返回左右RDD中的记录,关联不上的为空
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
var rdd3 = rdd1.cogroup(rdd2)//如果还要关联多个rdd则直接作为另一个参数传进去即可
rdd3.collect
//结果:Array[(String, (Iterable[String], Iterable[String]))] = Array(
(B,(CompactBuffer(2),CompactBuffer())),
(D,(CompactBuffer(),CompactBuffer(d))),
(A,(CompactBuffer(1),CompactBuffer(a))),
(C,(CompactBuffer(3),CompactBuffer(c)))
9.join 相当于SQL中的内关联join,只返回两个RDD根据K可以关联上的结果,join只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
rdd1.join(rdd2).collect //结果为Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))
10.leftOuterJoin 类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2
rdd1.leftOuterJoin(rdd2).collect
//结果为Array[(String, (String, Option[String]))] = Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some(c))))
11.rightOuterJoin 与leftOuterJoin相反
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2
rdd1.rightOuterJoin(rdd2).collect
//结果为Array[(String, (Option[String], String))] = Array((D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c)))
---------------------
作者:不会水的鱼i
来源:CSDN
原文:https://blog.csdn.net/z1219346000/article/details/80465409
版权声明:本文为博主原创文章,转载请附上博文链接!