Spark Cluster Mode
首先了解一些概念:
- Application #用户通过Spark构建的程序,由Driver程序和executors程序组成。
- Application jar #个人理解是包含了Application所依赖的其他包的jar
- Driver program #master,SC生成的运行在程序入口(main())之下的程序,个人理解Driver program是对用户代码封装,并由内部的程序将job分解为若干个阶段及tasks,然后交由Executors处理。
- Cluster manager #包括standalone Yarn等,作用是为executors分配资源。
- Worker node #集群中的结点
- Executor #worker,具体执行Application的程序,每个application都会分配自己的Executor
- Task #作为一个工作单元分发给Executors
- Job #Application要做的任务
- Stage #Job分为若干个Stage,和hadoop将任务分为map、reduce类似
Spark应用作为独立的程序集合运行在集群之上,SparkContext(下文称SC)将Spark应用程序组织起来。
当运行一个Spark集群时,SC可以连接在一些cluster manager上,这些cm包括Spark自己的集群管理器,以及Mesos和Yarn,cm的作用是为Spark应用分配资源。SC与cm连接后,Spark获得集群节点上的executors,这些执行器为应用提供计算和存储,接着,应用程序的代码在executors上运行。
这样的架构有如下特点:
- 应用间独立。每个应用有他们自己的executor 进程,executors以多线程的方式在应用进行过程与执行tasks。
这使得应用与应用之间隔离开来,无论是在Driver端还是executor端(Driver端生成自己的tasks,不同应用的task运行在不同的JVM上)。当然,这也意味着数据无法在应用间(SC实例)共享,除非将数据写到外部存储系统中。 - 不同的cm选择。 spark应用可以采用多种cm。只要cm可以满足为executor进程调度资源、进程通信等条件。
- driver的监听机制。 driver进程需要监听生命周期内的executors以及新的executors连接,所以driver进程要能找到executors。
- 资源本地化。 driver生成的tasks要尽可能的在近端worker nodes执行,最好是在本地节点运行这些tasks,这样会减少网络传输等开销。
Cluster Manager
在Spark中很多种cm选择,下面说下Spark on Yarn的个人理解。
-
HADOOP_CONF_DIR:首先要在环境变量中添加该参数,值为hadoop的目录。该参数用来将数据写入HDFS
以及连接YARN RM。因为YARN的地址都配置在Hadoop中,所以不需要在–master指定master的地址,而只需要指定–master yarn即可。 -
Spark on Yarn工作机制:
client模式:Driver作为SparkContext的载体,会启动DAG、Task程序,DAG将Spark作业分为多个阶段,Task程序将任务提交,外部看,Driver将任务提交到RM,RM会启动一个叫SparkExecutorLauncher的applicationMaster(am),am负责启动Yarn容器,在申请前,就会向RM申请资源,申请到资源后,am启动一个装有Executor的容器container(具体为一个Executor backend的进程)来运行Executor,Executor作为worker,并行地执行Spark作业。
cluster模式:和client大部分类似,不同之处在于Driver运行的位置,在client模式中Driver运行在客户端节点上,cluster模式中Driver运行在am中。
根据Driver的位置来看,client模式适合在开发阶段,在客户端节点上,也就是提交任务的节点上查看executor执行任务的情况;cluster模式适合在生产环境下,客户端提交任务后,不必关注与任务的运行情况,executor返回的任务执行情况会集中写到日志中。 -
提交任务:
$ ./bin/spark-submit
--class org.apache.spark.examples.SparkPi \ #程序入口
--master yarn \ #指定CM
--deploy-mode cluster \ #指定client还是cluster
--driver-memory 4g \ #driver内存
--executor-memory 2g \ #每个executor内存
--executor-cores 1 \ #每个executor的CPU核数
--queue thequeue \ #YARN的队列名称
examples/jars/spark-examples*.jar \ #要运行的application
10 #application的参数
以上命令启动了一个YARN客户端程序,该程序启动默认的application master。client周期地更新AM状态,并将信息展示到控制台上。等到用户程序运行结束,client也将退出。
当spark以cluster模式运行时,由于driver程序运行在不同的机器上,所以不能加载本地的jar,可以用命令–jars指定需要添加的jar包:
$ ./bin/spark-submit --class my.main.Class \
--master yarn \
--deploy-mode cluster \
--jars $(echo $HBASE_HOME/lib/*.jar | tr ' ' ',') \ #该jar包就是spark与hbase整合时需要的jar包
my-main-jar.jar \
app_arg1 app_arg2
- YARN 日志
在YARN中,executors 和 AM 都运行在容器containers内部,任务完成后,YARN有两种方处理日志,日志聚合或不聚合。
聚合:
#该参数是配置YARN日志聚合的,如果开启,那么容器日志会拷贝到HDFS中并且在本地删除日志。
yarn.log-aggregation-enable
#这些日志可以由以下命令查看
yarn logs -applicationId <app ID>
#当然也可以从HDFS中直接容器日志,日志目录部署位置在以下参数
yarn.nodemanager.remote-app-log-dir
yarn.nodemanager.remote-app-log-dir-suffix
#日志还可以在WEB UI查看,位置在Executors Tab,前提是在yarn-site.xml配置以下参数
yarn.log.server.url
不聚合:
#如果参数yarn.log-aggregation-enable没有打开,那么日志将会存在本地机器,存储目录由以下参数指定,通常部署在/tmp/logs or $HADOOP_HOME/logs/userlogs,取决于Hadoop的版本及部署情况。
YARN_APP_LOGS_DIR
#这些日志文件的子目录由application ID and container ID组成
#这些日志也可以在WEB UI的Executors Tab查看
#如果想要查看每个container的环境信息,增加yarn.nodemanager.delete.debug-delay-sec (e.g. 36000)
yarn.nodemanager.delete.debug-delay-sec
#接着通过以下参数的目录去访问容器的环境信息,环境信息包括脚本代码,jar包,环境变量等。
yarn.nodemanager.local-dirs
也可以和log4j整合,具体以后再研究。
- Spark on YARN 的参数配置
通过配置文件$SPARK_CONF_DIR/metrics.properties中的配置,可以改变Spark on YARN模式下的am及executors参数,它会自动更新配置文件,而不用显示地使用–files命令指定该文件。
spark.yarn.am.memory 512m #client模式下application master内存大小
spark.driver.memory 512m #cluster模式下application master内存大小
spark.yarn.am.cores 1 #client模式下am核数
spark.driver.cores 1 #cluster模式下am核数
spark.yarn.am.waitTime 100s #only cluster mode,am等待SC初始化的超时时间
#具体参数配置有时间再研究
#http://spark.apache.org/docs/latest/running-on-yarn.html