目录
一、RDD中的函数传递(序列化问题)
在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。
1、传递一个方法
package com.xin
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
class SearchTwo(query: String) extends java.io.Serializable{
// 必须加extends //Serializable 因为类初始化在Driver端
// 而实际在executor端需要调用类中的方法
//因此涉及到了网络通信需要序列化
def isMatch(s:String):Boolean={
s.contains(query)
}
def getMatchRdd(rdd:RDD[String])={
rdd.filter(isMatch) //isMatch相当于x => x.contains(query)
}
}
object serializeable {
def main(args: Array[String]): Unit = {
//1.初始化配置信息及SparkContext
val conf = new SparkConf().setMaster("local[*]").setAppName("serializeable")
val sc = new SparkContext(conf)
//2.创建一个RDD
val rdd:RDD[String] = sc.parallelize(Array("hadoop", "spark","hive"))
//3.创建一个Search对象
val search = new SearchTwo("h")
//4.运用第一个过滤函数并打印结果
search.getMatchRdd(rdd).collect().foreach(println)
}
}
在这个方法中所调用的方法isMatch()是定义在Search这个类中的,实际上调用的是this. isMatch(),this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。
2、传递一个属性
package com.xin
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
//class SearchTwo(query: String) extends java.io.Serializable{
class SearchTwo(query: String) {
def isMatch(s:String):Boolean={
s.contains(query)
}
def getMatchlRdd2(rdd:RDD[String])={
val query_ = this.query
rdd.filter(x => x.contains(query_))
//必须赋值给局部变量,x.contains(this.query)不管用
}
}
object serializeable {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("serializeable")
val sc = new SparkContext(conf)
val rdd:RDD[String] = sc.parallelize(Array("hadoop", "spark","hive"))
val search = new SearchTwo("h")
search.getMatchlRdd2(rdd).collect().foreach(println)
}
}
个人总结:调用类中定义的属性,需要实例化,因为类是在Driver中实例化,而调用是在Executer。也可以声明一个局部变量来传递类中的属性,即可以不实例化
二、RDD依赖关系
1、Lineage(容错机制)
RDD只支持粗粒度转换(写是粗颗粒,批处理,读是细颗粒,一行一行),即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。(chekpoint)
实践代码:
object Lineage {
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("myspark2").setMaster("local[*]"))
val wordAndOne = sc.textFile("in2/test.txt").flatMap(_.split(",")).map((_,1))
val wordAndCount = wordAndOne.reduceByKey(_+_)
val debugString: String = wordAndOne.toDebugString //查看“wordAndOne”的Lineage
val toDebugString: String = wordAndCount.toDebugString
val dependencies: Seq[Dependency[_]] = wordAndOne.dependencies
val dependencies1: Seq[Dependency[_]] = wordAndCount.dependencies
println("----------------wordAndOne toDebugString------------------")
println(debugString) //HadoopRDD[0] at textFile---MapPartitionsRDD[1] at textFile--flatMap--map
println("----------------wordAndCount toDebugString------------------")
println(toDebugString) //HadoopRDD[0] at textFile---MapPartitionsRDD[1] at textFile--flatMap--map--ShuffledRDD[4] at reduceByKey
println("----------------wordAndOne dependencies------------------")
println(dependencies.toString()) //org.apache.spark.OneToOneDependency@79e66b2f
println("----------------wordAndCount dependencies------------------")
println(dependencies1.toBuffer) //org.apache.spark.ShuffleDependency@17273273
}
}
注意:RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
2、窄依赖
窄依赖(narrow dependency)指每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖形象的比喻为独生子女
3、宽依赖
宽依赖(wide dependency)指多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle
总结:宽依赖我们形象的比喻为超生
总结:宽窄依赖区别在于父RDD的分区被一个还是多个子RDD分区继承,多个RDD继承即产生了shuffle
4、DAG
DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。
5、任务划分(面试重点)
RDD任务切分中间分为:Application、Job、Stage和Task
1)Application:初始化一个SparkContext即生成一个Application
2)Job:一个Action算子就会生成一个Job
3)Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。
4)Task:Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task。
注意:Application->Job->Stage-> Task每一层都是1对n的关系。
6、RDD缓存
RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。 但并不是被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制(lineage)保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
val mapRDD: RDD[String] = LineRDD.map(_+System.currentTimeMillis())
val cacheRDD: RDD[String] = LineRDD.map(_+System.currentTimeMillis()).cache() //缓存
for( a <- 1 to 5){
mapRDD.foreach(println) //五次输出时间戳不同
}
for( a <- 1 to 5){
if(a==1)println("------------------")
cacheRDD.foreach(println) // 五次输出结果一致
}
7、RDD CheckPoint
检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长(操作记录)会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。设置检查点操作,会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。
案例实操:
(1)设置检查点
scala> sc.setCheckpointDir("hdfs://hadoop102:9000/checkpoint")
(2)创建一个RDD
scala> val rdd = sc.parallelize(Array("atguigu"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:24
(3)将RDD转换为携带当前时间戳并做checkpoint
scala> val ch = rdd.map(_+System.currentTimeMillis)
ch: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:26
scala> ch.checkpoint
(4)多次打印结果
scala> ch.collect
res55: Array[String] = Array(atguigu1538981860336)
scala> ch.collect
res56: Array[String] = Array(atguigu1538981860504)
scala> ch.collect
res57: Array[String] = Array(atguigu1538981860504)
scala> ch.collect
res58: Array[String] = Array(atguigu1538981860504)
三、键值对RDD数据分区器
Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数
注意:(1)只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None
(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于哪个分区的。
1、获取分区
(1)创建一个pairRDD
scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24
(2)查看RDD的分区器
scala> pairs.partitioner
res1: Option[org.apache.spark.Partitioner] = None
(3)导入HashPartitioner类
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
(4)使用HashPartitioner对RDD进行重新分区
scala> val partitioned = pairs.partitionBy(new HashPartitioner(2))
partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at partitionBy at <console>:27
(5)查看重新分区后RDD的分区器
scala> partitioned.partitioner
res2: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
2、Hash分区
HashPartitioner分区的原理:对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。
3、Ranger分区
HashPartitioner分区弊端:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据。
RangePartitioner作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。实现过程为:
第一步:先从整个RDD中抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;
第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的
4、自定义分区
要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类
class CustomerPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
//覆盖分区数
override def numPartitions: Int = numParts
//覆盖分区号获取函数
override def getPartition(key: Any): Int = {
val ckey: String = key.toString
ckey.substring(ckey.length-1).toInt%numParts
}
}
将RDD使用自定义的分区类进行重新分区
val par = data.partitionBy(new CustomerPartitioner(2))
四、数据读取与保存
1、MySQL数据库连接
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
读取mysql数据,转换成RDD
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
//TODO 读取mysql数据转换成RDD
object MysqlRDD {
def main(args: Array[String]): Unit = {
//2.创建SparkContext
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("JdbcRDD"))
//3.定义连接mysql的参数
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hdp-1:3306/sqoop"
val userName = "root"
val passWd = "123456"
//创建JdbcRDD
val rdd = new JdbcRDD(sc, () => {
Class.forName(driver)
DriverManager.getConnection(url, userName, passWd)
},
"select * from tb3 where id >= ? and id <= ?;",
1, //id最小范围 对应第一个问号
5, //id最大范围 对应第二个问号
1, //分区数
r => (r.getInt(1), r.getString(2)) //结果集处理
)
//打印最后结果
println(rdd.count())
rdd.foreach(println)
sc.stop()
}
}
把RDD数据写入Mysql
//TODO 把RDD写入Mysql:
object writeMysql{
def main(args: Array[String]) {
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("MysqlApp"))
val data = sc.parallelize(List("Female", "Male","Female"))
data.foreachPartition(insertData)
}
def insertData(iterator: Iterator[String]): Unit = {
Class.forName ("com.mysql.jdbc.Driver").newInstance()
val conn = java.sql.DriverManager.getConnection("jdbc:mysql://hdp-1:3306/sqoop", "root", "123456")
iterator.foreach(data => {
val ps = conn.prepareStatement("insert into tb3(username) values (?)")
ps.setString(1, data)
ps.executeUpdate()
})
}
}
2、HBase数据库
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
object ReadHbase {
def main(args: Array[String]): Unit = {
//创建SparkContext
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("HBaseRDD"))
//构建HBase配置信息
val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "hdp-1,hdp-2,hdp-3")
conf.set(TableInputFormat.INPUT_TABLE, "stu")
//从HBase读取数据形成RDD
val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
val count: Long = hbaseRDD.count()
println(count)
//对hbaseRDD进行处理
hbaseRDD.foreach {
case (_, result) =>
val key: String = Bytes.toString(result.getRow)
val name: String = Bytes.toString(result.getValue(Bytes.toBytes("infos"), Bytes.toBytes("name")))
val age: String = Bytes.toString(result.getValue(Bytes.toBytes("infos"), Bytes.toBytes("age")))
println("RowKey:" + key + ",Name:" + name + ",age:" + age)
}
//关闭连接
sc.stop()
}
}
object writeHbase{
def main(args: Array[String]): Unit = {
//获取Spark配置信息并创建与spark的连接
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseApp")
val sc = new SparkContext(sparkConf)
//创建HBaseConf
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "hdp-1,hdp-2,hdp-3")
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")
//构建Hbase表描述器
val fruitTable = TableName.valueOf("fruit_spark")
val tableDescr = new HTableDescriptor(fruitTable)
tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
//创建Hbase表
val admin = new HBaseAdmin(conf)
if (admin.tableExists(fruitTable)) {
admin.disableTable(fruitTable)
admin.deleteTable(fruitTable)
}
admin.createTable(tableDescr)
//定义往Hbase插入数据的方法
def convert(triple: (String, String, String)) = {
val put = new Put(Bytes.toBytes(triple._1))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
(new ImmutableBytesWritable, put)
}
//创建一个RDD 数值型容易出现编码错误,可以改成 "1"
val initialRDD = sc.parallelize(List(("1","apple","11"), ("2","banana","12"), ("3","pear","13")))
//将RDD内容写到HBase
val localData = initialRDD.map(convert)
localData.saveAsHadoopDataset(jobConf)
}
}
五、RDD编程进阶
1、累加器
累加器用来对信息进行聚合,通常在向 Spark传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。
系统累加器:
package com.xin
import org.apache.spark.{SparkConf, SparkContext}
//TODO 针对一个输入的日志文件,如果我们想计算文件中所有空行的数量,我们可以编写以下程序:
object leijia {
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("HBaseRDD"))
val notice = sc.textFile("in2/test.txt")
// 创建出存有初始值0的累加器
val blanklines = sc.accumulator(0)
val tmp = notice.flatMap(line => {
if (line == "") {
blanklines += 1 //累加器进行计算
}
line.split(",")
})
val count: Long = tmp.count()
println("单词个数:"+count)
val num: Int = blanklines.value
println("空行个数:"+num) //如果文本最后一行是空值,不进行计算
}
}
通过在驱动器Driver中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。Spark闭包(局部变量)里的执行器代码可以使用累加器的 += 方法(在Java中是 add)增加累加器的值。 驱动器程序可以调用累加器的value属性(在Java中使用value()或setValue())来访问累加器的值。
注意:工作节点上的任务不能访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。对于要在行动操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在 foreach() 这样的行动操作中,转化操作中累加器可能会发生不止一次更新。
自定义累加器:
继承AccumulatorV2并至少覆写下例中出现的方法,下面这个累加器可以用于在程序运行过程中收集一些文本类信息,最终以Set[String]的形式返回。
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConversions._
class LogAccumulator extends org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] {
private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()
override def isZero: Boolean = {
_logArray.isEmpty
}
override def reset(): Unit = {
_logArray.clear()
}
override def add(v: String): Unit = {
_logArray.add(v)
}
override def merge(other: org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]]): Unit = {
other match {
case o: LogAccumulator => _logArray.addAll(o.value)
}
}
override def value: java.util.Set[String] = {
java.util.Collections.unmodifiableSet(_logArray)
}
override def copy():org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] = {
val newAcc = new LogAccumulator()
_logArray.synchronized{
newAcc._logArray.addAll(_logArray)
}
newAcc
}
}
// 过滤掉带字母的
object LogAccumulator {
def main(args: Array[String]) {
val sc=new SparkContext(new SparkConf().setAppName("LogAccumulator").setMaster("local[3]"))
val accum = new LogAccumulator //new自定义的累加器
sc.register(accum, "logAccum") //通过sc注册累加器
val sum: Int = sc.parallelize(Array("1", "2a", "3", "4b", "5", "xin", "7cd", "8", "9"), 2).filter(line => {
val pattern = """^-?(\d+)""" //^:行开头 (.\d+)?:括号里内出现0或1次 表数字
val flag: Boolean = line.matches(pattern) //匹配是否符合pattern
if (!flag) { //!flag -->包含字母
accum.add(line)
}
flag
}).map(_.toInt).reduce(_ + _)
println("sum: " + sum)
for (v <- accum.value) print(v + " ")
println()
sc.stop()
}
}
2、广播变量(调优策略)
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。 在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。
val broadcastVar = sc.broadcast(Array(1, 2, 3))
print(broadcastVar.value.mkString(" "))
使用广播变量的过程如下:
(1) 调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。 任何可序列化的类型都可以这么实现。
(2) 通过 value 属性访问该对象的值(在 Java 中为 value() 方法)。
(3) 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。