简述Spark基础及架构
一、spark简介
spark是基于内存的分布式计算框架,特点是快速、易用、通用及多种运行模式。
-
快速:
基于内存数据处理,比MR快100个数量级以上(逻辑回归算法测试)
基于硬盘数据处理,比MR快10个数量级以上 -
易用:
支持Java、Scala、Python、R语言
交互式shell方便开发测试 -
通用性:
一栈式解决方案:批处理、交互式查询、实时流处理、图计算及机器学习 -
多种运行模式:
YARN、Mesos、EC2、Kubernetes、Standalone、Local
二、spark技术栈
- Spark Core: 核心组件,分布式计算引擎
- Spark SQL:: 高性能的基于Hadoop的SQL解决方案
- Spark Streaming: 可以实现高吞吐量、具备容错机制的准实时流处理系统
- Spark GraphX: 分布式图处理框架
- Spark MLlib: 构建在Spark上的分布式机器学习库
三、spark架构
spark架构主要由以下组件构成:
- Application: 建立在 Spark上的用户程序,包括 Driver代码和运行在集群各节点 Executor中的代码
- Driver program: 驱动程序, Application中的main函数并创建 SparkContext
- Cluster Manager : 在集群(Standalone、Mesos、YARN)上获取资源的外部服务
- Worker Node: 集群中任何可以运行 Application代码的节点
- Executor : 某个 Application运行在 worker节点上的一个进程
- Task: 被送到某个 Executor上的工作单元
- Job : 包含多个 Task 组成的并行计算,往往由 Spark Action算子 触发生成,一个 Application中往往会产生多个 Job
- Stage: 每个Job会被拆分成多组 Task,作为一个 TaskSet,其名称为 Stage
运行架构:
- 在驱动程序中,通过SparkContext主导应用的执行
- SparkContext可以连接不同类型的Cluster Manager(Standalone、YARN、Mesos),连接后,获得集群节点上的Executor
- 一个Worker节点默认一个Executor,可通过SPARK_WORKER_INSTANCES调整
- 每个应用获取自己的Executor
- 每个Task处理一个RDD分区
四、saprk常用API
在开发过程中,常用API主要有: SparkContext、 SparkSession、 RDD、 DataSet及 DataFrame,本文主要介绍 SparkContext、 SparkSession。
4.1 SparkContext
- 是Spark的主入口
- 连接Driver与Spark Cluster(Workers)
- 每个JVM仅能有一个活跃的SparkContext
在IDEA中创建 SparkContext ,代码如下:
//导包
import org.apache.spark.{SparkConf, SparkContext}
//创建一个SparkContext对象
val conf=new SparkConf().setMaster("local[2]").setAppName("HelloSpark")
val sc=SparkContext.getOrCreate(conf)
4.2 SparkSession
- SparkSession是 SparkSQL的入口 ,是在 2.0中引入的新的 API,旨在为 Spark编程提供统一的编程入口,SparkSession整合了 SparkConf、 SparkContext、SQLContext、 HiveContext以及 StreamingContext。
- 当创建了SparkSession对象后,可以间接拿到 sparkContext 与 sqlContext 对象,所以在 2.0版本后推荐使用 SparkSession作为编程入口。
- 在2.0之前的 Spark版本中, spark shell会自动创建一个 SparkContext对象(sc)
- 在2.0+版本中,Spark shell则会额外创建一个 SparkSession对象(spark),如下图:
在IDEA中创建 SparkSession ,代码如下:
//导包
import org.apache.spark.sql.SparkSession
//创建一个SparkSession对象
val spark = SparkSession.builder
.master("local[2]")
.appName("appName")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
五、spark数据核心–RDD
5.1 RDD概念
RDD称为 弹性分布式数据集( Resilient Distributed Datasets),它是一种分布式的内存抽象,允许在大型集群上执行基于内存的计算( In Memory Computing),为用户屏蔽了底层复杂的计算和映射环境。
弹性: 指在任何时候都能进行重算,这样当集群中的一台机器挂掉而导致存储在其上的RDD丢失后,Spark还可以重新计算出这部分的分区的数据
分布式: 数据计算分布于多节点
数据集: RDD并不存储真正的数据,只是对数据和操作的描述。它是只读的、分区记录的集合,每个分区分布在集群的不同节点上
简单来说,RDD是将数据项拆分为多个分区的集合,存储在集群的工作节点上的内存和磁盘中,并执行正确的操作 。
更规范的解释是:
- RDD是用于数据转换的接口 ,比如 map、 filter、 groupBy、 join等。
- RDD指向了存储在 HDFS、 Cassandra、 HBase等、或缓存(内存、内存 +磁盘、仅磁盘等),或在故障或缓存收回时重新计算其他 RDD分区中的数据 。从
这个意义上讲, RDD不包含任何待处理数据。
5.2 RDD的五大特性
5.2.1 分区(Partition)
- RDD是由多个分区构成的( 使用RDD#partitions返回 RDD的所有分区信息),每个Partition都有一个唯一索引编号 (使用Partition#index访问)
- RDD分区概念与MapReduce的输入切片概念是类似的,对每个分区的运算会被一个当作一个Task执行。举例:如果有100个分区,那么RDD上有 n 个操作将会产生有 n*100 个任务。
5.2.2 compute函数
每个分区上都有compute函数,计算该分区中的数据
5.2.3 RDD依赖(DAG)
RDD有依赖性,通常情况下一个 RDD是来源于另一个 RDD,这个叫做 lineage。RDD会记录下这些依赖,方便容错。也称 DAG。
5.2.4 分区器(Partitioner)
只有 Key-Value 类型的 RDD才有分区器 ,可以传递一个自定义的 Partitioner 进行重新分区,非 Key-Value类型的 RDD(PairRDD)分区器的值是 None。
不同的 RDD的compute函数逻辑各不一样,比如:
- MapPartitionsRDD的compute是将用户的转换逻辑作用到指定的Partition上。因为 RDD的map算子产生MapPartitionsRDD,而 map算子的参数(具体操作逻辑)是变化的。
- HadoopRDD的compute是读取指定Partition数据。因为sc.hadoopFile(“path”)”读取 HDFS文件返回的RDD具体类型便是 HadoopRDD,所以只需要读取数据即可。
- CheckpointRDD的compute是直接读取检查点的数据。一旦 RDD进行checkpoint,将变成CheckpointRDD
5.2.5 分区优先位置列表
该列表存储了存取每个分区的优先位置 。对于一个 HDFS文件来说,这个列表保存了每个分区所在的数据块的位置。按照 “移动数据不如移动计算的” 的理念, Spark在进行任务调度的时候,会尽可能的将计算任务移动到所要处理的数据块的存储位置。
六、创建RDD
6.1 使用"集合"创建RDD
通过集合创建RDD有两种方法:parallelize与 makeRDD
makeRDD多一个重载方法:重载分配一系列本地Scala集合形成一个RDD,可以为每个集合对象创建一个分区,并指定优先位置便于在运行中优化调度。
使用本地集合创建RDD的问题在于:由于这种方法需要用到一台机器中集合的全部数据,所以这种方式在测试和原型构造之外很少使用,一般在测试时使用
使用 parallelize 创建RDD:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkContextDemo extends App {
//创建一个spark context对象
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkTest") //Spark默认会根据集群的情况来设置分区的数量,也可以通过parallelize()第二参数来指定
val sc: SparkContext = SparkContext.getOrCreate(conf)
//创建rdd1,不指定分区
val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8))
println(rdd1.partitions.size) //控制台打印:2
//创建rdd2,指定分区:5
private val rdd2: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6),5)
println(rdd2.partitions.size) //控制台打印:5
//关闭资源
sc.stop()
}
6.2 通过"加载"创建RDD
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
object SparkSessionDemo extends App {
//创建一个spark session对象
val spark: SparkSession = SparkSession.builder()
.master("local[2]")
.appName("sparkSessionTest")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
//加载本地文件
val distFile=sc.textFile("file:///val distFile=sc.textFile("file:///home/hadoop/data/hello.txt")")
//加载hdfs文件
val distHDFSFile=sc.textFile("hdfs://hadoop000:9000/hello.txt")
//关闭资源
sc.stop()
}
注:
- 加载“file://……”时,以local运行仅需一份本地文件,以Spark集群方式运行,应保证每个节点均有该文件的本地副本
- Spark默认访问HDFS,为HDFS文件的每一个数据块创建一个分区,也可以通过textFile()第二个参数指定,但只能比数据块数量多
- 加载路径支持目录、压缩文件以及通配符:
sc.textFile("/my/directory")
sc.textFile("/my/directory/*.txt")
sc.textFile("/my/directory/*.gz")
6.3 创建PairPDD
SparkContext.wholeTextFiles(): 可以针对一个目录中的大量小文件返回<filename,fileContent>作为PairRDD。Spark 为包含键值对类型的 RDD 提供了一些专有的操作,比如:reduceByKey()、groupByKey()……
也可以通过键值对集合创建PairRDD:sc.parallelize(List((1,2),(1,3)))
示例: IDEA src的data目录中有两个文件:hello.txt、test01.txt
内容如下:
创建一个pairPDD读取数据:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CreatRDDDemo extends App{
//创建一个spark context对象
val conf:SparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkTest")
val sc:SparkContext = SparkContext.getOrCreate(conf)
//加载文件
val pairRDD:RDD[(String,String)] = sc.wholeTextFiles("file:///D:\\work\\date\\2020\\spark0904\\src\\data")
pairRDD.foreach(println)
sc.stop()
}
打印结果如下图: