Spark笔记(一)

下载Spark

  访问Spark下载页面,选择包类型为"Pre-built for Hadoop 2.7 and later",然后选择"Direct Download",这样就有一个压缩的TAR文件spark-2.4.0-bin-hadoop2.7.tgz。下载好spark之后,我们要进行解压缩,打开终端

cd ~
tar -xf spark-2.4.0-bin-hadoop2.7.tgz
cd spark-2.4.0-bin-hadoop2.7
ls

  Spark带有交互式的shell,可以作即时数据分析,Spark shell可用来与分布式存储在许多机器的内存或者硬盘上的数据进行交互,并且处理过程的分发由Spark自动完成,Spark提供Python以及scala的增强版shell,支持与集群的连接。
第一步是打开Spark shell,要打开Python版本的Spark shell,进入Spark目录输入

bin/pyspark

如果不能使用python3,可以配置如下,在bin/pyspark文件中添加

export PYSPARK_PYTHON=python3

如果要打开Scala版本的shell,则输入

bin/spark-shell

安装scala

确保安装Java 8 JDK,之后下载安装scala

java -version
tar -xf scala-2.12.7.tar.gz

安装jdk

如果没装jdk或者版本较低,可以安装jdk(jdk下载地址),解压安装包

tar -zxvf jdk-8u191-linux-x64.tar.gz

将解压后的文件夹移到/usr/lib目录下

cd  /usr/lib
sudo mkdir jdk
sudo cp ~/下载/jdk-8u191-linux-x64.tar.gz /usr/lib/jdk
cd ./jdk
sudo tar -xf jdk-8u191-linux-x64.tar.gz

配置java环境变量,这里是将环境变量配置在etc/profile,即为所有用户配置JDK环境,使用命令打开/etc/profile文件

sudo vi /etc/profile

在末尾添加以下几行文字

#set java env
export JAVA_HOME=/usr/lib/jdk/jdk1.8.0_191
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

执行命令使修改立即生效

source /etc/profile

配置软连接,软连接相当于windows系统中的快捷键,部分软件可能会从/usr/bin目录下查找Java,因此添加该软连接防止其他软件查找不到的情况

扫描二维码关注公众号,回复: 11937642 查看本文章
sudo update-alternatives --install /usr/bin/java  java  /usr/lib/jdk/jdk1.8.0_191/bin/java 300
sudo update-alternatives --install /usr/bin/javac  javac  /usr/lib/jdk/jdk1.8.0_191/bin/javac 300

在终端输入,出现版本号则表示安装成功

java -version

下图是scala和python的spark shell终端,可以按Ctrl-D退出shell
在这里插入图片描述

RDD示例

  在Spark中,我们通过对分布式数据集的操作来表达我们的计算图,这些计算会自动在集群上并行进行,这样的数据集被称为弹性分布式数据集(resilient distributed dataset),简称RDD,RDD是Spark对分布式数据和计算的基本抽象。
  可以在python上先运行一个RDD实例,统计文件的行数

lines=sc.textFile("README.md")  #创建一个名为lines的RDD
lines.count()  #统计RDD中的元素个数
lines.first()  #这个RDD中的第一个元素,也就是README.md的第一行

  每个Spark应用都由一个驱动器程序来发起集群上的各种并行操作,驱动器程序有关于应用的main函数,并且定义了集群上的分布式数据集,还对这些分布式数据集应用了相关操作,上面的例子里,实际的驱动程序就是Spark shell本身。驱动器程序通过一个SparkContext对象来访问Spark,这个对象代表对计算集群的一个连接,shell启动时已经自动创建了一个SparkContext对象,是一个名为sc的变量,可以打印输出sc来查看类型。

#python
>>> sc
<SparkContext master=local[*] appName=PySparkShell>
//scala
scala> sc
res1: org.apache.spark.SparkContext = org.apache.spark.SparkContext@280099a0

  一旦有了SparkContext,就可以用它来创建RDD,上个例子中用sc.textFile()来创建一个代表文件中各行文本的RDD,然后可以在这些行上进行各种操作,如count()。要执行这些操作,驱动器程序一般要管理多个执行器节点。比如,如果我们在集群上运行count()操作,那么不同的节点会统计文件的不同部分的行数。上个例子中暂时是在本地模式下运行Spark shell,因此在单个节点上执行,但是也可以将这个shell连接到集群上来进行并行的数据分析。
  最后,我们有很多用来传递函数的API,可以将对应操作运行在集群上,比如,可以扩展我们的README示例,帅选出文件中有某个特定单词的行,以"Python"为例

>>>lines=sc.textFile("README.md")
>>>pythonLines=lines.filter(lambda line:"Python" in line)
>>>pythonLines.first()
'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'

  Spark会自动将函数发到各个执行器节点上,这样,就可以在单一的驱动器程序中编程,并且让代码自动运行在多个节点上。

独立应用

  除了交互式运行之外,Spark也可以应用在Java、Scala或Python的独立程序中,与shell的主要区别在于,需要自行初始化SparkContext,后面关于API的使用是一样的。
  连接Spark的过程在各语言中并不一样,在Java和Scala中,只需要在应用中添加一个对于spark-core工件的Maven依赖,当spark的版本是1.2.0,对应的Maven索引是:

groupId=org.apache.spark
artifactId=spark-core_2.10
version=1.2.0

  Maven是一个流行的包管理工具,可以用于任何基于Java的语言,让你可以轻松连接公共仓库中的程序库,可以使用Maven来构建工程,也可以使用其它能够访问Maven仓库的工具来进行构建,如Scala的sbt工具或者Gradle工具,一些常用的集成开发环境(如Eclipse)也可以让你直接把Maven依赖添加到工程中。
  在Python中,可以把应用写成脚本,但是需要使用Spark自带的bin/spark-submit脚本来运行,spark-submit脚本会帮我们引入Python程序的Spark依赖,这个脚本为Spark的PythonAPI配置好了运行环境,示例如下。

bin/spark-submit my_script.py

  一旦完成了应用与Spark的连接,接下来就需要在程序中导入Spark包并且创建SparkContext,可以通过先创建一个SparkConf对象来配置应用,然后基于这个SparkConf创建一个SparkContext对象。

#python
from pyspark import SparkConf,SparkContext
conf=SparkConf().setMaster("local").setAppName("My App")
sc=SparkContext(conf=conf)
//scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
val conf=new SparkConf().setMaster("local").setAppName("My App")
val sc=new SparkContext(conf)
#java
import org.appache.spark.SparkConf;
import org.appache.spark.api.java.JavaSparkContext;
SparkConf conf=new SparkConf().setMaster("local").setAppName("My App");
JavaSparkContext sc=new JavaSparkContext(conf);

  上面例子中创建SparkContext的最基本的方法,只需传递两个参数

  • 集群URL:告诉Spark如何连接到集群上,使用的local表示运行在单机单线程上而无需连接到集群。
  • 应用名:在例子中使用My App,当连接到一个集群时,这个值可以帮助在集群管理器的用户界面中找到应用。
    除此之外,还有很多附加参数可以用来配置应用的运行方式或添加要发送到集群上的代码。关闭Spark可以调用SparkContext的stop()方法,或者直接退出应用。

RDD编程

  RDD指弹性分布式数据集,是分布式的元素集合。在Spark中,对数据的所有操作不外乎创建RDD、转化已有RDD以及调用RDD操作进行求值。而这一切背后,Spark会自动将RDD中的数据分发到集群上,并将操作并行化执行。每个RDD都分为多个分区,这些分区运行在集群的不同节点上,RDD可以是Python、Java、Scala中任意类型的对象,甚至是用户自定义的对象。可以使用两种方法创建 RDD:读取一个外部数据集,或在驱动器程序里分发驱动器中的对象集合(如list和set)。

#Python中使用textFile创建一个字符串的RDD
>>>lines=sc.textFile("README.md")

创建出来后,RDD支持两种类型的操作:转化操作和行动操作。转化操作会由一个RDD生成一个新的RDD,例如,根据谓词匹配情况筛选数据就是一个常见的转化操作

#Python 调用转化操作
>>>pythonLines=lines.filter(lambda line:"Python" in line)

另一方面,行动操作会对RDD计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统

#Python 调用first()行动操作
>>>pythonLines.first()
>"Interactive Python Shell"

  转化操作和行动操作的区别在于Spark计算RDD的方式不同,虽然可以在任何时候定义新的RDD,但Spark只会惰性计算这些RDD。它们只有第一次在一个行动操作中用到时,才会真正计算。比如,我们以一个文本文件定义了数据,然后把其中有"Python"的行筛选出来,如果在运行lines=sc.textFile(…)时就把文件中所有的行读取并存储起来,就会消耗很多存储空间,而我们马上就要筛选掉其中很多的数据。相反,一旦Spark了解了完整的转化操作链之后,它就可以只计算求结果时真正需要的数据。事实上,在行动操作first()中,Spark只需要扫描文件知道找到第一个匹配的行为止,而不需要读取整个文件。
  最后,默认情况下,Spark的RDD会在每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用一个RDD,可以使用RDD.persist()让Spark把这个RDD缓存下来。在第一次对持久化的RDD计算之后,Spark会把RDD的内容保存到内存中(以分区方式存储到集群中的各机器上),这样在之后的行动操作中,就可以重用这些数据了,当然也可以缓存到磁盘上。

#Python 把RDD持久化到内存中
>>>pythonLines.persist()
>>>pythonLines.count()
>>>pythonLines.first()

总的来说,每个Spark程序或shell回话都按如下方式工作
(1)从外部数据创建出输入RDD
(2)使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD
(3)告诉Spark对需要重用的中间结果RDD执行persist()操作
(4)使用行动操作(例如count()和first()等)来触发一次并行计算,Spark会对计算进行优化后再执行

创建RDD

  Spark提供了两种创建方式:读取外部数据集,以及在驱动器程序中对一个集合进行并行化。其中创建最简单的是,把程序中一个已有的集合传给SparkContext的parallelize()方法,不过更常用的是从外部存储中读取数据来创建RDD,如sc.textFile()方法。

#Python
>>>lines=sc.parallelize(["pandas" 'i like pandas'])
//scala
scala>val lines=sc.parallelize(List("pandas","i like pandas"))
#Java
JavaRDD<String> Lines=sc.parallelize(Arrays.asList("pandas","i like pandas"));

RDD转化操作

  RDD的转化操作是返回新RDD的操作,转化出来的RDD是惰性求值的,只有在行动操作中用到这些RDD时才会被计算。许多转化操作都是针对各个元素的,也就是说,这些操作每次只会操作RDD中的一个元素,不过并不是所有的转化操作都是这样的。

#Python 实现filter()转化操作
>>>inputRDD=sc.textFile("log.txt")
>>>errorsRDD=inputRDD.filter(lambda x:"error" in x)

filter()操作不会改变已有的inputRDD中的数据,实际上,该操作会返回一个全新的RDD,inputRDD在后面的程序中还可以继续使用,比如还可以从中搜索别的单词,如从inputRDD中找出有warning的行,接下来使用另一个转化操作union()来打印出有warning的行数。

>>>inputRDD=sc.textFile("log.txt")
>>>errorsRDD=inputRDD.filter(lambda x:"error" in x)
>>>warningsRDD=inputRDD.filter(lambda x:"warning" in x)
>>>badLinesRDD=errorRDD.union(warningsRDD)

union()与filter()的不同点在于它操作两个RDD而不是一个,转化操作可以操作任意数量的输入RDD。通过转化操作,从已有的RDD中派生出新的RDD,Spark会使用谱系图(lineage graph)来记录这些不同RDD之间的依赖关系。Spark需要用这些信息来按需计算每个RDD,也可以依靠谱系图在持久化的RDD丢失部分数据时恢复所丢失的数据。

RDD行动操作

  行动操作是第二种类型的RDD操作,它们会把最终求到的结果返回到驱动器程序,或者写入到外部存储系统中,由于行动操作需要生成实际的输出,它们会强制执行那些求值必须用到的RDD转化操作。上个例子中,可以输出关于badLinesRDD的一些信息,为此,需要使用两个行动操作来实现:用count()来返回计数结果,用take()来收集RDD中的一些元素。

#Python 使用行动操作对错误进行计数
print("Input had"+badLinesRDD.count()+"concerning lines")
print("Here are 10 examples:")
for line in badLinesRDD.take(10):
    print(line)

