(一)SparkContext
代表对集群的一个连接
驱动程序通过sparkContext连接spark集群
每个JVM只允许启动一个SparkContext,一个sparkContext连接代表一个application,每个application中可能有多个job(jobid)
yarn上面的一个application如下:
代码中初始化sparkContext的方式如下:
SparkConf conf = new SparkConf().setAppName(appName).setMaster("local").setJars(new String[]{"wangke-demo-1.0.jar"});
JavaSparkContext sc = new JavaSparkContext(conf);
注意:程序里的setMaster,.setJars可以不写,而是通过参数的方式指定,参数的方式和程序里面指定效果是一样的。但参数方式更加灵活,不在程序中写死,而是在提交应用时,根据spark集群部署的情况(standalone,yarn,mesos)再指定这些参数。如下:
bin/spark-submit --class com.wangke.sparkDemo.SparkRDDSQLDemo --master yarn-client --executor-memory 2G program/wangke-demo-1.0.jar
(二)、Job提交过程
提交spark-submit,一个JVM中只能有一个sparkContext,就生成一个application
此时开始分配资源,Executor就注册到driver上,建立联系了。Driver和Executors信息交互的方式:Actor Model 及实现 Akka。
提交和分配资源这步与部署方式有关(local,standalone,yarn,mesos),下一章再详细讨论。在资源分配结束之后,基本上就是Driver与Executor交互的过程了。
It’s all about Driver and Executors!
每个application只能有一个sparkContext,一个sparkContext可以提交多个sparkJob,可以在一个线程中顺序提交SparkJob(阻塞),也可以多线程的并发提交SparkJob
对于一般的SparkJob作业的执行:
1. 一个Action触发一个job
2. 每个action都会通过sparkContext调用DagScheduler 的runJob方法向集群提请求
3. DagScheduler 分析依赖 生成DAG图(有向无环图)(一个job对应一个DAG)
4. 根据DAG将job分割为多个stage
生成对应的task(shuffleMapTask和ResultTask),Stage里面的生成的task数目由partition数决定
5. 将stage下的task集发提交给TaskScheduler
6. TaskScheduler分发task到已经注册在driver节点的Executor上执行
7. 结果返回到Driver
It’s all about Driver and Executors!(重要的话说几遍)
(三)、Spark提交Job的顺序的小实验
在上面我们说了一个线程中顺序提交SparkJob(阻塞),也可以多线程的并发提交SparkJob,我们可以验证一下:
例一:sparkSQL
测试发现sqlContext.sql是同步的,它会提交job,直到job结束才会往后走
上面的sql执行完了之后才会执行cancelJobGroup
例二:Dataframe
Hql方法是阻塞的
Repartition.Write也是阻塞
在前后日志打时间就可以看到
第二步的写操作结束之后才进行第三步的读取,(这里,第三步读取的目录是第二步生成的,所以如果是异步的会出问题,一定是同步顺着执行下来的,不会出现df.repartition.write还在写,还没写完。Hc.read()就去读了。一定是前面一步(job1)写完了,后面的(job2)才去读)
例三:sparkStreaming
sparkStreaming也是只有一个sparkContext,但它的streaming的job可以都提交上去,比如前面2s的job未执行完,新的2s的job也会提交上去,所以可以看到多个job在上面。原因就在于它是采取多线程的方式提交的(仅仅作者的猜想,有待验证)
上述说的阻塞是这样的一个意思
Main中:
1. Int a=3
2. Int b=4
3. RDD.map....
4. System.out.print()
5. RDD.action1
6. RDD.action2
在driver上会先执行1,2
3是transformation,是lazy的,所以暂时不执行
执行4
然后5触发了一个job的提交
然后这个sc的此线程就阻塞在这里等这个job结束
5的job全部结束之后
6再执行,再触发job2的提交
sparkContext提交job,在一个线程中,它是阻塞的(调用DAGScheduler的runJob[阻塞方法],runJob负责调用submitJob[异步])
如果想并行提交job,使用多线程去提交job