spark的编程接口包括
1.分区信息,数据集的最小分片
(1)Patitions()用法:
scala> val part=sc.textFile("/user/README.md",6)
part: org.apache.spark.rdd.RDD[String] = /user/README.md MapPartitionsRDD[9] at textFile at <console>:24
scala> part.partitions.size
res3: Int = 6 #可以用来打印分区个数
2.依赖关系,指向其父RDD
(1)Dependencies()使用方法:
scala> val part=sc.textFile("/user/README.md")
scala> val wordmap=part.flatMap(_.split(" ")).map(x=>(x,1))
scala> wordmap.dependencies.foreach{dep=> println(dep.getClass)} #调取getClass方法可以获取依赖方式
class org.apache.spark.OneToOneDependency
scala> wordredue.dependencies.foreach{dep=> println(dep.getClass)}
class org.apache.spark.ShuffleDependency
3.函数,基于夫RDD的计算方法(Iterator)
(1):mappartition(传入一个函数)
分区函默认的有两个分区器对每个分区进行计算,例如:
val a=sc.parallelize(1to9,3).mapPatitions(f)就是对1-3,4-6,7-9分别进行f操作
4.划分策略和数据位置的元数据
1.(hasPartitioner)
2.(RangePartitioner)
使用Partitioner(只对kv形式的RDD有意义非kv格式都是None)对rdd进行操作查看分区方式
rdd的创建操作分两种
1,并行化集合的创建
(1)SparkContext的parallelize方法,它是在已知的集合创建的,会被复制,然后利用这个复制的创建一个可以被并行处理的分布式数据集这个方法可以有一个或者两个参数必须有个Seq参数,然后后面可以跟一个分区数量,如果你不输入一般就是你配置的是的一个
(2)makeRDD()这个方法可以知道首选分区,意思就是可以指定rdd放在那个节点上
2.外部存储创建操作,可以将hadoop支持的文件转化成RDD(但是你的所有节点都要能访问到文件)
(1)textFILE(文件路径,分片数量)这个分数量不能小于hdfs的快数量!!!
(2)sequenceFILE(),sequenceFILE是hadoop存储二进制形式的KV格式的一种文本文件(使用较少)
(3)hadoopFILE()
(4)hadoopRDD(),可以将其他的形式的hadoop数据转化成RDD
最后给大家提一点就是我们应该如何设置rdd的并行度,并行度最好是你资源的两倍