在这个例子中,在驱动器程序中使用take()获取了RDD中的少量元素,然后在本地遍历这些元素,并在驱动器 端打印出来。RDD还有一个collect()函数,可以用来获取整个RDD中的数据,但是一般只有当整个数据集能在单台机器的内存中放下时,才能使用collect(),不能用在大规模数据集上。此时,通常要把数据写到诸如HDFS或Amazon S3这样的分布式的存储系统中,可以使用saveAsTextFile()、saveAsSequenceFile()或者任意的其他行动操作来把RDD的数据内容以各种自带的格式保存起来。需要注意的是,每当我们调用一个新的行动操作时,整个RDD都会从头开始计算,要避免这种低效的行为,用户可以将中间结果持久化。
  惰性求值意味着当我们对RDD调用转化操作(例如map()方法)时,操作不会立即执行,相反,Spark会在内部记录下所要求执行的操作的相关信息,我们不应该把RDD看做存放着特定数据的数据集,而最好把每个RDD当作我们通过转化操作构建出来的、记录如何计算数据的指令列表。把数据读到RDD的操作也同样是惰性的,因此,当我们调用sc.textFile()时,数据并没有读取进来,而是在必要时才会读取。惰性求值可以把一些操作合并到一起来减少计算数据的步骤,在类似Hadoop MapReduce的系统中,开发者常常花费大量时间考虑如何把操作组合到一起,以减少MapReduce的周期数。而在Spark中,写出一个非常复杂的映射并不一定能比使用很多简单的连续操作获取好很多的性能,因此,用户可以用更小的操作来组织他们的程序,这样也使这些操作更容易管理。

向Spark传递函数

  Spark的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算,在Python中,我们有三种方式来把函数传递给Spark。传递比较短的函数时,可以使用lambda表达式来传递,除此外,也可以传递顶层函数或是定义的局部函数。

#Python 传递函数
word=rdd.filter(lambda s:"error" in s)

def containsError(s):
    return "error" in s
word=rdd.filter(containsError)

传递函数时需要小心的一点是,Python会在不经意间把函数所在的对象也序列化传出去,当传递的对象是某个对象的成员,或者有某个对象中一个字段的引用时(例如self.filed),Spark会把整个对象发送到工作节点上,这可能比想传递的东西大多了,有时如果传递的类里面有Python不知道如何序列化传输的对象,也会导致失败。

#Python 传递一个带字段引用的函数(错误示范)
class SearchFunctions(object):
    def __init__(self,query):
        self.query=query
    def isMatch(self,s):
        return self.query in s
    def getMatchesFunctionReference(self,rdd):
        #问题:在“self.isMatch”中引用了整个self
        return rdd.filter(self.isMatch)
    def getMatchesMemberReference(self,rdd):
        #问题:在“self.query”中引用了整个self
        return rdd.filter(lambda x:self.query in x)

替代方案是,只把所需要的字段从对象中拿出来放到一个局部变量中,然后传递这个局部变量

class WoedFunctions(object):
    ...
    def getMatchesNoReference(self,rdd):
        #安全:只把需要的字段提取到局部变量中
        query=self.query
        return rdd.filter(lambda x:query in x)

  在Scala中,我们可以把定义的内联函数、方法的引用或静态方法传递给Spark,就像Scala的其它函数式API一样。我们还要考虑其它一些细节,比如所传递的函数及其引用是数据需要是可序列化的(实现了Java的Serializable接口)。除此之外,与Python类似,传递一个对象的方法或字段时,会有对真个对象的引用。这在Scala中不是那么明显,不会像Python那样必须用self写出那些引用,可以把需要的字段放到一个局部变量中,来避免传递整个对象。

class SearchFunctions(val query:String){
  def isMatch(s:String):Boolean={
    s.contains(query)
  }
  def getMatchesFunctionReference(rdd:RDD[String]):RDD[String]={
    //问题:“isMatch”表示"this.isMatch",因此我们要传递整个”this“
    rdd.map(isMatch)
  }
  def getMatchesFieldReference(rdd:RDD[String]):RDD[String]={
  //问题:“query”表示“this.query”,因此我们要传递整个“this”
  rdd.map(x=>x.split(query))
  }
  def getMatchesNoReference(rdd:RDD[String]):RDD[String]={
    //安全:只把我们需要的字段拿出来放入局部变量中
    val query_=this.query
    rdd.map(x=>x.split(query_))
  }
}

常见的转化操作和行动操作

  针对各元素的转化操作。最常用的两个转化操作是map()和filter(),转化操作map()接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD中对应元素的值,而filter()则接收一个函数,并将RDD中满足该函数的元素放入新的RDD中返回。
  我们可以使用map()来做各种各样的事情:可以把我们的URL集合中的每个URL对应的主机名提取出来,也可以简单到只对各个数字求平方值。map()的返回类型不需要和输入类型一样,这样如果有一个字符串RDD,并且我们的map()函数是用来把字符串解析并返回一个Double值的,那么此时我们的输入RDD类型是RDD[String],而输出类型是RDD[Double]。

#Python 计算RDD中各值的平方
nums=sc.parallelize([1,2,3,4])
squared=nums.map(lambda x:x*x).collect()
for num in squared:
    print("%i" %num)
//Scala
val input=sc.parallelize(List(1,2,3,4))
val result=input,map(x=>x*x)
println(result.collect().mkString(","))
#Java
JavaRDD<Integer> rdd=sc.parallelize(Arrays.asList(1,2,3,4));
JavaRDD<Integer> result-rdd.map(new Function<Integer,Integer>(){
    public Integer call(Integer x){return x*x;}
});
System.out.println(StringUtils.join(result.collect(),","));

有时候,我们希望对每个输入元素生成多个输出元素,实现该功能的操作为flatMap(),和map()类似,提供给flatMap()的函数被分别应用到了输入RDD的每个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器,输出的RDD不是由迭代器组成的,而是一个有着各个迭代器可访问的所有元素的RDD,一个简单示例是把输入的字符串且分为单词。

#Python
lines=sc.parallelize(["hello world","hi"])
words=lines.flatMap(lambda line:line.split(" "))
words.first()
//Scala
val lines=sc.parallelize(List("hello world","hi"))
val words=lines.flatMap(line=>line.split(" "))

可以把flatMap()看作将返回的迭代器“拍扁”,这样就有了一个由各列表中的元素组成的RDD,而不是一个由列表组成的RDD。
  伪集合操作。尽管RDD本身不是严格意义上的集合,但它也支持许多数学上的集合操作,比如合并等操作,这些操作要求RDD是相同数据类型的。RDD中最常缺失的集合属性是元素的唯一性,因为常常有重复的元素,如果只要唯一的元素,可以使用RDD.distinct()转化操作来生成一个只有不同元素的新RDD。不过需要注意,distinct()操作的开销很大,因为需要将所有数据通过网络进行混洗(shuffle),以确保每个元素都只有一份。
  最简单的集合操作是union(other),返回一个有两个RDD中所有元素的RDD,与数学中的union()操作不同的是,如果输入的RDD中有重复数据,Spark的union()操作也会有这些重复数据。还有intersection(other)方法,只返回两个RDD中有的元素,去掉所有重复元素,因为要通过网络混洗发现共有元素,所以性能要差很多。有时需要移除一些数据,subtract(other)函数接收另一个RDD作为参数,返回一个由只存在于第一个RDD中而不存在于第二个RDD中的所有元素组成的RDD,它也需要数据混洗。也可以计算两个RDD的笛卡尔积,cartesian(other)转化操作会返回所有可能的(a,b)对,其中a是源RDD中的元素,而b来自另一个RDD。笛卡尔积在我们希望考虑所有可能的组合的相似度时比较有用,比如计算各用户对各种产品的预期兴趣程度。我们也可以求一个RDD与其自身的笛卡尔积,这可以用于求用户相似度的应用中,不过,求大规模RDD的笛卡尔积开销巨大。
对一个数据为{1,2,3,3}的RDD进行基本的RDD转化操作

函数名 目的 示例 结果
map() 将函数应用于RDD中的每个元素,将返回值构成新的RDD rdd.map(x=>x+1) {2,3,4,4}
flatMap() 将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD,通常用来切分单词 rdd.flatMap(x=>x.to(3)) {1,2,3,2,3,3,3}
filter() 返回一个由通过传给filter()的函数的元素组成的RDD rdd.filter(x=>x!=1) {2,3,3}
distinct() 去重 rdd.distinct() {1,2,3}
sample(withReplacement,fraction,[seed]) 对RDD采样,以及是否替换 rdd.sample(false,0.5) 非确定的

对数据分别为{1,2,3}和{3,4,5}的RDD进行针对两个RDD的转化操作

