一起养成写作习惯!这是我参与「掘金日新计划 · 4 月更文挑战」的第2天,点击查看活动详情。
1拜师学艺--初探Spark Standalone架构
Spark Standalone, 主从架构:
- Master 主节点:管理worker
- worker 从节点:提供资源,接受任务并且执行
模拟实现的需求:
- Master的实现,worker的实现
- Master启动和worker启动
- Master和worker 心跳机制实现(以及验活机制的实现)
下图是Spark Standalone的时序图:
如图所示: 首先要启动 SparkMaster ;master节点要启动一个验活超时的检查服务,来检查该master下面的从节点worker的在线情况,每隔3s检查一次:判断当前的系统时间 - 上一次的心跳时间是否大于了超时阈值,如果已经超出则进行相关节点的下线处理;处理worker发送来的心跳消息(更新该从节点的最近一次心跳时间)。
启动sparkWorker;启动之后向master注册,然后等待master返回注册的信息,如果注册成功就准备通过心跳向master进行资源的汇报(定期发送心跳)。
需要注意的几个时间:
-
心跳间隔时间:3s
-
从节点宕机死亡的超时时间:currentTime - lastHeartbeat > NMtimeout = 20
-
定时服务的间隔时间:5s
2苦练内功--了解Akka模型
2.1概念
Akka 是一个用 Scala 编写的库,用于在 JVM 平台上简化编写具有可容错的、高可伸缩性的 Java 和Scala 的Actor 模型应用,其同时提供了Java 和 Scala 的开发接口。Akka 允许我们专注于满足业务需求,而不是编写初级代码。在 Akka 中,Actor 之间通信的唯一机制就是消息传递。Akka 对 Actor 模型的使用提供了一个抽象级别,使得编写正确的并发、并行和分布式系统更加容易。
Netty对于java相当于Alla对于Scala的地位!
Spark 2.x版本以前的RPC实现是基于Scala的网络编程模型:Akka
Akka的特点:
它对于并发模型进行了更高的抽象;
2.它是异步、非阻塞、高性能的事件驱动编程模型
3.它是轻量级时间处理(1GB内存可以容纳百万级别分Actor)
三个比较重要的概念:
-
ActorSystem:国家 一个总的概念
-
Actor:通信代表 代表自己的国家
-
ActorRef:通信代理人,大使,驻扎在自己国家内的国外大使,负责本国和其它国家的交流
1、ActorSystem 管理通信角色 actor 的一个系统概念,在一个服务器节点中,只要存在一个这样的对象就可以,这个对象的作用,就是用来生成和管理所有的通信角色的生命周期
2、通信角色 Actor,存在于一台服务器中的一个 ActorSystem 的内部,用来和其他节点的 actor 进行 通信。每个 Actor 都有一个 MailBox,别的 Actor 发送给它的消息都首先储存在 MailBox 中,通过这种方式可以实现异步通信。
3、每个 Actor 是单线程的处理方式,不断的从 MailBox 拉取消息执行处理,所以对于 Actor 的消息处理,不适合调用会阻塞的处理方法。
4、Actor 可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的 Actor
5、每一个 ActorSystem 和 Actor都在启动的时候会给定一个 name,如果要从 ActorSystem 中,获取一个 Actor,则通过以下的方式来进行 Actor 的获取:
akka.tcp://asname@bigdata02:9527/user/actorname
akka.tcp:// 为协议 ;asname为ActorSystem 名字;bigdata02:9527为服务器及端口号;user为守卫Actor;actorname 顶级Actor(或者是顶级Actor的子Actor)
Actor层次结构:
6、如果一个 Actor 要和另外一个 Actor 进行通信,则必须先获取对方 Actor 的 ActorRef 对象,然后通过该对象发送消息即可。
7、通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步回到返回处理结果。
Akka 工作原理图:
上图的核心要点:
- ActorSystem负责Actor的生命周期
- Actor可以是ActorSystem产生的也可以是Actor产生的
- Actor是具体负责通信的组件
- Actor中有一个MailBox,负责存储其他Actor发送过来的消息,实现异步通信
- 如果当前的Actor想要和其他的Actor进行通信,则必须获取对方Actor的ActorRef对象,通过ActorRef来调用远程的Actor进行服务
2.2代码详解
2.2.1创建Actor
Actor有3中创建的方式:
- 使用默认的构造函数创建Actor
- 使用非默认的构造函数创建Actor
- 通过隐式变量context创建Actor
- 使用默认的构造函数创建Actor
//创建ActorSystem,创建ActorSystem是创建和查找Actor的入口
val actorSystem = ActorSystem("StringSystem",conf)
//使用默认的构造函数创建Actor实例
val stringActor = actorSystem.actorOf(Props[StringActo], Constant.MSA)
//给stringActor发送消息
stringActor!"hello"
class StringActor extends Actor{
def receive = {
case s:String ...
case _ ...
}
}
复制代码
代码val actorSystem = ActorSystem("StringSystem",conf)创建的是ActorSytem实例,ActorSystem是创建和查询Actor 的入口,通过ActorSystem创建的对象为顶级Actor
代码actorSystem.actorOf(Props[StringActo], Constant.MSA)中的actorOf方法返回的是ActorRef的对象实例并赋值给变量stringActor通过stringActor便可以与创建的Actor进行交互,它是一种不可变的对象 且与创建的Actor具有一一对应的关系,在实际使用时可以序列化并可以网络传输供远程调用,使用actorOf创建Actor时还可以指定创建的Actor名称,该名称用于标识创建的Actor, 不能与当前程序中的其他Actor 有命名冲突。
stringActor!"hello" 向stringActor发送字符串消息,该stringActor收到之后调用receuve方法进行匹配处理。
- 使用非默认的构造函数创建Actor
//创建ActorSystem,创建ActorSystem是创建和查找Actor的入口
val actorSystem = ActorSystem("StringSystem",conf)
//使用非默认的构造函数创建Actor实例
val stringActor = actorSystem.actorOf(Props(StringActo("strActor")), Constant.MSA)
//给stringActor发送消息
stringActor!"hello"
class StringActor(var name:String) extends Actor{
def receive = {
case s:String ...
case _ ...
}
}
复制代码
注意此处使用的是Props()而不是Props[]
StringActor定义了自己的主构造函数
- 通过隐式变量context创建Actor
//创建ActorSystem,创建ActorSystem是创建和查找Actor的入口
val actorSystem = ActorSystem("StringSystem",conf)
//使用默认的构造函数创建Actor实例
val contextActor = actorSystem.actorOf(Props[ContextActor],name = "ContextActor")
//给stringActor发送消息
contextActor!"hello"
class ContextActor extends Actor{
//通过context创建
var stringActor = context.actorOf(Props[StringActo], Constant.MSA)
def receive = {
case s:String stringActor!s
case _ ...
}
}
class StringActor(var name:String) extends Actor{
def receive = {
case s:String ...
case _ ...
}
}
复制代码
定义了两个Actor,StringActor通过在ContextActor中使用context.actorOf进行创建,创建的Actor(StringActor)会成为当前Actor(ContextActor)的子Actor。
注意:在创建Actor特别是使用非默认构造函数创建Actor时,不能直接在代码中通过new关键字来创建Actor实例。(不可以显示的创建,只能通过context.actorOf(Props[MyActor])或者system.actorOf(Props(new MyActor(...))))
2.2.2消息传递
Akka有两种消息类型:Fire-And-Forget和Send-And-Receive-Future
Fire-And-Forget
是一种单向消息发送模型,消息发送后可以立即返回,无需等待目标Actor返回结果,Akka中使用!方法进行Fire-And-Forget消息发送,!还有一个重载的方法tell。
!不需要指定sender而tell方法需要显示的指定sender
//隐式为当前的Actor
def !(message:Any)(implicit sender:ActorRef = Actor.noSender):Unit
def tell(message:Any,Sender:ActorRef):Unit = this.!(msg)(sender)
复制代码
Send-And-Receive-Future
是一种双向消息发送模型,向目标Actor发送完消息后,返回一个Future作为后期可能的返回,当前发送方Actor将等待目标Actor的返回,Akka使用?方法进行Send-And-Receive-Future消息的传递,有一个重载的方法ask。
2.2.3其他方法
-
preStart():创建Actor时调用,在接受和处理消息前处理,主要用于Actor的初始化等工作
-
preStop():停止时调用
-
unhandled():有未能处理的消息时调用
2.3Akka与Spark Standalone
Spark集群中Master Worker:Worker现在要发送注册(RegisterWorker)消息给Master:masterActorRef.send(RegisterWorker)
Spark-x版本中的应用程序执行的时候,会生成一个Driver和多个Executor
它的内部就有两个Actor:
DriverActor:负责发送任务给其他的worker中的Executor来执行的,作用和Spark-2.x版本中的DriverEndpoint是一样的
2.ClientActor:负责和master进行通信,作用和Spark-2.x版本中的ClientEndpoint是一样的
下图为运行过程中各种组件的分布图:
重要角色:
- Client 提交应用程序
- Master 负责管理资源
- Worker 真正资源的拥有者
注意一点:
Spark RPC 中EndPoint的作用,类似于Akka中的Actor。实际上就是内部封装了一个Actor
- SchedulerBackend负责通信工作
通信终端(actor = Endpoint)
ClientEndPoint :负责Worker和Master的通信
DriverEndpoint :负责Driver和Executor的通信,更加抽象一点是SchedulerBackend和ExecutorBackend来完成Driver和Executor的通信
- TaskScheduler和DAGScheduler负责应用程序的调度工作
AGScheduler会把Job划分成一个或多个Stage,并把Stage分成一个或多个Task。当DAGScheduler完成Task的创建后,就会把Task按Task集(Task Set)的方式交给TaskScheduler,接下来就由TaskScheduler来接管后续的过程。
TaskScheduler是Task的调度器,它负责把Task(Task Set)提交给Spark集群去运行,总的来说TaskScheduler的功能主要有以下几个方面:
提交分发Task到各个Executor
处理任务完成的状态
3神功出世--Akka模拟代码
代码主要分为四个部分:
-
Constant 用来保存一些常量名
-
Message 辅助类,编码worker和Master直接传递的信息
-
Master 主节点
-
worker 从节点
3.1Constant辅助类实现
//一些信息类
object Constant {
val MSAS = "MasterActorSystem"
val MSA = "MasterActor"
val WAS = "WorkerActorSystem"
val WA = "WorkerActor"
}
复制代码
3.2Message辅助类实现
//注册消息 worker -> master
case class RegisterWORKER(val workerid: String, val memory: Int, val cpu: Int)
//注册完成消息 master -> worker
case class RegisteredWORKER(val masterhostname: String)
//心跳消息 worker -> master
case class Heartbeat(val workerid: String)
//worker 信息类
class WORKERInfo(val workerid: String, val memory: Int, val cpu: Int) {
//上一次心跳时间
var lastHeartBeatTime: Long = _
override def toString: String = {
workerid + "," + memory + "," + cpu
}
}
//一个发送心跳的信号
case object SendMessage
//一个检查信号
case object CheckTimeOut
复制代码
3.3master类实现
必须了解的知识:
-
在scala中,伴生类 class A 和 伴生对象 object A(定义的方法,都是静态方法)
-
关于 scala 中定义的一个类的构造方法:构造器: 类名后面的括号
代码实现:
{} 中的一切能执行的代码
变量的初始化
代码块
静态代码块
不能执行的代码: 定义的方法(未调用, 内部类)
Actor类中主要的三个方法:
- receive 方法 接收其他 actor 发送过来的消息,然后进行模式匹配,进行消息处理,有可能返回消息
- preStart() 方法 对象在构建成功之后,就会触发执行 preStart
- postStop 方法 在对象销毁之前,会执行一次
master启动类:
object Master {
def main(args: Array[String]): Unit = {
//地址参数
val str =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = localhost
|akka.remote.netty.tcp.port = 6789
""".stripMargin
val conf = ConfigFactory.parseString(str)
//ActorSystem
val actorSystem = ActorSystem(Constant.MSAS, conf)
//启动了一个actor : Master => 转到Master的构造方法
actorSystem.actorOf(Props(new Master("localhost", 6789)), Constant.MSA)
/**
* 注释: actor 的生命周期
* 1、Master actor 的构造方法
* 2、preStart() 当 actor 实例创建成功的时候,就会马上调用这个 actor 的 preStart() 来执行
*/
}
}
复制代码
master构造类:
class Master(var hostname: String, var port: Int) extends Actor {
//用来存储每个注册的WORKER节点的信息
private var id2WORKERinfo = new mutable.HashMap[String, WORKERInfo]()
//对所有注册的WORKER进行去重,其实就是一个HashSet
private var WORKERInfoes = new mutable.HashSet[WORKERInfo]()
//actor在最开始的时候,会执行一次
override def preStart(): Unit = {
import scala.concurrent.duration._
import context.dispatcher
//调度一个任务, 每隔五秒钟执行一次
context.system.scheduler.schedule(0 millis, 5000 millis, self, CheckTimeOut)
}
//正经服务方法
override def receive: Receive = {
//接收 注册消息
case RegisterWORKER(WORKERid, memory, cpu) => {
val WORKERInfo = new WORKERInfo(WORKERid, memory, cpu)
println(s"节点 ${WORKERid} 上线")
//对注册的WORKER节点进行存储管理
id2WORKERinfo.put(WORKERid, WORKERInfo)
WORKERInfoes += WORKERInfo
//把信息存到zookeeper
//sender() 谁给我发消息,sender方法返回的就是谁
sender() ! RegisteredWORKER(hostname + ":" + port)
}
//接收心跳消息
case Heartbeat(WORKERid) => {
val currentTime = System.currentTimeMillis()
val WORKERInfo = id2WORKERinfo(WORKERid)
WORKERInfo.lastHeartBeatTime = currentTime
id2WORKERinfo(WORKERid) = WORKERInfo
WORKERInfoes += WORKERInfo
}
//检查过期失效的 WORKER
case CheckTimeOut => {
val currentTime = System.currentTimeMillis()
//15 秒钟失效
//过滤出 超时节点
WORKERInfoes.filter(nm => {
val heartbeatTimeout = 15000
val bool = currentTime - nm.lastHeartBeatTime > heartbeatTimeout
if (bool) {
println(s"节点 ${nm.WORKERid} 下线")
}
bool
}).foreach(deadnm => {
WORKERInfoes -= deadnm
id2WORKERinfo.remove(deadnm.WORKERid)
})
println("当前注册成功的节点数" + WORKERInfoes.size + "\t分别是:" + WORKERInfoes.map(x => x.toString)
.mkString(","));
}
}
}
复制代码
3.4worker类实现
worker启动类:
//远程主机名称
val HOSTNAME = args(0)
//master 的 hostname 和 port
val MA_HOSTNAME = args(1)
val MA_PORT = args(2).toInt
//抽象的内存资源 和 CPU 个数
val WORKER_MEMORY = args(3).toInt
val WORKER_CORE = args(4).toInt
//当前 worker 的 hostname 和 port
var WORKER_PORT = args(5).toInt
var WORKERHOSTNAME = args(6)
//指定主机名称和端口号相关的配置
val str =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = ${HOSTNAME}
|akka.remote.netty.tcp.port = ${WORKER_PORT}
""".stripMargin
val conf = ConfigFactory.parseString(str)
//启动一个 ActorSystem
val actorSystem = ActorSystem(Constant.WAS, conf)
//启动一个Actor
actorSystem.actorOf(Props(new worker(WORKERHOSTNAME, MS_HOSTNAME, MS_PORT, WORKER_MEMORY, WORKER_CORE)), Constant.WA)
复制代码
worker构造类:
/**
* 1、spark rpc 生命周期方法: onStart receive onStop
* 2、akka rpc 生命周期方法: preStart receive postStop()
*/
class worker(val WORKERHOSTNAME: String, val masterhostname: String, val masterport: Int, val memory: Int,
val cpu: Int) extends Actor {
var WORKERid: String = WORKERHOSTNAME
var msRef: ActorSelection = _
//会提前执行一次
//当前worker启动好了之后,就应该给 master 发送一个注册消息
//发给谁,就需要获取这个谁的一个ref实例
override def preStart(): Unit = {
//获取消息发送对象的一个ref实例
// 远程path akka.tcp://(ActorSystem的名称)@(远程地址的IP):(远程地址的端口)/user/(Actor的名称)
msRef = context.actorSelection(s"akka.tcp://${
Constant.MSAS
}@${masterhostname}:${masterport}/user/${Constant.MSA}")
//发送消息
println(WORKERid + " 正在注册")
msRef ! RegisterWORKER(WORKERid, memory, cpu)
}
//正常服务方法
override def receive: Receive = {
//接收到注册成功的消息
case RegisteredWORKER(masterURL) => {
println(masterURL);
//initialDelay: FiniteDuration, 多久以后开始执行
//interval: FiniteDuration, 每隔多长时间执行一次
//receiver: ActorRef, 给谁发送这个消息
//message: Any 发送的消息是啥
import scala.concurrent.duration._
import context.dispatcher
context.system.scheduler.schedule(0 millis, 4000 millis, self, SendMessage)
}
//发送心跳
case SendMessage => {
//向主节点发送心跳信息
msRef ! Heartbeat(WORKERid)
println(Thread.currentThread().getId)
}
}
}
复制代码