Spark 程序核心概念
* 首先,官网的这张图很好的描述了一个Spark 应用程序的结构,一个Spark应用程序由一个Driver端和多个Executor组成,多个task 以多线程的形式运行在Executor中。
* Driver端: Driver负责运行程序的main()方法,负责将代码发送到各个Executor去执行,然后取回结果,Driver端也是以Executor方式运行,可以运行在本地(Client),也可以运行在集群内部(Cluster),以Spark-shell形式运行的Spark应用程序,其Driver端都是运行在本地,以Spark-submit形式提交的作业可以通过参数- -deploy-mode来指定Driver端运行在哪里。
* Executor: Executor是代码执行的地方,以进程的形式运行在节点之上(如standalone模式的Worker和yarn模式的NM)。
* Task: 一个Executor可以同时运行多个task,每个task执行一个任务,其性质是以多线程的形式运行在Executor中
* Cluster Manager: 根据运行模式的不同(如Standalone,Apache Mesos,Hadoop YARN ,Kubernetes )其性质也会不同,主要是负责获取集群资源的外部服务(如standalone模式的Master和yarn模式的RM)
* Job: 一个Action的触发即为一个job,一个job 存在多个task
* Stage: Job中如果有Shuffle产生就会分为2个Stage,一个Job 可由多个Stage构成,一个Stage 可由多个task 组成
Spark-shell
- Spark-shell是以一种交互式命令行的模式来启动一个Spark作业,它会给你创建好Spark应用程序的入口SparkContext,简称sc,SparkSql 的入口SparkSession(spark2.0)简称spark, 并且以参数的形式来指定运行模式,应用程序名称等(SparkConf),上面说过,以Spark-shell的方式运行Spark应用程序,其Driver端必然是运行在本地,可以通过本地写代码的方式运行在不同的模式上,因此很适合测试和学习。
Spark-shell主要参数:
- -master: 指定运行模式,spark://host:port, mesos://host:port, yarn, or local[n]. - -deploy-mode: 指定将driver端运行在client 还是在cluster. - -class: 指定运行程序main方法类名,一般是应用程序的包名+类名 - -name: 运用程序名称 - -jars: 需要在driver端和executor端运行的jar,如mysql驱动包 - -packages: maven管理的项目坐标GAV,多个以逗号分隔 - -conf: 以key=value的形式传入sparkconf参数,所传入的参数必须是以spark.开头 - -properties-file: 指定新的conf文件,默认使用spark-default.conf - -driver-memory:指定driver端运行内存,默认1G - -driver-cores:指定driver端cpu数量,默认1,仅在Standalone和Yarn的cluster模式下 - -executor-memory:指定executor端的内存,默认1G - -total-executor-cores:所有executor使用的cores - -executor-cores: 每个executor使用的cores - -driver-class-path: driver端的classpath - -executor-class-path:executor端的classpath
Spark-submit
- Spark-submit使用参数和Spark-shell一致,Spark-submit主要是用来提交打包好的Spark作业,Spark-shell底层调用的是Spark-submit。
Spark常用模式
- Spark常用的三种作业提交模式为:Local,Standalone,Yarn,Local为单节点模式,Standalone和Yarn和集群模式,具体的使用方式参照此篇博客
基于Spark Core API 编程案例
使用Spark Core实现Word Count词频统计,思想:拆分,转tuple,按key累加。
package com.venus.spark import org.apache.spark.{SparkConf, SparkContext} object wordCountApp { def main(args: Array[String]): Unit = { val conf =new SparkConf().setMaster("local[2]").setAppName("wc") val sc= new SparkContext(conf) val RDD1= sc.parallelize(Array( "zhangshan lisi wangwu", "hadoop hive hbase", "hello hello" )) val Rdd2=RDD1.flatMap(x => x.split(" ")).map(x=>(x,1)).groupByKey().map(x=>(x._1,function(x._2))) Rdd2.collect().foreach(println) /* Rdd2.saveAsTextFile(args(1))*/ sc.stop() } def function(it:Iterable[Int]) ={ val a=it.sum a } }
读取Sequence File 文件,对于Sequence File的存储格式是一种KV数据格式,Key和Value都是二进制变长的数据,key为空使用value 存放实际的值, 这样是为了避免MR 在执行map 阶段的排序过程。 思想:Sequence File文件格式,其key 是不存放数据的,只需要把value拿出来,然后拆分即可。
package com.venus.spark import org.apache.hadoop.io.BytesWritable import org.apache.spark.{SparkConf, SparkContext} object readSequenceApp { def main(args: Array[String]): Unit = { val sc=new SparkContext(new SparkConf()) val seqRDD=sc.sequenceFile[BytesWritable,String](args(0)) seqRDD.map(x=>x._2.split(" ")).map(x=>(x(0),x(1))).collect().foreach(println(_)) } } 如果想打印出key ,需要将key转换 seqRDD.map(x=>(x._1.copyBytes(),x._2)).collect().foreach(println)
Spark Core实现First Value和Last Value,思想:从迭代器中拿出相应的值
First Value
package com.venus.spark import org.apache.spark.{SparkConf, SparkContext} object getFirstValueApp { def main(args: Array[String]): Unit = { val conf=new SparkConf().setMaster("local[2]").setAppName("firstValue") val sc = new SparkContext(conf) val RDD1=sc.parallelize(List( ("A","A1"), ("A","A3"), ("A","A2"), ("B","B2"), ("B","B1"), ("B","B3"), ("C","C3"), ("C","C1") )) RDD1.groupByKey().map(x=>(x._1,firtValue(x._2))).collect().foreach(println) } def firtValue(a: Iterable[String]) ={ a.head } }
Last Value
import org.apache.spark.{SparkConf, SparkContext} object LastValue { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("LastValueApp").setMaster("local[2]") val sc = new SparkContext(conf) val data = sc.parallelize(List( ("A", "A1"), ("A", "A2"), ("A", "A3"), ("B", "B1"), ("B", "B2"), ("B", "B3"), ("C", "C1") )) data.groupByKey().sortByKey() .map(x=>(x._1,lastValue(x._2))) .collect().foreach(println) def lastValue(values: Iterable[String]) = { for(value <- values) yield (value, values.last) } sc.stop() }
Spark Core 实现二次排序,思想:实现Ordered 接口
package com.venus.spark
import org.apache.spark.{SparkConf, SparkContext}
object secondSortApp {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local[2]").setAppName("secondSortApp")
val sc = new SparkContext(conf)
val RDD1=sc.parallelize(List(
(12,3),
(12,4),
(12,1),
(16,3),
(18,12),
(18,11),
(6,3)
x))
RDD1.map(x=>(new secondSort(x._1,x._2),x)).sortByKey().map(x=>x._2).collect().foreach(println(_))
sc.stop()
}
}
package com.venus.spark
class secondSort(val first:Int,val last:Int) extends Ordered[secondSort] with Serializable{
override def compare(that: secondSort): Int = {
if(this.first!=that.first) {
this.first-that.first
}else{
this.last-that.last
}
}
}