package com.skey.litespark.common
// 用于worker向master发送注册的消息
case class RegisterWorkerInfo(id: String, cores: Int, mem: Int)
// 用于master向worker发送注册成功的消息
case object RegisteredWorkerInfo
// 用于worker向master发送心跳
case class HeartBeat(id: String)
// 用于worker内部触发发送心跳的动作
case object SendHeartBeat
// 用于master内部启动worker心跳检测
case object StartTimeOutWorker
// 用于master内部移除超时的worker
case object RemoveTimeOutWorker
// master内部维护的worker信息
class WorkerInfo(val id: String, cores: Int, mem: Int) {
var lastHeartBeat: Long = System.currentTimeMillis()
}
Master
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.skey.litespark.common._
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
import scala.concurrent.duration._
/**
* Master
*
* @author ALion
* @version 2019/4/21 11:43
*/
class LiteMaster extends Actor {
/**
* 存储worker的信息
*/
val workerMap = mutable.HashMap[String, WorkerInfo]()
private val MAX_DELAY = 60 * 1000
/**
* 收到消息后会调用receive,然后匹配对应的消息,做出不同的响应
* @return
*/
override def receive: Receive = {
case "start" =>
// 启动
println("Master 启动成功!")
self ! StartTimeOutWorker
case StartTimeOutWorker =>
// 定时检测worker心跳
println("启动定时检测Worker心跳任务")
import context.dispatcher
context.system.scheduler.schedule(0 millis, 30000 millis, self, RemoveTimeOutWorker)
case RemoveTimeOutWorker =>
// 移除心跳超时的worker
val nowTime = System.currentTimeMillis()
workerMap.values
.filter(nowTime - _.lastHeartBeat > MAX_DELAY)
.foreach { worker =>
workerMap.remove(worker.id)
println(s"移除了worker(id=${worker.id}), 因为心跳超时!")
}
case RegisterWorkerInfo(id, cores, mem) =>
// 注册worker到master
if (!workerMap.contains(id)) {
workerMap.put(id, new WorkerInfo(id, cores, mem))
// 注册成功后,向发送消息的worker返回消息
sender() ! RegisteredWorkerInfo
println(s"worker(id=$id) 注册成功!")
}
case HeartBeat(id) =>
// 收到worker的心跳后,更新本地维护的worker心跳时间
workerMap.get(id) match {
case Some(workerInfo) =>
workerInfo.lastHeartBeat = System.currentTimeMillis()
println(s"更新 worker(id=$id) 心跳时间=${workerInfo.lastHeartBeat}")
case None => println(s"worker(id=$id) 不存在!")
}
}
}
object LiteMaster {
def main(args: Array[String]): Unit = {
// 加载master配置
val config = ConfigFactory.load("master.conf")
// 创建Actor系统
val actorSystem = ActorSystem("Master", config)
// 注册LiteMaster,并获取master引用
val masterRef: ActorRef = actorSystem.actorOf(Props[LiteMaster], "LiteMaster")
// 向master自己发送一个启动消息
masterRef ! "start"
}
}