第二天 – Spark集群启动流程 – 任务提交流程 – RDD依赖关系 – RDD缓存 – 两个案例
文章目录
一、Spark集群启动流程
- 调用start-all.sh脚本,启动Master服务,首先执行preStart,检查超时的Worker
- 开始执行receive方法,不断接受其它Actor向它发送过来的请求
- 在调用start-all.sh脚本的时候,会解析slaves配置文件,决定了在哪几个节点上启动Worker服务,Worker服务在启动的时候,会启动preStart方法,该方法会向Master进行注册
- Master收到Worker的注册信息后,开始持久化注册信息,并响应给Worker
- Worker收到Master发送过来的响应信息(MasterUrl),
- Worker开始向Master发送心跳信息
二、Spark任务提交流程:
- Driver端向Master端注册任务
- Master收到Driver端发送过来的信息后,把信息封装为真正的任务信息并把任务信息进行保存
- Master通知Worker拿取任务信息并启动Executor
- Worker向Master拉取任务信息的同时启动Executor
- Executor开始向Driver进行注册
- Driver开始把任务发送给相应的Executor
三、RDD的依赖关系
RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
窄依赖
窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用(一对一、多对一)
宽依赖
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition(一对多、多对多)
Lineage
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
四、RDD的缓存
Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存多个数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。
RDD缓存方式、级别
RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别分为很多种,存储级别在object StorageLevel中定义的。
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
class StorageLevel的主构造器参数如下:
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
五、案例一:基站信号范围
需求:求用户在一定的时间范围内停留的时间最长的top2的基站范围
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:求用户在一定的时间范围内停留的时间最长的top2的基站范围
* 思路:
* 1.求出在相同基站停留的总时长
* 2.把基站的经纬度join过来
* 3.按用户分组,组内取top2
*/
object MobileLocation {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("mobile_location").setMaster("local[2]")
val sc = new SparkContext(conf)
// 获取用户日志数据文件
val files: RDD[String] = sc.textFile("F:/scaladata/data/lacduration/log")
// 将用户日志数据进行划分
val splitedLogs: RDD[((String, String), Long)] = files.map(line => {
val fields = line.split(",")
val phone = fields(0) // 手机号
val time = fields(1).toLong // 时间戳
val lac = fields(2) // 基站id
val eventType = fields(3).toInt // 事件类型
val time_long = if (eventType == 1) -time else time
((phone,lac), time_long)
}).cache()
// 用户在相同的基站停留的总时长
val totalTimeLogs: RDD[((String, String), Long)] = splitedLogs.reduceByKey(_+_)
// 为了便于和基站信息进行join,需要把数据进行重新整合
val lacAndPhoneAndTime: RDD[(String, (String, Long))] = totalTimeLogs.map(line => {
val phone = line._1._1
val lac = line._1._2
val time = line._2 // 停留的总时长
(lac, (phone, time))
})
// 获取基站基础信息
val lacInfo = sc.textFile("F:/scaladata/data/lacduration/lac_info.txt")
.map(line => {
val fields = line.split(",")
val lac = fields(0) // 基站id
val x = fields(1) // 经度
val y = fields(2) // 维度
(lac,(x,y))
})
// 把经纬度信息join到用户访问信息中
val joinedLogs: RDD[(String, ((String, Long), (String, String)))] = lacAndPhoneAndTime.join(lacInfo)
// 为了方便将用户进行分组,把数据进行重新整合
val phoneAndTimeAndXY: RDD[(String, Long, (String, String))] = joinedLogs.map(x => {
val phone = x._2._1._1
val lac = x._1
val time = x._2._1._2
val xy = x._2._2
(phone, time, xy)
})
// 按照用户进行分组
val grouped: RDD[(String, Iterable[(String, Long, (String, String))])] = phoneAndTimeAndXY.groupBy(_._1)
// 按照用户访问基站的总时长进行降序排列
val res: RDD[(String, List[(String, Long, (String, String))])] = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(2))
res.foreach(f => println(f))
sc.stop()
}
}
两个案例的数据文件下载:点击下载
六、案例二:学科模块网站访问排名
需求:求每个学科各个模块访问量后取topN
普通实现
import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:求每个学科各个模块访问量后取topN
* 思路:
* 1.每个学科各个模块的访问量
* 2.以学科进行分组并在组内排序取topN
*/
object SubjectCount_1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("mobile_location").setMaster("local[2]")
val sc = new SparkContext(conf)
// 获取数据
val logInfo = sc.textFile("F:/scaladata/data/subjectaccess").map(line => {
val fields = line.split("\t")
val url = fields(1) // 用户请求的url
(url,1)
})
// 按照url进行聚合,得到每个学科各个模块的访问量
val sumedLogInfo: RDD[(String, Int)] = logInfo.reduceByKey(_+_)
// 获取学科信息并返回所有数据
val subjectAndUrlAndCount: RDD[(String, String, Int)] = sumedLogInfo.map(tup => {
val url = tup._1
val count = tup._2
val subject = new URL(url).getHost
(subject, url, count)
})
// 按照学科信息进行分组
val groupedLogInfo: RDD[(String, Iterable[(String, String, Int)])] = subjectAndUrlAndCount.groupBy(_._1)
// 在学科信息组内进行降序排序并取top3
val res: RDD[(String, List[(String, String, Int)])] = groupedLogInfo.mapValues(_.toList.sortBy(_._3).reverse.take(3))
res.foreach(f => println(f))
sc.stop()
}
}
使用缓存
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:求每个学科各个模块访问量后取topN
* 缓存应用
*/
object SubjectCount_2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("mobile_location").setMaster("local[2]")
val sc = new SparkContext(conf)
// 模拟从mysql中获取的学科信息
val subjects = Array("http://java.learn.com", "http://ui.learn.com", "http://bigdata.learn.com", "http://android.learn.com", "http://h5.learn.com")
// 获取数据
val logInfo = sc.textFile("F:/scaladata/data/subjectaccess").map(line => {
val fields = line.split("\t")
val url = fields(1) // 用户请求的url
(url,1)
})
// 按照url进行聚合,得到每个学科各个模块的访问量
val sumedLogInfo: RDD[(String, Int)] = logInfo.reduceByKey(_+_)
for(subject <- subjects){
// 过滤出属于该学科的各个模块对应的访问量
val filteredSubjectInfo: RDD[(String, Int)] = sumedLogInfo.filter(_._1.startsWith(subject))
// 开始降序排序并取top3
val res: Array[(String, Int)] = filteredSubjectInfo.sortBy(_._2,false).take(3)
res.foreach(f => println(f))
}
sc.stop()
}
}
自定义分区实现
能解决部分数据倾斜问题
import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import scala.collection.mutable
/**
* 需求:求每个学科各个模块访问量后取topN
* 实现自定义分区器,按照学科把不同的学科信息放到不同的分区中
*/
object SubjectCount_3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("mobile_location").setMaster("local[2]")
val sc = new SparkContext(conf)
// 获取数据
val logInfo = sc.textFile("F:/scaladata/data/subjectaccess").map(line => {
val fields = line.split("\t")
val url = fields(1) // 用户请求的url
(url,1)
})
// 按照url进行聚合,得到每个学科各个模块的访问量
val sumedLogInfo: RDD[(String, Int)] = logInfo.reduceByKey(_+_)
// 获取学科信息并返回所有数据
val subjectAndUrlAndCount: RDD[(String, (String, Int))] = sumedLogInfo.map(tup => {
val url = tup._1
val count = tup._2
val subject = new URL(url).getHost
(subject, (url, count))
}).cache()
// 调用默认的分区器进行分区,哈希碰撞导致会出现数据倾斜问题,此时需要自定义分区
// val partitioned = subjectAndUrlAndCount.partitionBy(new HashPartitioner(3))
// partitioned.saveAsTextFile("f:/sparkdata/out-20181120-1")
// 获取所有的学科信息,需要去重
val subjects: Array[String] = subjectAndUrlAndCount.keys.distinct.collect
// 调用自定义分区器
val partitioner = new SubjectPartitioner(subjects)
val partitioned = subjectAndUrlAndCount.partitionBy(partitioner)
val res: RDD[(String, (String, Int))] = partitioned.mapPartitions(it => {
it.toList.sortBy(_._2._2).reverse.take(3).iterator
})
res.saveAsTextFile("f:/sparkdata/out-20181120-2")
sc.stop()
}
}
/**
* 自定义分区器
*/
class SubjectPartitioner(subjects:Array[String]) extends Partitioner{
// 用于存储学科信息和对应的分区号
val subjectAndNum = new mutable.HashMap[String,Int]()
// 计数器,用于指定分区号
var i = 0
for(subject <- subjects){
subjectAndNum += (subject -> i)
i += 1
}
// 获取分区数
override def numPartitions: Int = subjects.size
// 获取分区号
override def getPartition(key: Any): Int = subjectAndNum.getOrElse(key.toString, 0)
}