1.Spark的任务执行流程
1wordcount执行流程介绍
- 注意:Spark中RDD会被拆分为多个Partition
- Spark集群的重要角色--注意:Spark的任务提交给集群去运行的时候会出现两个重要角色:driver和Executor
- master——Spark集群的主节点
- worker——Spark集群的工作节点
- Driver——和mapreduce作为整个APP的主控程序,负责这个任务运行的一切资源协调
- Executor——和mapreduce中task的角色和作用类似;其实就是存在于资源节点上的一个JVM进程,会并发运行多个线程,每一次当Executor在被初始化的时候都会初始化一个线程池
- 注意:mapreduce中task就是一个JVM进行,执行一个数据块,启动一个进程;但是spark中的Exector是一个JVM进行,对应一个进程,但是会并发运行多个线程,每个线程执行一个task,一个task就是一个数据块或者分区。
- Task——一个task就是一个数据块或者分区
- mapreduce和spark在运行程序时候的角色对比
- mapreduce 中的 MRAPPMaster YARNChild:MapTask和ReduceTask
- spark的主控程序叫做Driver,进程叫做Executor
- 共同点:向资源调度系统去申请资源来执行对应的task任务。
- hadoop各个版本和spark的作用
- mapreduce 1.x JobTracker TaskTracker 主从架构,管理资源和贡献资源的
- mapreduce 2.x Resourcemanager NodeManager 主从架构,也是管理资源和贡献资源的
- Spark Master Worker 主从架构,也是管理资源和贡献资源的
- 注意:分布式文件系统的区别
- Tachyon ALLuxio 基于内存的一个分布式文件系统 出现主要是为了给Spark做弥补
- HDFS S3 基于磁盘的一个分布式文件系统
- mapreduce和spark比较
- spark是基于内存的分布式计算引擎
- mapreduce是基于磁盘的分布式计算引擎
2.Spark 的核心功能
- 整个Spark Core 四大核心知识
- SparkContext
- 通常来说,DriverApplication的执行和输出都是通过SparkContext来完成的,在正式提交Application之前都需要初始化SparkContext。SparkContext隐藏了网络通信、分布式部署、消息通信、存储能力、计算能力、缓存、测量系统、文件服务、Web 服务等内容。所以初始化SparkContext的时间比较长。SparkContext初始化后会将程序初始化为有向无环图。
- SparkContext的核心组件
- DAGScheduler:负责创建Job,将DAG(有向无环图)中的RDD划分到不同的Stage,提交Stage等功能。
- TaskScheduler:一个应用程序会被划分为多个Stage,一个Stage会被划分为多个Task去执行;TaskScheduler主要负责资源的申请,任务的提交及请求集群对任务的调度工作。
- RDD
- 算子
- 任务的执行流程解析
- SparkContext
- 存储体系:Spark 优先考虑使用各节点的内存作为存储,当内存不足时才会考虑使用磁盘,这极大地减少了磁盘 I/O,提升了任务执行的效率,使得 Spark 适用于实时计算、流式计算等场景。此外,Spark 还提供了以内存为中心的高容错的分布式文件系统 Tachyon 供用户进行选择。
- Tachyon ALLuxio 基于内存的一个分布式文件系统 出现主要是为了给Spark做弥补
- BlockManager:负责管理和识别数据块的位置和调度,注意,也是属于SparkContext对象的成员变量
- 计算引擎:计算引擎由 SparkContext 中的 DAGScheduler、RDD 以及具体节点上的 Executor 负责执行的 Map 和 Reduce 任务组成。DAGScheduler在应用程序正式提交和执行之前会将RDD组织成有向无环图(简称DAG)、并对Stage进行划分。
- 有向无环图 DAG:一个应用程序会被划分为多个Stage(DAGScheduler完成),一个Stage会被多个task去执行(TaskScheduler完成)
- 部署模式:由于单节点不足以提供足够的存储及计算能力,所以作为大数据处理的 Spark 在SparkContext 的 TaskScheduler 组件中提供了对 Standalone 部署模式的实现和 YARN、Mesos等分布式资源管理系统的支持。通过使用 Standalone、YARN、Mesos、kubernetes、Cloud等集群部署模式为 Task 分配计算资源,提高任务的并发执行效率。此外Spark还提供了Local 模式和 local-cluster 模式便于开发和调试。
3.Spark 的扩展功能
- Spark SQL 既可以使用SQL语句还可以使用API操作。底层原理基本和Hive类似。Spark SQL 的过程可以总结为:首先使用 SQL 语句解析器(SqlParser)将 SQL 转换为语法树(Tree),并且使用规则执行器(RuleExecutor)将一系列规则(Rule)应用到语法树,最终生成物理执行计划并执行的过程。
- Spark Streaming Spark Streaming与Apache Storm类似,也用于流式计算。Spark Streaming支持Kafka、Flume、Twitter、MQTT、ZeroMQ、Kinesis 和简单的 TCP 套接字等多种数据输入源。输入流接收器(Receiver)负责接入数据,是接入数据流的接口规范。Dstream 是 Spark Streaming 中所有数据流的抽象,Dstream 可以被组织为 DStreamGraph。 Dstreams本质上由一系列的连续的RDD组成
- Spark GraphX Spark 提 供 的 分 布 式 图 计 算 框 架 。 GraphX 主 要 遵 循 整 体 同 步 并 行 计 算 模 式(BulkSynchronous Parallell,简称 BSP)下的 Pregel 模型实现。
- Spark MLlib Spark 提供的机器学习框架
- 机器学习的知识支撑:线性代数、概率论、高等数学、统计学 -- 对应算法:线性回归、逻辑回归、决策树、随机森林、朴素贝叶斯、保序回归、协同过滤、聚类、维数缩减、特征提取与转型等等
4.Spark 核心概念
- Application ——应用程序就是一个Application,写在Spark上独立运行的程序
- Application jar ——Application打成的jar包
- Driver program——驱动程序
- cluster manager——集群管理器:实现方式由多种:StandAlone、YARN、
- deploy mode: 部署模式,重点是针对yarn来讲
- client: driver program 运行在client节点上 :最大的特点:client节点不能宕机
- cluster : driver program 运行在集群中的某个worker中 最大的好处:只要application提交成功之后就可以关闭客户端
- Worker node 从节点
- Executor
- Stage
问题汇总
- 一个appliction就只有一个shuffle算子参与吗?一个application就是一个job吗?
- 一个Application中,不一定只有一个action的算子,正常来讲,transformation的算子肯定会有很多个算子。
- 划分job的标准就是;从前往后寻找,每次遇到一个action的算子,就划分成一个job
- 一个appliction会按照action算子的数量,划分成多个job
- 一个job会按照是否有shuffle算子来划分成多个stage
- 一个stage会按照分区来划分成多个task
- 例如:wordcount 对应 一个application,一个job,两个stage
5.Spark的基本架构
从集群部署的角度来看,Spark集群有以下部分组成
- Cluster Manager ——Spark 的集群管理器,主要负责资源的分配和管理。目前,Standalone、YARN、Mesos、K8S,EC2 等都可以作为 Spark的集群管理器。
- Master 集群的主节点
- Worker 集群的工作节点,主要负责:创建 Executor,将资源和任务进一步分配给 Executor,同步 资源信息给 Cluster Manager。
- Executor:执行计算任务的一些进程。主要负责任务的执行以及与 Worker、Driver Application的信息同步。
- DriverApplication 客户端驱动程序,也可以理解为客户端应用程序,用于将任务程序转换为RDD和DAG,并与Cluster Manager进行通信和调度。
整体关系图如下所示:
6.Spark的编程模型
- 获取程序入口,初始化一个程序入口对象。常见创建对象如:sparkContext, sqlContext/hiveContext, streamingContext,sparkSession
- 通过程序入口对象加载某个目录,形成为一个数据抽象对象如:RDD, DStreams,DataFrame DataSet
- 通过这个数据抽象对象,进行各种操作,来进行业务处理,得到结果数据,主要是进行Transformation算子和Action算子
- 处理结果数据:控制台打印println,collect,saveAsTextFile()
- 关闭应用程序入口,回收资源
wordcount代码实例如下:
package cn.mikeal.day07
import org.apache.spark.{SparkConf, SparkContext}
/**
* 实现WordCount对本地文件的操作
*/
object WordCountLocal {
def main(args: Array[String]): Unit = {
/**
* 第一步,获取编程入口对象
*/
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("WordCountLocal")
val sc = new SparkContext(conf)
// 如果没有传入SparkConf作为构造参数
/**
* 第二步,加载某个文件系统中的某个目录,得到一个数据抽象
*/
// textfile 跟mapreduce差不多,把整个文件的一行当做一个元素
// RDD就是这种元素的一个集合
val linesRDD = sc.textFile("d:/data/test/words.txt")
/**
* 第三步,对获取到的数据抽象进行各种业务处理
* 单词词频统计
* RDD[String] 由 中的每一行切割出来的所有单词组成的一个新的集合
*/
val wordRDD = linesRDD.flatMap((line:String) => line.split(" "))
val wordAndOneRDD = wordRDD.map((word:String) => (word,1))
// wordAndOneRDD.groupBy().reduce()
val wordsCountRDD = wordAndOneRDD.reduceByKey((a:Int,b:Int) => a + b)
/**
* 第四步,处理结果数据
*/
wordsCountRDD.foreach((x:(String,Int)) => println(x._1, x._2))
/**
* 关闭程序入口,回收资源
*/
sc.stop()
}
}