Spark零基础实战 第15章 MasterHA工作原理解密

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/myvanguard/article/details/86653922

第15章 MasterHA工作原理解密

这一章介绍Spark MasterHA的4种策略以及着重解密如何通过ZooKEEPER这种企业经常使用的策略来保证Spark Master HA。

15.1 Spark 需要 Master HA的原因

为保证集群高度的稳定性和容错性。为此Spark提供了集群的不同的重启策略,保证系统的正常运行,其中相应的策略包含如下几种方式。
①ZooKEEPER:企业经常使用,实时恢复。
②FILESYSTEM:低延迟恢复。
③CUSTOM:自定义恢复。
④NONE:不采用HA,默认就是Master/Slaves。
具体采用哪种恢复策略,可以通过spark-env.sh的RECOVERY_MODE属性进行设置。

15.2 Spark Master HA的实现

(1)ZOOKEEPER实现解密
ZOOKEEPER会自动化管理Masters的切换,整个集群运行时的元数据(包括Workers、Drivers、Applications、Executors)会被保存在ZOOKEEPER中,ZOOKEEPER集群中的当前状态为leader的是目前正在工作的,而spark集群中状态为Active的Master管理着当前的集群。
在这里插入图片描述
(2)FILESYSTEM实现解密
FIL.ESYSTEM的方式会创建FileSystemRecoveryModeFactory,在其内部会根据spark.deploy.recoveryDirectory所设置的文件目录来创建目录,将整个集群运行时的元数据保存在该目录下。同时FileSystemRecoveryModeFactory中还会创建文件系统的持久化引擎(FileSystemPersistenceEngine),该持久化引擎中提供了persist(将数据持久化到磁盘文件)、unpersist(将文件从磁盘上移除)、read(从磁盘上读取数据)、以及序列化和反序列化等方法。集群恢复时会通过readPersistedData方法,读取持久化在磁盘的元数据进行集群恢复。

(3)自定义CUSTOM实现解密
CUSTOM的方式允许用户自定义MasterHA的实现,这对于高级用户特别有用。在Spark中该方式被设计成可插拔式的,使用该方式同样要实现StandaloneRecoveryModeFac-
tory 抽象类以及相应的持久化引擎。

(4)NONE实现解密
NONE策略是Spark默认采用的方式,该方式只会将在集群运行时的元数据保存到临时文件中,不会持久化,重启的时候,所有的元数据信息将会丢失。

15.3 Spark 和ZOOKEEPER的协同工作机制

ZOOKEEPER的两大主要流程:选举leader和同步数据。
(1)ZOOKEEPER对Spark的管理

对于ZOOKEEPER对Spark的管理,如图15-2所示。首先在ZOOKEEPER集群中会有一个处于leader状态的ZOOKEEPER服务器处于工作状态,ZOOKEEPER会从状态为STANDBY状态的Master中选举出一个Master,该Master会通过ZOOKEEPER持久化引擎(ZooKeeperPersistenceEngine)去ZOOKEEPER中读取Spark集群的元数据信息(包括Drivers、Applitions、Workers、Excutors)。
在这里插入图片描述
Master对读取到的信息进行判断,如果有相关的信息,那么将相应的信息重新注册到该Master中。然后要进行一致性的校验,即所获取的信息是否和集群中的信息是一致的。

验证的时候首先会将Applications和Workers等状态标记为UNK)WN,然后会向Applications和Workers 发送现在被选举出来的Master的信息(MasterChanged其中包括Master的引用masterRef和masterWebUiUrl)。此时接收到信息的Applications和Workers会通过masterRef发送消息(WorkerSchedulerStateResponse其中包括workerld、executors、driv-
ers)给Master。

Master接收到来自Applications和Workers后,如果是Worker,那么Master会根据传递过来的workerld的内存(workerld)中的worker,并将其state设置成ALIVE,同时会将合法的Executor 信息重新添加到Worker。如果是Drivers,会判断driverld在Master的内存中是否存在,若存在会将driverld对应的Driver的状态设置成RUNNING,并将该driver信息记录到worker中。最终会调用completeRecovery方法来处理没有响应的Drivers、Workers 和Applications。处理完成后,会将被选举出来的Master的state设置成RecoveryState.ALIVE。

最后Master会调用schedule方法调度当前可以使用的资源给正在等待的Applications。

(2)通过ZOOKEEPER选举LeaderElection 机制是ZOOKEEPER中重要的功能,默认的实现算法是FastleaderElection。

扫描二维码关注公众号,回复: 5097511 查看本文章

如图15-3所示,ZOOKEEPER在刚启动的时候,每个节点都会读取myid中的数据(对应的id),读取完数据之后,会向其他的服务器发送自己选举的Leader(一般推荐自己),同时接受其他服务器发来的信息(这个时候还不知道谁是leader),更新相关的选举信息,该信息会不断地更新,最后以逻辑时钟最大为准,这样每台服务器中得到相同的选举结果。在执行选举过程之后,会产生leader,其他的节点根据得到的选举信息,尝试连接Leader。
在这里插入图片描述
(3)通过ZOOKEEPER恢复
在ZOOKEEPER服务器之间存在心跳机制,该机制主要用来同步各个节点之间存储的集群元数据信息。这种机制保证了leader节点和Follower节点之间数据的一致性。当leader节点遇到故障时,会再次执行选举过程,选出新的Leader,而新的节点中也保存的数据是也最新的。
Spark中的Master会从新的Leader中读取元数据进行恢复。

15.4ZOOKEEPER实现应用实战

(1)添加Spark的ZOOKEEPER支持
如果要使用ZOOKEEPER来对管理Spark集群中的Master,需要添加在SparkEnv中添加ZOOKEEPER支持,将Master的恢复策略(Dspark.deploy.recoveryMode)设置成ZZOOKEEPER,同时指定Master,去掉SPARK MASTER_IP的设置。具体配置如下。


#主节点的IP地址
#export SPARK_MASTER_IP=master
#配置spark的zookeeper支持信息
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=master:2182,worker1:2182,worker2:2182
-Dspark.deploy.zookeeper.dir=/data/zookeeper/spark"

(2)ZOOKEEPER的安装及配置
ZOOKEEPER的配置文件位于conf下,默认的有个配置实例文件zoosample.cfg。该文件拷贝一份重命名为zoocfg。打开文件并对相关的属性进行配置。具体配置如下。

# The number of miliseconds of each tick 
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take 
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement 
syncLimit=5
# the directory where the snapshot is stored.
# do not use/tmp for storage,/tmp here is just
#example sakes.
dataDir=/data/zookeeper/data 
dataLogDir=/data/zookeeper/log
#the port at which the clients will connect 
clientPort=2182

initLimit:初始化连接时,follower和leader之间的最长心跳时间。。
syncl.imit:该参数配置leader和follower之间发送消息,请求和应答的最大时间长度。
dataDir:数据存放目录。
datal.ogDir:日志数据存放路径。
server.X:其中X是一个数字,表示这是第几号server,X是该server所在的IP地址。

配置ZOOKEEPER的环境变量,在/etc/profile在文件的末尾添加如下内容。

export ZO0KEEPER HOME=/usr/app/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

注意:dataDir和datal.ogDir要手动创建目录保存数据
日志信息的配置的conf/log4j.properties。
启动测试:./zkServer.sh status

(3)使用ZooKEPPER实现HA
使用ZOOKEEPER实现Spark集群的HA,目前采用三台ZOOKEEPER服务器,三台Master。ZOOKEEPER集群的安装需要在配置文件中指定所有的服务器地址,配置如下。

# The number of milliseconds of each tick tick 
Time=2000
# The number of ticks that the initial
# synchronization phase can take 
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement 
syncLimit=5
# the directory where the snapshot is stored.
# do not use/tmp for storage,/tmp here is just
# example sakes.
dataDir=/data/zookeeper/data
#日志数据
dataLogDir=/data/zookeeper/log
#the port at which the clients will connect
clientPort=2182
#集群环境
server.0=master:2888:3888
server.1=worker1:2888:3888
server.2=worker2:2888:3888

配置完成后需要将该机器的配置拷贝到其他机器上,使用scp命令将ZOOKEEPER的安装目录、环境变量配置以及工作目录拷贝到其他的节点上。
①拷贝安装目录scp-r/usr/app/zookeeper root@worker1:/usr/app/zookeeper。
②拷贝环境变量配置scp/etc/profile root@worker1:/etc/profile。
③拷贝工作目录scp-r/data/zookeeper root@worker1:/data/zookeeper。
最后修改myid文件。
到/data/zookeeper/myid(如果没有可以手动创建)里面修改server的id,在修改id的时候要保证整个集群中的id不重复。
至此,ZOOKEEPER的环境已安装完成,接下来将Spark集群的恢复策略设置成ZOOKEEPER集群,需要在SparkEnv中添加如下配置。

export SPARK_DAEMON-JAVA_OPTS="Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper,url=Master:2181,Worker1:2181,Worker2:21813.
-Dspark.deploy.zookeeper.dir-/spark"

(4)启动集群
要启动集群首先要到每台ZOOKEEPER上去启动相应的服务,依次到master、worker1、worker2上启动ZOOKEEPER。

如果此时使用zkServer.shstatus查看状态时,会显示如下信息。

zkServer.sh start
zkServer.sh status

启动Spark的集群,同样到其他两台机器上手动启动Master。启动结束后,可以通过浏览器查看Master的状态(访问地址http://IP:8080/)。

(5)Master HA测试
主要从两个方面对于集群进行测试,ZOOKEEPER集群和Spark集群。

ZOOKEEPER集群测试。测试过程中会采用手动关闭状态为leader状态的进程来模拟故障,首先观察下关闭之前各节点的状态。

Spark集群测试。通过手动关闭状态的Active的Master来观察各个节点上Master的状态。

猜你喜欢

转载自blog.csdn.net/myvanguard/article/details/86653922