目录
学习spark,必须要学RDD,那么RDD是什么呢?
我刚开始以为RDD是一门语言,类似C,Java,Python。
接触后才知道,RDD是一个数据集。
Spark中,我们用过对分布式数据集的操作来表达我们的计算意图。这些计算会自动在集群上并行运行。
这样的数据集被称为 弹性分布式数据集(resilient distributed dataset)
简称RDD
刚开始学习RDD,可以简单理解为一个数组。方便我们的理解
RDD基础
RDD的重要性
RDD是Spark对数据的核心抽象。
RDD其实就是分布式的数据集合。
在Spark 中,对数据的所有操作不外乎
创建RDD,转化已有RDD,调用RDD操作进行求值
RDD操作
## 创建RDD 创建RDD有两种方式。
1)在驱动器程序里分发驱动器程序中的对象集合(比如list/set)
创建RDD 最简单的方法就是把程序中一个已有的集合传给SparkContext的 parallelize()方法。
scala中的parallelize() 方法:
val lines = sc.parallelize(List("pandas","i like pandas"))
java中的parallelize() 方法:
JAVARDD<String> lines = sc.parallelize(Arrays.asList("Panda","i like pandas"));
2)读取一个外部数据集
这种方式在实际应用中更加常用,
这里介绍用来将文本文件读入为一个存储字符串的RDD方法:SparkContext.textFile()
Scala中的textFile方法:
val lines = sc.textFile("/path/to/README.md")
Java 中的textFile()方法:
JavaRDD<String> lines = sc.textFile("/path/to/README.md")
初学时可以将RDD简单将其看为一个数组。
操作RDD
RDD创建成功后,支持两种类型的操作:
转化操作
行动操作
转化操作会由一个RDD生成一个新的RDD。
行动操作会对RDD计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如HDFS)中。
转化操作和行动操作的区别在于Spark 计算RDD的方式不同。
Spark会惰性计算创建的RDD。就是等到需要用到RDD的时候才会真正计算。
默认情况下,Spark的RDD 会在每次对他们进行行动操作的时候重新计算。
但是当多个行动操作中重用一个RDD的时候,每调用一次就会重新计算一次RDD。这样就会很麻烦,而且很影响速度。有什么办法可以解决呢?
可以使用RDD.persist()方法让spark把这个RDD缓存下来,这样就可以多次重用这个RDD
转换操作
转换操作是返回新的RDD的操作,比如map,filter
不清楚map,filter函数可以参考一下
https://blog.csdn.net/weixin_45468845/article/details/106254568
转化操作可以操作任意数量的输入RDD
转化出来的RDD是惰性求值的,只有在行动操作中用到这些RDD 才会被计算
小案例:
假定有一个日志文件,内含若干消息,希望选出其中的错误消息。
可以使用前面的转化操作filter().
Python:
iputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x:"error" in x)
Scala:
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))
Java:
JavaRDD<String> inpuRDD = sc.textFile("log.txt");
JavaRDD<String> errorsRDD = inputRDD.filter(
new Function<String,Bolean>(){
public Boolean call(String x){
return x.contains("errors");
}
);
行动操作
行动操作是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如count()(计算),first()(返回RDD第一个元素)。
由于行动操作需要实际的输出,它们会强制执行那些求值必须用到的RDD的转化操作
前面的例子中,对log.txt中的错误信息计数并输入
Scala:
println("Input had " + badLinesRDD.count() + " concerning lines")
printfln("Hrer are 10 examples:")
badLinesRDD.take(10).foreach(println)
Java:
System.out.println("Input had " + badLinesRDD.count() + " concerning lines");
System.out.println("Here are 10 examples:");
for(String line: badLinesRDD.take(10)){
System.out.println(line);
}
每当我们调用一个新的行动操作时,整个RDD 都会从头开始计算。当要多次调用该行动操作时,可以将中间结果持久化
区分是哪种操作
Spark对待转化操作和行动操作的方式很不一样。理解正在进行的操作是很重要的。
分辨一个特定的函数是转换操作还是行动操作可以根据其返回值判断:
转化操作返回RDD,行动操作返回的是其他数据类型
惰性求值
1)RDD的转化操作都是惰性求值的。
这意味着在被调用行动操作之前Spark 之前不会开始计算。
比如对RDD进行map()操作时,操作并不会立即执行,而是会在内部记录下所要求执行的操作的相关信息。
我们不应该把RDD看作存放着特定数据的数据集,而最好把每个RDD当作我们通过转化操作构建出来的,记录如何计算数据的指令列表。
2)把数据读取到RDD也是惰性的。
例如当我们调用sc.textFile()时,数据并没有读取进RDD,而是再必要时才会读取。和转化操作一样,读取数据的操作也可能会多次执行
3)惰性求值可以将一些操作合并到一起来减少计算数据的步骤。
常见的转化操作和行动操作
针对各个元素的转化操作
最常用的两个转化操作应该是map(),filter(),flatMap()
可以参考一下
https://blog.csdn.net/weixin_45468845/article/details/106254568
伪集合操作
尽管RDD并不是严格意义上的集合,但也支持许多数学上的集合操作,比如合并和相交操作。但这些操作要求RDD是相同数据类型的
distinct()去重
RDD中并没有元素的唯一性,常常包含重复的元素。
如果只需要唯一的元素,我们可以使用RDD.distinct() 转化操作来生成一个只包含不同元素(没有重复元素)的新RDD。
但是distinct() 操作开销很大,因为它需要将所有数据通过网络进行shuffle。
union(other)合并
最简单的集合操作是union(other),它会返回一个包含两个RDD中所有元素的RDD。
如果输入的RDD中有重复元素,Spark的union() 操作也会包含这些重复元素
intersection(other)选择相同元素
该方法只返回两个RDD中都有的元素。
在运行时也会去掉所有重复的元素(单个RDD中的重复元素也会一起移除)
需要通过shuffle混洗才能发现共有的元素
subtract(other) 排除另一个RDD中的元素
subtract()函数接受另一个RDD 作为参数,
返回一个由只存在于第一个RDD而不存在于第二个RDD中的所有元素组成的RDD。
和intersection()一样,它也需要数据混洗
cartesian(other)求笛卡尔积
cartesian(other)转化操作会返回所有可能的(a,b)对,a是源RDD中的元素,b来自另一个RDD
行动操作
reduce()
基本RDD中最常见的的行动操作是reduce()。
它接收一个函数作为参数,这个函数要接收两个相同类型的RDD数据并返回一个同样类型的新元素
Scala:
val sum = rdd.reduce((x, y) => x + y)
Java:
Integer sum = rdd.reduce(
new Function2<Integer,Integer, Integer>(){
public Integer call(Integer x, Integer y){
return x + y;
}
});
fold()
之前的那篇文章里有介绍到fold()函数,这里就不在介绍
需要注意的是,flod()和reduce() 都要求函数的返回值类型和我们所操作的RDD中的元素类型相同。
但是当我们需要返回一个不同类型的值时,比如计算平均值时需要返回一个记录遍历过程中的计数以及元素数量的二元组。应该如何做呢?
一种方法是:
先对数据使用map操作,将元素转为该元素和1的二元组,然后再用reduce函数进行操作
aggregate()
aggregate()函数把我们从返回值必须与所操作的RDD类型相同的限制中解放出来。
与fold() 类似,使用aggregate() 时,需要提供我们期待返回的类型的初始值
然后通过一个函数把RDD 中的元素合并起来放入累加器,再提供一个函数将累加器两两合并。(这句话稍微理解一下就行了)
用aggreagte()来计算RDD 的平均值来代替上面的方法:
Scala:
val result = input.aggregate((0,0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc, value) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble()
collect()
collect 是将数据返回驱动器程序中最简单,最常见的操作,他会将整个RDD的内容发挥
take()
take(n) 返回RDD中的n个元素,并且尝试只访问尽量少的分区,因此该操作会得到一个不均衡的集合
top()
如果为数据定义了顺序,就可以使用top() 从RDD 中获取前几个元素,top会使用数据的默认顺序,也可以提供自己的比较函数来提取前几个元素。