Spark整理(1)
一,介绍
1.1 什么是spark
-
Apache Spark 是为大规模数据处理设计的快速通用的计算引擎(框架).
-
从右侧的新闻中看,Spark也用于AI人工智能
Spark 是 UC Berkeley AMP lab (加州大学伯克利分校的 AMP 实验室)所开源的类 Hadoop MapReduce 的通用并行计算框架,Spark 拥有Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。Spark 是 Scala 编写,方便快速编程。
1.2 需要Spark的原因?
中间结果输出:基于MapReduce的计算框架通常会将中间结果输出到磁盘上,进行存储和容错。出于任务管道承接的考虑,当一些查询翻译到MapReduce任务时,往往会产生多个Stage,而这些串联的Stage又依赖于底层文件系统(如HDFS)来存储每一个Stage的输出结果。
Spark是MapReduce的替换方案,而且兼容HDFS,Hive,可融入Hadoop的生态系统中,以弥补MapReduce的不足。
1.3 Spark和MapReduce的区别
- MapReduce
处理一些迭代运算,要对HDFS进行频繁的读写,效率较低。
- Spark
都是分布式计算框架,Spark基于内存,MR基于HDFS。Spark处理数据的能力一般是MR的10倍以上,Spark中除了基于内存计算外,还有DAG有向无环图来切分任务的执行先后顺序。
1.4 Spark的API
Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。
1.5 Spark运行模式
- Local
多用于本地测试,如在eclipse,idea中写程序测试等。
- Standalone
Standalone是Spark自带的一个资源调度框架,它支持完全分布式 ( master, worker )
- Yarn
Hadoop生态圈中的一个资源调度框架,Spark也可以基于Yarn来计算(基于Yarn来进行资源调度,必须实现ApplicationMaster 接口,Spark实现了这个接口)
- Mesos
类似于Yarn的一个资源调度框架
1.6 Spark的组成
Spark组成(BDAS):全称伯克利数据分析栈,通过大规模集成算法,机器,人之间展现大数据应用的一个平台。也是处理大数据,云计算,通信的技术解决方案。
它主要组件有:
-
SparkCore 将分布式数据抽象为弹性分布式数据集(RDD),实现了应用任务调度,RPC,序列化和压缩,并给运行在其上的上层组件提供API。
-
SparkSQL Spark Sql是Spark来操作结构化数据的程序包,可以让我们使用SQL的方式来查询数据。Spark支持 多种数据源,包含Hive表,parquest以及JSON等内容。
-
SparkStreaming Spark提供的实时数据进行流式计算的组件
-
MLlib 提供常用机器学习算法的实现库
-
GraphX 提供一个分布式图计算框架,能高效进行图计算
-
BlikDB 用于在海量数据上进行交互式SQL的近似查询引擎
-
Tackyon 以内存为中心高容错的分布式文件系统
二,Spark Core
2.1 RDD
概念:RDD(Resilient Distributed Dateset) 弹性分布式数据集
RDD的五大特性:
- RDD是由一系列的partition组成的
- 函数是作用在partition(spilt)上的
- RDD之间有一系列的依赖关系
- 分区器是作用在K,V格式的RDD上
- RDD提供了一系列最佳的计算位置
单词统计RDD理解图:
注意:
-
textFIle方法底层封装的是MR读取文件的方式,读取文件之前先spilt,默认spilt大小是block的大小
-
RDD实际上不存储数据
-
什么是K,V形式的RDD?
-----如果RDD里面存储的数据都是二元组对象,那么这个RDD我们称为K,V格式的RDD.
- 哪里体现RDD的弹性(容错)?
---- partition数量,大小没有限制,体现了RDD的弹性
---- RDD之间存在依赖关系,可以基于上一个RDD重新计算出RDD
- 哪里体现了RDD的分布式?
---- 一系列partition组成了RDD,partition分布在不同的节点上
---- RDD提供计算的最佳位置,体现了数据本地化。体现了大数据中的计算向数据靠拢的理念
2.2 Lineage(血统)
2.3 Spark任务执行原理
以上图中有四个机器节点,Driver和Worker是启动在节点上的进程,运行在JVM中的进程。
- Driver和集群节点之间有频繁的通信
- Driver负责任务(tasks)的分发和结果的回收,任务的调度。如果task的计算结果非常大就不要回收了,会造成OOM
- Worker是Standalone资源调度框架里面资源管理的从节点。也是JVM进程 (Yarn中的NodeManager)
- Master是Standalone资源调度框架里面资源管理的主节点。也是JVM进程 (Yarn中的ResourceManager)
2.4 Spark代码流程
- 创建SparkConf对象(可设置Application name 和运行模式 及资源需求)
- 创建SparkContext对象(传入sparkConf)
- 基于Spark的上下文创建一个RDD,对RDD进行处理
- 应用程序中要有Action类算子触发Transformation算子执行
- 关闭Spark上下文对象SparkContext
2.5 Transformations(转换算子)
概念:转换算子延迟执行,也叫做懒加载。
-
数据源 (wc.txt)
wcb wa cool hehe heihei l love you xixi heihei haha
-
代码
package com.shsxt.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Sp_Transformation {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
//设置运行模式和Application名称
sparkConf.setMaster("local").setAppName("test")
//构建上下文对象
val context = new SparkContext(sparkConf)
val lines: RDD[String] = context.textFile("./wc.txt")
//过滤符合条件的记录数,true保存,false舍弃
val word: RDD[String] = lines.filter(x=>{
x.contains("l")
})
// word.foreach(println) //输出结果 wcb wa cool l love you
//将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素
//特点:输入一条,输出一条
val data: RDD[(String, Int)] = lines.map(x=>(x,1))
// data.foreach(println) //输出结果 (wcb wa cool,1)
// (hehe heihei,1)
// (l love you,1)
// (xixi heihei haha,1)
//先map后flat。和map类似,每个输入项可以映射成0到多个输出项
//特点:输入一条,输出多条
val str: RDD[String] = lines.flatMap(x=>x.split(" "))
//str.foreach(println) //输出结果 wcb
// wa
// cool
// hehe
// heihei ......
//随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样
val sample: RDD[String] = lines.sample(true,0.5)
//sample.foreach(println) //每次打印结果不一样
//将相同的key根据相应的逻辑进行处理
//str.map(x=>(x,1)).reduceByKey(_+_).foreach(println) //输出结果 (cool,1)
//sortByKey/sortBy 作用在K,V的RDD上,对key进行升序或者降序排序
str.map(x=>(x,1)).reduceByKey(_+_).sortBy(x=>x._2,false).foreach(println)
}
}
2.6 Action 行动算子
- 概念:
Action 类算 子也 是一 类 算子 (函 数)叫 做 行动 算子 , 如foreach,collect,count 等。Transformations 类算子是延迟执行,Action 类算子是触发执行。一个 application 应用程序中有几个 Action 类算子执行,就有几个 job 运行
-
数据同上
-
代码
package com.shsxt.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Sp_Active {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local").setAppName("test")
val context = new SparkContext(sparkConf)
val lines: RDD[String] = context.textFile("./wc.txt")
//返回数据集中的元素数。会在结果计算完成后回收到 Driver 端。
val count = lines.count()
//println(count) //运行结果为 4
//返回一个包含数据集前 n 个元素的集合。
val array: Array[String] = lines.take(2)
//array.foreach(println)
// first=take(1),返回数据集中的第一个元素。
val str = lines.first()
//println(str)
//foreach 遍历 不再介绍
//collect 将计算结果回收到Driver
val array01: Array[String] = lines.collect()
array01.foreach(println)
}
}
2.7 控制算子
- 概念
控制算子有三种: cache,persist,checkpoint,这些算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的,必须有一个Action算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。
- cache
默认将RDD的数据持久化到内存中,cache是懒执行。
cache() = persist(StorageLevel.Memory_only)
测试代码:
package com.shsxt.spark.scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Sp_Cache {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local").setAppName("cache")
val context = new SparkContext(sparkConf)
var lines: RDD[String] = context.textFile("./NASA_access_log_Aug95")
//使用cache
lines = lines.cache()
val startTime = System.currentTimeMillis()
//触发算子触发 cache
var n = lines.count()
println(n)
val endTime = System.currentTimeMillis()
println("用时: "+(endTime-startTime) +"个数 :"+n)
val startTime1 = System.currentTimeMillis()
n = lines.count()
val endTime1 = System.currentTimeMillis()
println("用时: "+(endTime1-startTime1) +"个数 :"+n)
context.stop()
}
}
打印结果:
计算效率提高很明显
java代码实现:
package com.shsxt.spark.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
public class Cache {
public static void main(String args[]){
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local").setAppName("cache");
JavaSparkContext context = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = context.textFile("./NASA_access_log_Aug95");
rdd = rdd.cache();
rdd.persist(StorageLevel.DISK_ONLY_2()); //等同cache
long startTime = System.currentTimeMillis();
long l = rdd.count();
long endTime = System.currentTimeMillis();
System.out.println("总数:"+l+" "+"用时:"+(endTime-startTime));
startTime = System.currentTimeMillis();
l = rdd.count();
endTime = System.currentTimeMillis();
System.out.println("总数:"+l+" "+"用时:"+(endTime-startTime));
}
}
- persist
可以指定持久化的级别。最常用的是Memory_only和Memory_And_DIsk. "_2"表示有副本数。
持久化级别列举:
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1) ##可以自定义
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
- cache和persist的注意事项
1.cache和persist都是懒执行,必须有一个行动类算子触发执行
2.cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了,持久化的单位是partition
3.cache和persist算子后不能立即紧跟action算子
例如: rdd.cache().count() 返回的不是持久化的 RDD,而是一个数值了
- checkpoint
checkpont将RDD持久化到磁盘,还可以切换RDD之间的依赖关系。
checkpoint的执行原理:
1.当RDD的job执行完毕后,会从finalRDD从后往前回溯
2.当回溯到某个RDD调用了checkpoint方法,会对当前的RDD做一个标记
3.Spark框架会自动启动新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
- checkpoint优化
RDD执行checkpoint之前,最好对这个RDD进行cache,这样新启动的Job只需要将内存中的数据拷贝到HDFS上即可,省去了重新计算的时间。
- checkpoint代码
package com.shsxt.spark.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
public class CheckPoint {
public static void main(String args[]){
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local").setAppName("checkPoint");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
//通过上下文设置 checkpoint保存的路径
sc.setCheckpointDir("./checkPoint");
JavaRDD<Integer> javaRDD = sc.parallelize(Arrays.asList(1, 2, 3));
//在调用checkpoint,先执行cache,加快checkpoint的效率
javaRDD.cache();
javaRDD.checkpoint();
javaRDD.count();
sc.stop();
}
}
三,集群搭建
分为Spark自带的资源框架standalone和基于Yarn的两种搭建方式
这里三台节点,分配node01为master,node02,node03为Worker节点
3.1 standalone
1.下载安装包,解压(我这边下载的是spark-1.6,结合hadoop-2.6.x)
下载地址:https://archive.apache.org/dist/spark/spark-1.6.0/
解压命令: tar -zvxf …
2.改名,名字太长麻烦(你随意)
命令:
mv spark-1.6.0-bin-hadoop2.6.tgz spark-1.6.0
3.进入到安装包的conf目录下,修改slaves.template,添加从节点
4.修改spark-env.template
mv spark-env.template.sh spark-env.sh
JAVA_HOME:配置 java_home 路径
SPARK_MASTER_IP:master 的 ip
SPARK_MASTER_PORT:提交任务的端口,默认是 7077
SPARK_WORKER_CORES:每个 worker 从节点能够支配的 core 的个数
SPARK_WORKER_MEMORY:每个 worker 从节点能够支配的内存数
5.同步到其它节点上
[root@node01 home]# scp -r spark-1.6.0/ node02:`pwd`
[root@node01 home]# scp -r spark-1.6.0/ node03:`pwd`
6.启动集群
进入到sbin目录下,执行当前目录下的 ./start-all.sh
8080是Spark WEBUI 界面的端口,7077是Spark任务提交的端口。
Sbin目录下,编辑 start-master.sh 修改相关端口
7,Standalone提交命令
运行spark给的demo案例(模拟圆周率的生成)
bin目录下
./spark-submit --master spark://node01:7077 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
后面详情介绍程序提交的方式
3.2 Yarn
在安装目录conf下,spark-env.template中添加 hadoop的安装目录下的相关配置
其他步骤和Standalone一样。
- Yarn提交程序命令
关闭,standalone运行模式
启动HDFS和Yarn
./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100