Spark之——Spark Submit提交应用程序

本部分 来源,也可以到 spark官网查看英文版。
spark-submit 是在spark安装目录中bin目录下的一个shell脚本文件,用于在集群中启动应用程序(如 *.py脚本);对于spark支持的集群模式,spark-submit提交应用的时候有统一的接口,不用太多的设置。

使用spark-submit时,应用程序的jar包以及通过—jars选项包含的任意jar文件都会被自动传到集群中。

spark-submit --class   --master  --jars 

1、绑定应用程序依赖

如果代码依赖于其它项目,为了将代码分发到Spark集群,就需要将这些依赖一起打包到应用程序中去。sbt和Maven都有装配插件,只要在创建集成的jar时列出Spark和Hadoop需要的依赖,而不需要将这些依赖和应用打包,因为在程序运行的时候集群的master知道如何调用和提供这些依赖;但是一旦有集成好的jar包,在执行bin/spark-submit脚本时就坐传递这些jar包了。

对于Python语言来讲,可以使用spark-submit的–py-files参数添加.py,.zip,.egg文件和应用程序一起进行分发,如果应用程序依赖于多个Python文件,建议将它们打包成.zip或.egg文件。

2、用spark-submit启动应用程序

如果打包了应用程序,就可以使用bin/spark-submit脚本启动应用程序了,这个脚本可以设置Spark类路径(classpath)和应用程序依赖包,并且可以设置不同的Spark所支持的集群管理和部署模式。提交任务后,无论是Standalone模式还是Spark on Yarn模式,都可以通过Web地址http://:4040来查看当前运行状态(具体访问地址需要查看spark搭建时的配置文件)。
spark-submit提交应用的大致格式如下:

./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
用得较多的参数是:
--class:应用程序的入口点(例如,org.apache.spark.examples.SparkPi)
--master:集群的master URL(例如,spark://localhost:7077)
--deploy-mode:将driver部署到worker节点(cluster模式)或者作为外部客户端部署到本地(client模式),默认情况下是client模式
--conf:用key=value格式强制指定Spark配置属性,用引号括起来
--application-jar:包含应用程序和所有依赖的jar包的路径,路径必须是在集群中是全局可见的,例如,hdfs://路径或者file://路径
--application-arguments:传递给主类中main函数的参数

一般的部署策略是在一个网关机器上提交应用程序,这个机器和Worker机器部署在一个网络中(例如,Standalone模式的EC2集群中的Master节点)。在此部署策略中,client模式更为合适,client模式中的driver直接跟spark-submit进程一起启动,spark-submit进程在此扮演集群中一个client的角色。应用程序的输入输出依赖于控制台,如此一来,这种模式就特别适合关于REPL(例如,Spark shell)的应用程序。
另一种部署策略是,应用程序通过一台远离Worker节点的机器提交(例如,本地或者便携设备),这种情况下,一般使用cluster模式最小化drivers和executors之间的网络延时。注意,cluster模式暂时不支持于Mesos集群或Python应用程序。

Python应用程序中,简单地在application-jar处传递一个.py文件而不是JAR文件,然后用–py-files添加Python.zip,.egg或者.py文件到搜索路径。

还有一些集群管理器正在使用的可选项。例如,对于Spark Standalone的cluster部署模式,也可以使用–supervise以确定driver在遇到非0(non-zero)退出码的错误时进行自动重启。
通过运行spark-submit上–help列出所有的可选项。以下是一些常用选项的例子:

# Run application locally on 8 cores
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[8] \
  /path/to/examples.jar \
  100

# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000
# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \  # can also be`yarn-client` for client mode
--executor -memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000

# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
  --master spark://207.184.161.138:7077 \
  examples/src/main/python/pi.py \
  1000

# Run on a Mesos cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master mesos://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  http://path/to/examples.jar \
  1000

3、Master URLs

传递给Spark的master url可以是以下任意格式之一:

master URL 意义
local 使用1个worker线程本地运行Spark(即完全没有并行化)
local[K] 使用K个worker线程本地运行Spark(最好将K设置为机器的CPU核数)
local[*] 根据机器的CPU逻辑核数,尽可能多地使用Worker线程
spark://HOST:PORT 连接到给定的Spark Standalone集群的Master,此端口必须是Master配置的端口,默认为7077
mesos://HOST:PORT 连接到给定的Mesos集群的Master,此端口必须是Master配置的端口,默认为5050。若Mesos集群使用ZooKeeper,则master URL使用mesos://zk://……
yarn-client 以client模式连接到YARN集群,集群位置将通过HADOOP_CONF_DIR环境变量获得
yarn-cluster 以cluster模式连接到YARN集群,集群位置将通过HADOOP_CONF_DIR环境变量获得

4、从文件中加载配置

spark-submit脚本可以通过属性文件加载默认的Spark配置值并将其传递给应用程序。默认情况下会读取Spark目录中conf/spark-default.conf文件中的各配置项,详细信息参考“加载默认配置”。

加载默认配置可以取消spark-submit命令的某些参数选项。例如,如果设置了spark.master属性,那么可以直接省略 –master选项。一般情况下,直接使用SparkConf设置的属性值具有最高的优先级,然后是spark-submit命令中传递的选项,最后才是默认配置文件中的值。如果你不清楚配置选项是来自于哪里,可以运行spark-submit –verbose打印处更细粒度的调试信息。

5、高级依赖管理

当使用spark-submit时,应用程序的jar包以及由–jars选向给出的jar会自动上传到集群,由–jars给出的jar路径之间必做用逗号分隔,如果路径是个目录的话,–jars的设置无法起作用,必须详细到abc.jar。
Spark使用了下面的URL格式允许不同的jar包分发策略。
1、文件file方式:
绝对路径且file:/URIs是作为driver的HTTP文件服务器,且每个executor会从driver的HTTP服务器拉取文件;
2、hdfs方式:
http:,https:,ftp:,从这些给定的URI中拉取文件和JAR包;
3、本地local方式:
以local:/开始的URI应该是每个worker节点的本地文件,这意味着没有网络IO开销,并且推送或通过NFS/GlusterFS等共享到每个worker大文件/JAR文件或能很好的工作。

注意:SparkContext的JAR包和文件都会被复制到每个executor节点的工作目录下,这将用掉大量的空间,所以需要清理干净。
在YARN下,会自动清理。
在Spark Standalone下,可以通过配置spark.worker.cleanup.appDataTtl属性做到自动清理。
用户可以用--packages选项提供一个以逗号分隔的maven清单来包含任意其他依赖。
其它的库(或SBT中的resolvers)可以用--repositories选项添加(同样用逗号分隔),这些命令都可以用在pyspark,spark-shell和spark-submit中来包含一些Spark包。

对Python而言,–py-files选项可以用来向executors分发.egg,.zip和.py库。

6、在YARN集群上运行Spark应用

详细请参考官网
在Spark独立集群模式,–master 所提供的地址是由Spark自身给出,在yarn模式下,Spark资源管理器的地址其实也就是hadoop的master地址,因而–master 的地址应该由yarn来提供。
格式如下:

$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
下面是一个更详细的例子:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    lib/spark-examples*.jar \
    10

执行的过程大致如下:
上面的应用提交后,会启动一个yarn客户端程序,这个程序接着启动master上的应用程序,SparkPi 再启动master应用的不同线程,yarn客户端程序会周期性地检查master应用的状态,并在控制台打印这些状态,程序结束后,yarn客户端会结束即出。我们可以通过查看驱动器和执行器的日志文件来调试应用程序。

先放在这里,有时间再来补上

7、运行 Spark Python 应用

主要是要解决如何将依赖库一起提交的问题。
方法一:将库以python文件的方式提交。
首先要理解的,我们在安装python外部库的时候,该库的功能主要还是靠abc.py这样的类文件来实现,当我们用类似from spark_learning.utils.default_utils import setDefaultEncoding的时候,是从这个类文件中引入了某个函数。所以当我们要将某个库提交给集群的时候,可以找到相关的类文件,然后将这个类文件提交上去。
可参考下面的格式(可能有错):

#python代码中的import
from spark_learning.utils.default_utils import setDefaultEncoding,initSparkContext,ensureOffset
#submit命令:
bin/spark-submit --jars /home/jabo/software/spark-1.5.2-bin-hadoop2.6/lib/spark-streaming-kafka-assembly_2.10-1.5.2.jar \
--py-files /home/jabo/spark-by-python/spark_learning/utils/default_utils.py \
/home/jabo/spark-by-python/spark_learning/third_day/streaming_kafka_avg.py

下面是比较正规的做法:
来源
用 Java 和 Scala 访问 Spark 提供了许多优点 : 因为 Spark 它自己运行在 JVM 中,运行在 JVM 内部是平台无关的,独立的代码包和它打入到 JAR 文件中的依赖,以及更高的性能。如果您使用 Spark Python API 将会失去这些优势。

管理依赖并让它们可用于群集上的 Python Job 是很难的。为了确定哪些依赖在群集上是需要的,您必须了解运行在分布式 群集 中的 Spark executor 进程中的 Spark 应用程序的代码。如果您定义的 Python 转换使用的任何的第三方库,比如 NumPy 或者 nltk,当它们运行在远程的 executor 上时 Spark executor 需要访问这些库。

7.1、独立的依赖关系

常见的情况中,一个自定义的 Python 包包含了你想要应用到每个 RDD 元素的 功能。下面展示了一个简单的例子 :

def import_my_special_package(x):
  import my.special.package
  return x

int_rdd = sc.parallelize([1, 2, 3, 4])
int_rdd.map(lambda x: import_my_special_package(x))
int_rdd.collect()

您创建了一个有 4 个元素名为 int_rdd 的简单的 RDD。然后您应用函数 import_my_special_package 到每个 int_rdd 的元素。这个函数导入了 my.sepcial.package 并且返回了传入的原始参数。这样和使用类或者定义的函数有相同的作用,因为 Spark 需要每个 Spark executor 在需要时导入 my.special.package。

如果你只需要 my.special.package 内部一个简单的文件,您可以通过在您的 spark-submit 命令中使用 –py-files 选项并指定文件的路径来直接让 Spark 的所有 executor 都可以获取。您也可以以编程的方式通过使用 sc.addPyFiles() 函数来指定。如果您使用的功能来自跨越多个文件的 package,为 package 制作一个 egg,因为 –py-files 标记也接受一个 egg 文件的路径。
如果您有一个独立的依赖关系,您可以使用两种方式让需要的 Python 依赖是可用的。

  • 如果您只依赖一个单独的文件,您可以使用 –py-files 命令行选项,或者以编程的方式用 sc.addPyFiles(path) 兵指定本地 Python 文件的路径添加这些文件到 SparkContext。
  • 如果您有一个独立模块上的依赖(一个没有其它依赖的模块),您可以创建一个这些模块的 egg 或者 zip 文件,使用 –py-files 命令行选项或者以编程的方式用 sc.addPyFiles(path) 兵指定本地 Python 文件的路径添加这些文件到 SparkContext。

7.2、复杂的依赖关系

一些操作依赖有许多依赖关系的复杂的 package。例如,下列的代码片段导入了 Python pandas 数据分析库 :

def import_pandas(x):
 import pandas
 return x

int_rdd = sc.parallelize([1, 2, 3, 4])
int_rdd.map(lambda x: import_pandas(x))
int_rdd.collect()

pandas 依赖 NumPy,SciPi,和许多其它的 package。尽管 pandas 作为一个 *.py 文件来分发是很复杂的,您可以为它和它的依赖建一个 egg 并发送它们到 executor。

7.3、分发 Egg 文件的限制

在这两个独立的,复杂的依赖关系的情况中,发送 egg 文件是有问题的,因为带有原代码包必须在其上运行的特定主机进行编译。当行业业标准的硬件做分布式计算,你必须假设的是,硬件是多样化的。然而,由于所需的 C 编译,内置客户端主机上一个 Python egg 是特定的客户端的 CPU 架构。因此,分配复杂的 egg,像编译 NumPy,SciPy 的 package 和 pandas 经常出现故障。相反,分发 egg 文件,你应该在群集的每个主机上安装所需的 Python 包,并指定 Python 的二进制文件的路径为 worker 主机使用。
安装以及保持 Python 环境
安装和维护的Python环境可能比较复杂,但可以让你使用完整的Python包生态系统。系统管理员在用您需要的依赖在群集的每台主机上安装 Anaconda distribution 或者设置一个 虚拟环境。
如果您使用 Cloudera Manager,您可以使用方法将 Anaconda distribution 作为一个 Parcel 来部署。

最低需要的角色 : 群集管理员(或者管理员)

  • 添加下面的 URL https://repo.continuum.io/pkgs/misc/parcels/ 到远程的 Parcel 仓库 URL,像 Parcel 配置设置 中描述的一样。
  • 像 管理 Parcels 中描述的一样下载,分发,并且激活 Parcel。

Anaconda 被安装在 parcel 目录/Anaconda 中,parcel 目录默认是 /opt/cloudera/parcels,但是可以在 parcel 配置设置中更改。Anaconda parcel 支持 Continuum Analytics。

如果您没有使用 Cloudera Manager,您可以在您的群集中安装一个虚拟环境通过在每台主机上运行命令 Cluster SSH,Parallel SSH,或者 Fabric。假设每台主机已经安装了 Python 和 pip,在一个 RHEL 6 兼容的系统中上的虚拟环境中使用下面的命令来安装标准的数据栈(NumPy,SciPy,scikit-learn,和 pandas)。

# Install python-devel:
yum install python-devel

# Install non-Python dependencies required by SciPy that are not installed by default:
yum install atlas atlas-devel lapack-devel blas-devel

# install virtualenv:
pip install virtualenv

# create a new virtualenv:
virtualenv mynewenv

# activate the virtualenv:
source mynewenv/bin/activate

# install packages in mynewenv:
pip install numpy
pip install scipy
pip install scikit-learn
pip install pandas

7.4、设置 Python 路径

之后您想要使用的 Python 包在您群集中一致的位置,如下所示设置相应的环境变量路径到您的 Python 可执行的文件 :

  • Client 模式 - 用 PYSPARK_PYTHON 设置 executor 路径,用 PYSPARK_DRIVER_PYTHON 设置 driver 路径。
  • Cluster 模式 - 用 spark.yarn.appMasterEnv.PYSPARK_PYTHON 设置 executor 路径,用 spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON 设置 driver 路径。

为了使这些变量一致,添加相应的 export 语句 :

  • Anaconda parcel - export 变量 /opt/cloudera/parcels/Anaconda/bin/python
  • Virtual environment - export 变量 /path/to/mynewenv/bin/python

到 sparl-env.sh,检查其他用户没有用条件测试的变量,比如 :

if [ -z "${PYSPARK_PYTHON}" ]; then
export PYSPARK_PYTHON=
fi

在 Cloudera Manager 中,如下所示在 spark-env.sh 中设置环境变量 :

最低需求角色 : 配置员(也可以用群集管理员,管理员)

  • 转到 Spark 服务。
  • 点击 配置 标签。
  • 搜索 spark-conf/spark-env.sh 的 Spark 服务高级配置代码段(安全阀)。
  • 添加变量到属性中。
  • 点击 保存更改 以提交更改。
  • 重启服务。
  • 部署客户端配置。

在命令行中,在 /etc/spark/conf/spark-env.sh 中设置环境变量。

8、最终提交spark作业

更详细可参考文档

8.1、用yarn-client模式提交spark作业

在/usr/local/spark目录下创建文件夹

vi spark_pi.sh
shell文件的内容如下:
$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.JavaSparkPi \
--master yarn-client \
--num-executors 1 \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \

$SPARK_HOME/lib/spark-examples-1.6.1-hadoop2.6.0.jar \
或者
[spark@master ~]$  $SPARK_HOME/bin/spark-submit  \
> --class org.apache.spark.examples.JavaSparkPi \
> --master yarn-cluster \
> --num-executors 1 \
> --driver-memory 1g \
> --executor-memory 1g \
> --executor-cores 1 \
>  $SPARK_HOME/lib/spark-examples-1.6.1-hadoop2.6.0.jar
修改该shell文档权限
chmod 777 spark_pi.sh
然后运行以下代码就可启动应用程序
./spark_pi.sh

当然也可以在linux下设置定时器来定时运行该应用程序。

8.2、用yarn-cluster模式提交spark作业

在/usr/local/spark目录下创建文件夹

vi spark_pi.sh
shell文件的内容如下:
$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.JavaSparkPi \
--master yarn-client \
--num-executors 1 \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \

$SPARK_HOME/lib/spark-examples-1.6.1-hadoop2.6.0.jar \
或者
[spark@master ~]$  $SPARK_HOME/bin/spark-submit  \
> --class org.apache.spark.examples.JavaSparkPi \
> --master yarn-cluster \
> --num-executors 1 \
> --driver-memory 1g \
> --executor-memory 1g \
> --executor-cores 1 \
>  $SPARK_HOME/lib/spark-examples-1.6.1-hadoop2.6.0.jar
修改该shell文档权限
chmod 777 spark_pi.sh
然后运行以下代码就可启动应用程序
./spark_pi.sh
当然也可以在linux下设置定时器来定时运行该应用程序


猜你喜欢

转载自blog.csdn.net/l1028386804/article/details/80739399