首先修改配置文件:
scp -r spark-2.3.1/ marshal05:$PWD
spark-env.sh
这里不设置端口也默认为7077
slaves,用于指定子节点。
将配置好的文件分发到其他机器上:
scp -r spark-2.3.1/ marshal05:$PWD
接下来我们就要用start-all.sh启动多台机器上的spark,他会在执行这个脚本的机器上启动一个Master,然后读取slaves文件,在对应的机器上通过ssh协议启动Worker
我们可以通过web页面查看集群的资源和情况。
这里的8080是spark集群的管理页面的端口。7077是Master和Worker进行RPC通信的端口。
-Dspark.deploy.recoveryMode这个参数的意思是,我的两个Master如果有一台出现问题了,恢复的时候应该去找Zookeeper
-Dspark.deploy.zookeeper.url是指定多个zookeeper的地址。因为zookeeper也有可能会发生故障。这里的url应该写主机名,如果我们改了zookeeper的地址的话,还需要在主机名后面加上:端口,如果不改的话,默认为2181。
-Dsaprk.deploy.zookeeper.dir,这个意思是我们要把spark的的信息写到zookeeper上,如果值为spark,则会创建 一个名为spark的目录,来保存spark的一些信息。
接下来我们重新修改配置文件:
首先修改spark-env.sh
export JAVA_HOME=/root/app/jdk1.8.0_171
#export SPARK_MASTER_HOST=marshal
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=marshal,marshal01,marshal02,marshal03,marshal04,marshal05 -Dspark.deploy.zookeeper.dir=/spark"
export SPARK_MASTER_PORT=7077
接下来我们希望marshal,marshal01作为Master,marshal02,marshal03,marshal04,marshal05作为Worker
所以我们需要修改slaves文件:
marshal02
marshal03
marshal04
marshal05
接下来我们将修改好的配置文件分发到其他机器:
接下来启动spark集群
我们在marshal执行命令:
sbin/start-all.sh
这样就会在marshal启动Master,在marshal02,marshal03,marshal04,marshal05启动Worker。此时集群还不是高可用的。所以我们还需要单独在marshal01启动Master,实现高可用
启动之后我们就可以在管理页面看到marshal的状态为active,而marshal01的状态为standby
此时我们查看zookeeper下的目录,我们可以看到,我们之前配置的存放spark数据的目录已经被创建。
我们配置完之后,就会想一个问题,现在我们所有的配置文件都没有指定Master在哪里。那么当Worker启动之后他们是怎么知道Master在哪里的?
因为Master启动之后,我们配置了连接zookeeper的操作,而Worker启动之后也会连接zookeeper。Master会把Master的地址告诉zookeeper,Worker连接zk之后可以通过zk间接的知道Master在哪。那以后Master就可以跟Worker进行通信了。
现在我们配置的这种spark不依赖于任何别的东西,叫做spark的stand alone模式。
现在我们想从hdfs中读取数据,然后在spark中完成计算。
现在我们先仅启动hdfs和zookeeper,不启动yarn。我们只从hdfs读取数据,然后用spark自带的stand alone来进行计算。
接下来我们启动spark集群。
之后我们就可以在任何一台机器上启动一个spark shell。一个spark shell相当于一个客户端。这个客户端是用来提交任务的。我们在哪台机器上提交都是可以的。
我们如果直接使用bin/spark-shell的话启动的是一个local模式。这样我们的任务就不会被提交到集群中。
所以我们还需要使用--master指定老大在哪里。以后连接上老大之后,老大会帮你分配任务。
然后还需要指定spark协议,端口
bin/spark-shell --master spark://marshal:7077
我们启动之后可以发现一些变化。我们可以在启动spark shell的机器上看到:
多了一个SparkSubmit,他相当于客户端连接到了Master。
同样的我们也可以在Worker的机器上看到一些变化:
多了一个CoarseGrainExecutorBackend,这个就相当于真正跑计算任务的进程。就相当于我们之前学的yarn,yarn中的nodemanager并不是真正负责计算任务的。而是他下面的yarnchild跑计算任务。
接下来我们读取hdfs中的文件,统计之后然后写到hdfs中。
sc.textFile("hdfs://marshal:9000/wordcount/input/word.txt")
.flatMap(_.split(" "))
.map((_,1)).reduceByKey(_+_)
.sortBy(_._2,false).saveAsTextFile("hdfs://marshal:9000/out123")
执行结果: