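1. Architecture overview
On the Driver, BlockManagerMaster maintains the metadata for the data managed by the BlockManager on each node. Whenever a block is added, removed, or updated, the corresponding metadata change is recorded here.
Every node runs a BlockManager, which has several key components:
DiskStore: reads and writes data on disk.
MemoryStore: reads and writes data in memory.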
// Actual storage of where blocks are kept
private[spark] val memoryStore =
  new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
private[spark] val diskStore = new DiskStore(conf, diskBlockManager, securityManager)
memoryManager.setMemoryStore(memoryStore)
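BlockTransferService: reads and writes data held by the BlockManagers on other, remote nodes.
shuffleClient: either a client for an external shuffle service or simply the BlockTransferService itself. It sets up network connections from this BlockManager to BlockManagers on remote nodes and reads and writes data over them.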
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
  new ExternalShuffleClient(transConf, securityManager,
    securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
} else {
  blockTransferService
}
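After a BlockManager is created, it registers with BlockManagerMaster during initialization, and BlockManagerMaster creates a corresponding BlockManagerInfo for it.
Whenever a BlockManager adds, removes, or updates a block, it reports that block's BlockStatus to BlockManagerMaster. BlockManagerMaster then applies the same change to the BlockStatus entries inside that BlockManager's BlockManagerInfo, which is how the metadata is kept up to date.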
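When data is written through BlockManager, for example intermediate data produced while an RDD is being computed, or data explicitly cached with persist(), it is written to memory first. If there is not enough memory, part of the in-memory data is written to disk, provided the storage level allows disk. If persist() was given a replicated storage level, BlockTransferService is used to replicate a copy of the data to the BlockManager on another node.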
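The write path described above can be sketched as a toy model. This is only an illustration of the "memory first, spill to disk when full" idea: `ToyBlockStore` and its methods are hypothetical names, the eviction order (oldest block first) is an assumption, and replication is left out. It is not how Spark's MemoryStore and DiskStore are implemented.

```scala
import scala.collection.mutable

// Toy model only: ToyBlockStore is a hypothetical name, not a Spark API.
final class ToyBlockStore(memoryCapacity: Long) {
  // LinkedHashMap keeps insertion order, so the oldest block is evicted first.
  private val memory = mutable.LinkedHashMap.empty[String, Array[Byte]]
  private val disk = mutable.HashMap.empty[String, Array[Byte]]
  private var memoryUsed = 0L

  def put(blockId: String, data: Array[Byte]): Unit = {
    // Drop any previous copy of this block so sizes stay consistent.
    memory.remove(blockId).foreach(old => memoryUsed -= old.length)
    disk.remove(blockId)

    if (data.length > memoryCapacity) {
      // The block can never fit in memory: write it straight to disk.
      disk(blockId) = data
    } else {
      // Move the oldest in-memory blocks to disk until the new block fits.
      while (memoryUsed + data.length > memoryCapacity) {
        val (oldId, oldData) = memory.head
        memory.remove(oldId)
        memoryUsed -= oldData.length
        disk(oldId) = oldData
      }
      memory(blockId) = data
      memoryUsed += data.length
    }
  }

  def inMemory(blockId: String): Boolean = memory.contains(blockId)
  def onDisk(blockId: String): Boolean = disk.contains(blockId)
}
```

With a capacity of 10 bytes, putting two 6-byte blocks in a row leaves the second block in memory and moves the first one to disk.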
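When data is read through BlockManager, for example during a shuffle read, it is read locally through MemoryStore or DiskStore if it is available locally. If it is not, shuffleClient connects to a BlockManager that holds the data and reads it from that remote BlockManager.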
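The read path has the same "local first, then remote" shape, which a few lines of Scala can illustrate. This is a sketch under assumptions: `ToyBlockReader` and the `Fetch` function type are hypothetical stand-ins for the real stores and network client, and trying the remote locations in the given order is a choice made here, not something taken from Spark.

```scala
// Toy model only: ToyBlockReader and Fetch are hypothetical, not Spark APIs.
object ToyBlockReader {
  // A lookup that returns the block's bytes if this source holds the block.
  type Fetch = String => Option[Array[Byte]]

  // Try local memory, then local disk, then each remote location in order.
  // orElse takes its argument by name, so remote sources are only contacted
  // when both local lookups miss.
  def read(
      blockId: String,
      memory: Fetch,
      disk: Fetch,
      remotes: Seq[Fetch]): Option[Array[Byte]] =
    memory(blockId)
      .orElse(disk(blockId))
      .orElse(remotes.iterator.map(fetch => fetch(blockId)).collectFirst {
        case Some(bytes) => bytes
      })
}
```

Because the remote lookups go through a lazy iterator, the search stops at the first remote source that returns the block.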
2. Code walkthrough
2.1 BlockManagerMasterEndpoint
On the BlockManagerMaster side, the class that does most of the work is BlockManagerMasterEndpoint.
2.1.1 Class overview
BlockManagerMasterEndpoint maintains the metadata (BlockManagerInfo and BlockStatus) for each executor's BlockManager.
/**
 * BlockManagerMasterEndpoint is an [[ThreadSafeRpcEndpoint]] on the master node to track statuses
 * of all slaves' block managers.
 */
// Maintains the metadata (BlockManagerInfo, BlockStatus) of each executor's BlockManager
private[spark]
class BlockManagerMasterEndpoint(
    override val rpcEnv: RpcEnv,
    val isLocal: Boolean,
    conf: SparkConf,
    listenerBus: LiveListenerBus)
  extends ThreadSafeRpcEndpoint with Logging {
2.1.2 blockManagerInfo and blockManagerIdByExecutor
// Mapping from block manager id to the block manager's information.
// This map goes from a block manager's id to its info: BlockManagerId -> BlockManagerInfo
// BlockManagerMaster maintains a BlockManagerInfo for every BlockManager
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
// Mapping from executor ID to block manager ID.
// Each executor is associated with exactly one BlockManager
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
2.1.3 The receiveAndReply method: receiving and handling messages
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
    context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
  case _updateBlockInfo @
      UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
    context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
    listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
  ...
  case RemoveBlock(blockId) =>
    removeBlockFromWorkers(blockId)
    context.reply(true)
  case RemoveExecutor(execId) =>
    removeExecutor(execId)
    context.reply(true)
  ...
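The dispatch style used by receiveAndReply, a partial function that pattern-matches on message case classes and answers through a reply callback, can be tried in isolation. The sketch below is a stand-alone illustration under assumptions: `ToyMessage`, `ToyRegister`, `ToyRemoveExecutor`, and `ToyMasterEndpoint` are hypothetical names, and a plain `Any => Unit` callback stands in for Spark's RpcCallContext.

```scala
import scala.collection.mutable

// Toy model only: these message and endpoint types are hypothetical, not Spark APIs.
sealed trait ToyMessage
final case class ToyRegister(executorId: String) extends ToyMessage
final case class ToyRemoveExecutor(executorId: String) extends ToyMessage

final class ToyMasterEndpoint {
  private val executors = mutable.Set.empty[String]

  // Each case handles one message type and sends its result back through `reply`.
  // Messages that match no case are simply not in the partial function's domain.
  def receiveAndReply(reply: Any => Unit): PartialFunction[Any, Unit] = {
    case ToyRegister(id) => reply(executors.add(id))          // true if newly added
    case ToyRemoveExecutor(id) => reply(executors.remove(id)) // true if it was present
  }
}
```

A caller obtains the handler once, for example `val handle = endpoint.receiveAndReply(r => println(r))`, and then applies it to each incoming message. `handle.isDefinedAt(msg)` tells whether a message type is supported.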
2.1.4 BlockManager registration
/**
 * Returns the BlockManagerId with topology information populated, if available.
 */
private def register(
    idWithoutTopologyInfo: BlockManagerId,
    maxOnHeapMemSize: Long,
    maxOffHeapMemSize: Long,
    slaveEndpoint: RpcEndpointRef): BlockManagerId = {
  // the dummy id is not expected to contain the topology information.
  // we get that info here and respond back with a more fleshed out block manager id
  val id = BlockManagerId(
    idWithoutTopologyInfo.executorId,
    idWithoutTopologyInfo.host,
    idWithoutTopologyInfo.port,
    // Add topology information to the incoming BlockManagerId
    topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))
  val time = System.currentTimeMillis()
  if (!blockManagerInfo.contains(id)) {
    // This is essentially a sanity check. If the blockManagerInfo map has no entry for this
    // BlockManagerId, the blockManagerIdByExecutor map, which is kept in sync with it, must
    // not have an entry for this executor either. If it does, clean that entry up.
    blockManagerIdByExecutor.get(id.executorId) match {
      case Some(oldId) =>
        // A block manager of the same executor already exists, so remove it (assumed dead)
        logError("Got two different block manager registrations on same executor - "
            + s" will replace old one $oldId with new one $id")
        removeExecutor(id.executorId)
      case None =>
    }
    logInfo("Registering block manager %s with %s RAM, %s".format(
      id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id))
    // Record the executorId -> BlockManagerId mapping
    blockManagerIdByExecutor(id.executorId) = id
    // Create a BlockManagerInfo for this BlockManagerId
    // and record the BlockManagerId -> BlockManagerInfo mapping
    blockManagerInfo(id) = new BlockManagerInfo(
      id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
  }
  listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
      Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
  id
}
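The bookkeeping in register comes down to keeping two maps in sync and replacing a stale entry when the same executor registers again under a different id. The following is a minimal stand-alone sketch of that idea, assuming simplified types: `ToyManagerId`, `ToyManagerInfo`, and `ToyRegistry` are hypothetical names, and topology lookup, logging, and listener events are left out.

```scala
import scala.collection.mutable

// Toy model only: these types are hypothetical simplifications, not Spark classes.
final case class ToyManagerId(executorId: String, host: String, port: Int)
final case class ToyManagerInfo(id: ToyManagerId, registeredAtMs: Long, maxMemBytes: Long)

final class ToyRegistry {
  private val infoById = mutable.HashMap.empty[ToyManagerId, ToyManagerInfo]
  private val idByExecutor = mutable.HashMap.empty[String, ToyManagerId]

  def register(id: ToyManagerId, maxMemBytes: Long): ToyManagerId = {
    if (!infoById.contains(id)) {
      // If this executor is already known under another id, treat the old
      // manager as dead and drop it from both maps before adding the new one.
      idByExecutor.remove(id.executorId).foreach(oldId => infoById.remove(oldId))
      idByExecutor(id.executorId) = id
      infoById(id) = ToyManagerInfo(id, System.currentTimeMillis(), maxMemBytes)
    }
    id
  }

  // Resolve an executor to its manager's info by going through both maps.
  def lookup(executorId: String): Option[ToyManagerInfo] =
    idByExecutor.get(executorId).flatMap(id => infoById.get(id))

  def size: Int = infoById.size
}
```

Registering `ToyManagerId("exec-1", "host-a", 7000)` and then `ToyManagerId("exec-1", "host-a", 7001)` leaves a single entry, the one for port 7001.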
2.1.5 updateBlockInfo: updating block information
// Update the block info.
// Whenever a block changes on a BlockManager, that BlockManager sends an UpdateBlockInfo
// message to BlockManagerMaster so that the block info is updated there.
private def updateBlockInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long): Boolean = {
  if (!blockManagerInfo.contains(blockManagerId)) {
    if (blockManagerId.isDriver && !isLocal) {
      // We intentionally do not register the master (except in local mode),
      // so we should not indicate failure.
      return true
    } else {
      return false
    }
  }
  if (blockId == null) {
    blockManagerInfo(blockManagerId).updateLastSeenMs()
    return true
  }
  // Update the block information
  blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
  // A block may live on several BlockManagers: with a StorageLevel whose name ends in _2,
  // the block is replicated to another BlockManager.
  // blockLocations maps each blockId to the set of BlockManagerIds that hold it,
  // so it is updated here. Because a set is used, duplicates are dropped automatically.
  var locations: mutable.HashSet[BlockManagerId] = null
  if (blockLocations.containsKey(blockId)) {
    locations = blockLocations.get(blockId)
  } else {
    locations = new mutable.HashSet[BlockManagerId]
    blockLocations.put(blockId, locations)
  }
  if (storageLevel.isValid) {
    locations.add(blockManagerId)
  } else {
    locations.remove(blockManagerId)
  }
  // Remove the block from master tracking if it has been removed on all slaves.
  if (locations.size == 0) {
    blockLocations.remove(blockId)
  }
  true
}
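The blockLocations handling at the end of updateBlockInfo is easy to isolate: one set of holders per block, with the block entry removed once the set is empty. The sketch below assumes plain strings for ids and a Boolean `stored` flag in place of `storageLevel.isValid`. `ToyBlockLocations` is a hypothetical name, not part of Spark.

```scala
import scala.collection.mutable

// Toy model only: ToyBlockLocations is hypothetical; ids are plain strings here.
final class ToyBlockLocations {
  private val locations = mutable.HashMap.empty[String, mutable.HashSet[String]]

  // `stored` plays the role of storageLevel.isValid: true means the manager
  // now holds the block, false means it no longer does.
  def update(blockId: String, managerId: String, stored: Boolean): Unit = {
    val holders = locations.getOrElseUpdate(blockId, mutable.HashSet.empty[String])
    if (stored) holders.add(managerId) else holders.remove(managerId)
    // Stop tracking the block once no manager holds it any more.
    if (holders.isEmpty) locations.remove(blockId)
  }

  def holdersOf(blockId: String): Set[String] =
    locations.get(blockId).map(_.toSet).getOrElse(Set.empty[String])

  def isTracked(blockId: String): Boolean = locations.contains(blockId)
}
```

Reporting the same manager twice for a block leaves one entry in the set, which mirrors the automatic de-duplication mentioned in the comments above.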
2.1.6 Removal code (omitted)
2.2 BlockManager
2.2.1 Class overview
/**
 * Manager running on every node (driver and executors) which provides interfaces for putting and
 * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
 *
 * Note that [[initialize()]] must be called before the BlockManager is usable.
 */
private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    val serializerManager: SerializerManager,
    val conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    val blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
  extends BlockDataManager with BlockEvictionHandler with Logging {
2.2.2 Initialization
/**
 * Initializes the BlockManager with the given appId. This is not performed in the constructor as
 * the appId may not be known at BlockManager instantiation time (in particular for the driver,
 * where it is only learned after registration with the TaskScheduler).
 *
 * This method initializes the BlockTransferService and ShuffleClient, registers with the
 * BlockManagerMaster, starts the BlockManagerWorker endpoint, and registers with a local shuffle
 * service if configured.
 */
def initialize(appId: String): Unit = {
  // First initialize the BlockTransferService, which handles remote block data transfer
  blockTransferService.init(this)
  shuffleClient.init(appId)
  blockReplicationPolicy = {
    val priorityClass = conf.get(
      "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
    val clazz = Utils.classForName(priorityClass)
    val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
    logInfo(s"Using $priorityClass for block replication policy")
    ret
  }
  // Create a unique BlockManagerId for this BlockManager.
  // As this shows, a BlockManager is uniquely identified by an Executor on a node.
  val id =
    BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
  // Register this BlockManager by sending a message to BlockManagerMasterEndpoint
  // through the reference held by BlockManagerMaster
  val idFromMaster = master.registerBlockManager(
    id,
    maxOnHeapMemory,
    maxOffHeapMemory,
    slaveEndpoint)
  blockManagerId = if (idFromMaster != null) idFromMaster else id
  shuffleServerId = if (externalShuffleServiceEnabled) {
    logInfo(s"external shuffle service port = $externalShuffleServicePort")
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }
  // Register Executors' configuration with the local shuffle service, if one should exist.
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }
  logInfo(s"Initialized BlockManager: $blockManagerId")
}
2.2.3 Other code worth noting
Local reads, remote reads, local writes, and remote writes (omitted).