Spark基础入门(三)--------作业执行方式

(一)SparkContext

代表对集群的一个连接

驱动程序通过sparkContext连接spark集群

每个JVM只允许启动一个SparkContext,一个sparkContext连接代表一个application,每个application中可能有多个jobjobid

yarn上面的一个application如下:


代码中初始化sparkContext的方式如下:

SparkConf conf = new SparkConf().setAppName(appName).setMaster("local").setJars(new String[]{"wangke-demo-1.0.jar"});

JavaSparkContext sc = new JavaSparkContext(conf);

注意:程序里的setMaster,.setJars可以不写,而是通过参数的方式指定,参数的方式和程序里面指定效果是一样的。但参数方式更加灵活,不在程序中写死,而是在提交应用时,根据spark集群部署的情况(standaloneyarnmesos)再指定这些参数。如下:

bin/spark-submit --class com.wangke.sparkDemo.SparkRDDSQLDemo  --master yarn-client  --executor-memory 2G  program/wangke-demo-1.0.jar


(二)、Job提交过程

提交spark-submit,一个JVM中只能有一个sparkContext,就生成一个application

此时开始分配资源,Executor就注册到driver上,建立联系了。DriverExecutors信息交互的方式:Actor Model  及实现 Akka

提交和分配资源这步与部署方式有关(local,standaloneyarnmesos),下一章再详细讨论。在资源分配结束之后,基本上就是Driver与Executor交互的过程了。

It’s all about Driver and Executors!

 

每个application只能有一个sparkContext,一个sparkContext可以提交多个sparkJob,可以在一个线程中顺序提交SparkJob(阻塞),也可以多线程的并发提交SparkJob

 

对于一般的SparkJob作业的执行:

1. 一个Action触发一个job

2. 每个action都会通过sparkContext调用DagScheduler runJob方法向集群提请求

3. DagScheduler 分析依赖   生成DAG图(有向无环图)(一个job对应一个DAG

4. 根据DAG将job分割为多个stage

生成对应的task(shuffleMapTaskResultTask),Stage里面的生成的task数目由partition数决定

5. 将stage下的task集发提交给TaskScheduler

6. TaskScheduler分发task到已经注册在driver节点的Executor上执行

7. 结果返回到Driver

It’s all about Driver and Executors!(重要的话说几遍)


(三)、Spark提交Job的顺序的小实验

在上面我们说了一个线程中顺序提交SparkJob(阻塞),也可以多线程的并发提交SparkJob,我们可以验证一下:

例一:sparkSQL


测试发现sqlContext.sql是同步的,它会提交job,直到job结束才会往后走

上面的sql执行完了之后才会执行cancelJobGroup

 

例二:Dataframe


Hql方法是阻塞的

Repartition.Write也是阻塞

在前后日志打时间就可以看到

第二步的写操作结束之后才进行第三步的读取,(这里,第三步读取的目录是第二步生成的,所以如果是异步的会出问题,一定是同步顺着执行下来的,不会出现df.repartition.write还在写,还没写完。Hc.read()就去读了。一定是前面一步(job1)写完了,后面的(job2)才去读)


例三:sparkStreaming

sparkStreaming也是只有一个sparkContext,但它的streamingjob可以都提交上去,比如前面2sjob未执行完,新的2sjob也会提交上去,所以可以看到多个job在上面。原因就在于它是采取多线程的方式提交的(仅仅作者的猜想,有待验证)



上述说的阻塞是这样的一个意思

Main中:

1.   Int a=3

2.   Int b=4

3.   RDD.map....

4.   System.out.print()

5.   RDD.action1

6.   RDD.action2


driver上会先执行1,2

3transformation,是lazy的,所以暂时不执行

执行4

然后5触发了一个job的提交

然后这个sc的此线程就阻塞在这里等这个job结束

5job全部结束之后

6再执行,再触发job2的提交

sparkContext提交job,在一个线程中,它是阻塞的(调用DAGSchedulerrunJob[阻塞方法]runJob负责调用submitJob[异步]

如果想并行提交job,使用多线程去提交job



猜你喜欢

转载自blog.csdn.net/silviakafka/article/details/54576754