函数名 目的 示例 结果
union() 生成一个有两个RDD中所有元素的RDD rdd.union(other) {1,2,3,4,5}
instersection() 求两个RDD共有的元素 rdd.intersection(other) {3}
subtract() 移除一个RDD中的内容 rdd.subtract(other) {1,2}
cartesian() 与另一个RDD的笛卡尔积 rdd.cartesian(other) {(1,3),(1,4),…,(3,5)}

  行动操作。reduce()是一个常见的行动操作,它接收一个函数作为参数,这个函数要操作两个相同元素类型的RDD数据并返回一个同样类型的新元素。一个简单的例子是函数+,可以用它来对RDD进行累加。使用reduce()可以很方便计算出RDD中所有元素的总和、元素的个数,以及其他类型的聚合操作。

#Python
sum=rdd.reduce(lambda x,y:x+y)
//Scala
val sum=rdd.reduce((x,y)=>x+y)
#Java
INteger sum=rdd.reduce(new Function2<Integer,Integer,Integer>(){
  public Integer call(Integer x,Integer y){return x+y;}
});

  fold()和reduce()类似,接收一个与reduce()接收的函数名相同的函数,再加上一个“初始值”来作为每个分区第一次调用时的结果,这个初始值应当是操作的单位元素,也就是说函数对这个初始值多次计算不会改变结果。fold()和reduce()都要求函数的返回值类型需要和我们所操作的RDD中的元素类型相同,但有时需要返回一个不同类型的值,例如计算平均值时,需要记录遍历过程中的计数以及元素的数量,这就需要返回二元组。可以先对数据使用map()操作,来把元素转为该元素和1的二元组,也就是我们所希望的返回类型,这样reduce()就可以以二元组的形式进行归约了。
  aggregate()函数则把我们从返回值类型必须与所操作的RDD类型相同的限制中解放出来,与fold()类似,使用aggregate()时需要提供我们期待返回的类型的初始值,然后通过一个函数把RDD中的元素合并起来放入累加器。考虑到每个节点是在本地进行累加的,最终,还需要提供第二个函数来将累加器两两合并。

#Python
sumCount=nums.aggregate((0,0),
              (lambda acc,value:(acc[0]+value,acc[1]+1)),
              (lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1])))
return sumCount[0]/float(sumCount[1])
//Scala
val result=input.aggregate((0,0))(
             (acc,value)=>(acc._1+value,acc._2+1),
             (acc1,acc2)=>(acc1._1+acc2._1,acc1._2+acc2._2))
val avg=result._1/result._2.toDouble
#Java
class AvgCount implements Serializable{
  public AvgCount(int total,int num){
    this.total=total;
    this.num=num;
  }
  public int total;
  public int num;
  public double avg(){
    return total/(double)num;
  }
}
Function2<AvgCount,Integer,AvgCount>addAndCount=
  new Function2<AvgCount,Integer,AvgCount>(){
    public AvgCount call(AvgCount a,Integer x){
      a.total+=x;
      a.num+=1;
      return a;
    }
  };
Function2<AvgCount,AvgCount,AvgCount> combine=
  new Function2<AvgCount,AvgCount,AvgCount>(){
  public AvgCount call(AvgCount a,AvgCount b){
    a.total+=b.total;
    a.num+=b.num;
    return a;
  }
};
AvgCount initial=new AvgCount(0,0);
AvgCount result=rdd.aggregate(initial,addAndCount,combine);
System.out.println(result.avg());

  RDD的一些行动操作会以普通集合或值的形式将RDD的部分或全部数据返回驱动器程序,其中最简单的操作是collect(),它会将整个RDD的内容返回。collect()通常在单元测试中使用,因为此时RDD的整个内容不会很大,可以放在内存中。take(n)返回RDD中的n个元素,并且尝试只访问尽量少的分区,因此该操作会有一个不均衡的集合,这些操作返回原宿的顺序与预期的可能不一样。如果为数据定义了顺序,就可以使用top()从RDD中获取前几个元素,top()会使用数据的默认顺序,但我们也可以提供自己的比较函数,来提取前几个元素。有时需要在驱动器程序中对我们的数据进行采样,takeSample(withReplacement,num,seed)函数可以让我们从数据中获取一个采样,并指定是否替换。
  有时会对RDD中的所有元素应用一个行动操作,但是不把任何结果返回到驱动器程序中,这也是有用的,比如可以用JSON格式把数据发送到一个网络服务器上,或者把数据存到数据库中,这时可以用foreach()行动操作来对RDD中的每个元素进行操作,而不需要把RDD发回本地。
