一、RDD
Spakr使用RDD实现数据存储。从物理上来讲,RDD将一个大的数据集分块为一系列数组,这些数组以分布式的方式存储在集群中的各个节点上,每个数据块有一个标识,称为blockID。每个数据块可以存储在节点的内存中,也可以被持久化在硬盘上。为了便于组织和处理,这些数据块在逻辑上进行划分后,形成多个分区(partition)。
二、算子
算子是Spark中定义的函数,用于对RDD中的数据进行操作和转换,Spark中的算子可以分为4类:
(1)创建算子(Creation),用于将内存中的集合或外部文件创建为RDD对象;
(2)变换算子(Transformation),用于将一个RDD转换为另一个RDD;
(3)缓存算子(Cache),用于将RDD缓存在磁盘或内存中,以便后续计算重复使用;
(4)行动算子(Action),会触发Spark作业执行,并将计算结果RDD保存为Scala集合或保存到外部文件或数据库中;
例子:基于Spark的WordCount程序
object SimpleApp { def main(args: Array[String]) { System.setProperty("hadoop.home.dir", "C:\\hadoop"); if (args.length != 2) { println("error : too few arguments"); sys.exit(); } val inputFile = args(0); //读取输入文件路径 val outputFile = args(1); //读取输出文件路径 //创建Spark程序运行需要的配置实例,其中设置应用名称为“Simple Application”,并采用本地方式运行 val conf = new SparkConf().setAppName("Simple Application").setMaster("local"); /** * 生成SparkContext实例,SparkContext是整个Spark程序的入口, * 负责向集群申请程序运行的环境和资源,以及进行必要的初始化工作 */ val sc = new SparkContext(conf); //读取文件内容,生成RDD val file = sc.textFile(inputFile, 3); //统计 val counts = file.flatMap { line => line.split(" ") }.map { word => (word, 1) }.reduceByKey(_ + _); //保存统计结果 counts.saveAsTextFile(outputFile); } }
下图展示了WordCount的运行过程:
textFile是一个创建算子,功能是逐行读取文本数据,并转化为Spark中的HadoopRDD对象实例。参数“3”是指生成的HadoopRDD对象实例,以3个分区(Partition)的方式进行存储。
Spark中的分区只是一个逻辑上的概念,在Spark集群上数据的真正存储单元被称为数据块(Block),由数据管理器(BlockManager)维护和管理。逻辑概念上的分区和物理上的数据块一一对应,即一个分区对应一个数据块。
每个Block都有唯一的BlockID,其构成方式是"rdd_"+rddID+"_"+partitionID,其中rddID是该Block所属的RDD的标识。BlockManager依据BlockID记录每个数据块的存储结点位置等元数据,以对这些Block进行管理。
flatMap()、map { word => (word, 1) }、reduceByKey(_ + _)都是变换算子。
saveAsTextFile是行动算子,功能是将数据保存到文件中。
另外需要记住两个重要理念:
(1)在Spark中,只支持对RDD进行粗粒度的操作,不提供对RDD中某个元素进行操作的函数接口。
(2)在Spark中,RDD是只读的。