对一个数据为{1,2,3,3}的RDD进行基本的RDD行动操作

函数名 目的 示例 结果
collect() 返回RDD中的所有元素 rdd.collect() {1,2,3,3}
count() RDD中的元素个数 rdd.count() 4
countByValue() 各元素在RDD中出现的次数 rdd.countByValue() {(1,1),(2,1),(3,2)}
take(num) 从RDD返回num个元素 rdd.take(2) {1,2}
top(num) 从RDD返回最前面的num个元素 rdd.top(2) {3,3}
takeOrdered(num) 从RDD中按照提供的顺序返回最前面的num个元素 rdd.takeOrdered(2)(myOrdering) {3,3}
takeSample(withReplacement,num,[seed]) 从RDD中返回任意一些元素 rdd.takeSample(false,1) 非确定的
reduce(func) 并行整合RDD中所有数据(例如sum) rdd.reduce((x,y)=>x+y) 9
fold(zero)(func) 和reduce()一样,但是需要提供初始值 rdd.fold(0)((x,y)=>x+y) 9
aggregate(zeroValue,seqOp,combOp) 和reduce()相似,但是通常返回不同类型的函数 ((x,y)=>(x._1+y,x._2+1))((x,y)=>(x._1+y._1,x._2+y._2)) (9,4)
foreach(func) 对RDD中的每个元素使用给定的函数 rdd.foreach(func)

  有些函数只能用于特定类型的RDD,比如mean()和variance()只能用在数值RDD上,而join()只能用在键值对RDD上,在Scala和Java中,这些函数都没有定义在标准的RDD类中,所以要访问这些附加功能,必须要确保获取了正确的专用RDD类。Python的API结构与Java和Scala有所不同,在Python中,所有的函数都实现在基本的RDD类中, 但如果操作对应的RDD数据类型不正确,就会导致运行时错误。

持久化(缓存)

  如前所述,Spark RDD是惰性求值的,而有时我们希望能多次使用同一个RDD。如果简单地对RDD调用行动操作,Spark每次都会重算RDD以及它的所有依赖。这在迭代算法中消耗格外大,因为迭代算法常常会多次使用同一组数据

//Scala
val result=input.map(x=>x*x)
println(result.count())
println(result.collect().mkString(","))

为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。当我们让Spark持久化存储一个RDD时,计算出RDD的节点会分别保存它们求出的分区数据。如果一个有持久化数据的节点发生故障,Spark会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。出于不同的目的,可以为RDD选择不同的持久化级别。在Scala和Java中,默认情况下persist()会把数据以序列化的形式缓存在JVM的堆空间中。在Python,我们会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列化后的对象存储的JVM堆空间中。当我们把数据写到磁盘或者堆外存储上时,也总是使用序列化后的数据。
持久化级别

级别 使用的空间 CPU时间 是否在内存中 是否在磁盘上 备注
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK 中等 部分 部分 如果数据在内存中放不下,则溢写到磁盘上
MEMORY_AND_DISK_SER 部分 部分 如果数据在内存中放不下,则溢写到磁盘上,在内存中存放序列化后的数据
DISK_ONLY

堆外缓存是试验性功能,我们使用Tachyon(http://tachyon-project.org/)作为外部系统,有兴趣可以参考,关于如何在Tachyon上运行Spark的介绍(http://tachyon-project.org/Running-Spark-on-Tachyon.html)。

import org.apache.spark.storage.StorageLevel
val result=input.map(x=>x*x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))

注意,我们在第一次对这个RDD调用行动操作前就调用了persist()方法,persist()调用本身不会触发强制求值。如果要缓存的数据太多,内存中放不下,Spark会自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除。对于仅把数据存放在内存中的缓存级别,下一次要用到已被移除的分区时,这些分区就需要重新计算。但是对于使用内存与磁盘的缓存级别的分区来说,被移除的分区都会写入磁盘。不论哪一种情况,都不必担心作业因为缓存了套多数据而被打断。不过,缓存不必要的数据会导致有用的数据被移出内存,带来更多重算的时间开销。最后RDD还有一个方法叫做unpersist(),调用该方法可以手动把持久化的RDD从缓存中移除。

猜你喜欢

转载自blog.csdn.net/wanchaochaochao/article/details/84728705