1、JobManager源码分析
JobManager 是 Flink 集群的主节点,它包含三大重要的组件
1、ResourceManager
Flink的集群资源管理器,只有一个,关于slot的管理和申请等工作,都由他负责
2、Dispatcher
负责接收用户提交的 JobGragh, 然后启动一个 JobManager, 类似于 YARN 集群中的
AppMaster 角色,类似于 Spark Job 中的 Driver 角色
3、WebMonitorEndpoint
里面维护了很多很多的Handler,如果客户端通过 flink run 的方式来提交一个 job 到 flink
集群,最终,是由 WebMonitorEndpoint 来接收,并且决定使用哪一个 Handler 来执行处理
submitJob ===> SubmitJobHandler
JobMaster(Dispatcher中创建)
负责一个具体的 Job 的执行,在一个集群中,可能会有多个 JobMaster同时执行,类似于 YARN
集群中的 AppMaster 角色,类似于 Spark Job 中的 Driver 角色
flink job执行总结:
Flink 集群的主节点内部运行着:ResourceManager 和 Dispatcher,当 client 提交一个 job 到
集群运行的时候(客户端会把该 Job 构建成一个 JobGragh 对象),Dispatcher 负责拉起
JobMaster来负责这个 Job 内部的 Task 的执行,执行Task所需要的资源,JobMaster向
ResourceManager 申请。
StandaloneSessionClusterEntrypoint
ClusterEntrypoint.runClusterEntrypoint-> startCluster -> ClusterEntrypoint.runCluster
private void runCluster(Configuration configuration, PluginManager pluginManager) throws Exception {
synchronized(lock) {
/**
* TODO ->
* 注释: 初始化服务,如 JobManager 的 Akka RPC 服务,HA 服务,心跳检查服务,metric service
* 这些服务都是 Master 节点要使用到的一些服务
* 1、commonRpcService: 基于 Akka 的 RpcService 实现。RPC 服务启动 Akka 参与者来接收从 RpcGateway 调用 RPC
* 2、haServices: 提供对高可用性所需的所有服务的访问注册,分布式计数器和领导人选举
* 3、blobServer: 负责侦听传入的请求生成线程来处理这些请求。它还负责创建要存储的目录结构 blob 或临时缓存它们
* 4、heartbeatServices: 提供心跳所需的所有服务。这包括创建心跳接收器和心跳发送者。
* 5、metricRegistry: 跟踪所有已注册的 Metric,它作为连接 MetricGroup 和 MetricReporter
* 6、archivedExecutionGraphStore: 存储执行图ExecutionGraph的可序列化形式。
*/
initializeServices(configuration, pluginManager);
// TODO 注释: 将 jobmanager 地址写入配置
// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
/*************************************************
* TODO
* 注释: 初始化一个 DefaultDispatcherResourceManagerComponentFactory 工厂实例
* 内部初始化了四大工厂实例
* 1、DispatcherRunnerFactory = DefaultDispatcherRunnerFactory
* 2、ResourceManagerFactory = StandaloneResourceManagerFactory
* 3、RestEndpointFactory(WenMonitorEndpoint的工厂) = SessionRestEndpointFactory
* 返回值:DefaultDispatcherResourceManagerComponentFactory
* 内部包含了这三个工厂实例,就是三个成员变量
* -
* 再补充一个:dispatcherLeaderProcessFactoryFactory = SessionDispatcherLeaderProcessFactoryFactory
*
* --> StandaloneSessionClusterEntrypoint.createDispatcherResourceManagerComponentFactory
*
*/
final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory =
createDispatcherResourceManagerComponentFactory(configuration);
/*************************************************
* TODO
* 注释:启动关键组件:Dispatcher 和 ResourceManager。
* 1、Dispatcher: 负责用于接收作业提交,持久化它们,生成要执行的作业管理器任务,并在主任务失败时恢复它们。
* 此外, 它知道关于 Flink 会话集群的状态。负责为这个新提交的作业拉起一个新的 JobManager 服务
* 2、ResourceManager: 负责资源的分配和记帐。在整个 Flink 集群中只有一个 ResourceManager,资源相关的内容都由这个服务负责
* registerJobManager(JobMasterId, ResourceID, String, JobID, Time) 负责注册 jobmaster,
* requestSlot(JobMasterId, SlotRequest, Time) 从资源管理器请求一个槽
* 3、WebMonitorEndpoint: 服务于 web 前端 Rest 调用的 Rest 端点,用于接收客户端发送的执行任务的请求
*
* --> DefaultDispatcherResourceManagerComponentFactory.create
*/
clusterComponent = dispatcherResourceManagerComponentFactory
.create(configuration, ioExecutor, commonRpcService, haServices,
blobServer, heartbeatServices, metricRegistry,
archivedExecutionGraphStore,
new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
this);
/*************************************************
* TODO
* 注释:集群关闭时的回调
*/
clusterComponent.getShutDownFuture().whenComplete((ApplicationStatus applicationStatus, Throwable throwable) -> {
if(throwable != null) {
shutDownAsync(ApplicationStatus.UNKNOWN, ExceptionUtils.stringifyException(throwable), false);
} else {
// This is the general shutdown path. If a separate more specific shutdown was
// already triggered, this will do nothing
shutDownAsync(applicationStatus, null, true);
}
});
}
}
1、 initializeServices
-> commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService -> createAndStart
-> new AkkaRpcService -> supervisor = startSupervisorActor();
SupervisorActor是个actor (class SupervisorActor extends AbstractActor extends Actor)
-> final ActorRef actorRef = SupervisorActor.startSupervisorActor(actorSystem, terminationFutureExecutor);
-> Supervisor.create(actorRef, terminationFutureExecutor) -> new Supervisor()
2、 创建和启动WebMonitorEndpoint,ResourceManager,Dispatcher
--> clusterComponent = dispatcherResourceManagerComponentFactory.create
-> DefaultDispatcherResourceManagerComponentFactory.create(
创建和启动WebMonitorEndpoint,ResourceManager,Dispatcher)
2.1 启动 webMonitorEndpoint
-> webMonitorEndpoint.start() -> RestServerEndpoint.start
-> (2.1.1) 初始化handler(包括: JobSubmitHandler)
handlers = initializeHandlers -> DispatcherRestEndpoint.initializeHandlers
-> super.initializeHandlers ->
/*************************************************
* TODO
* 注释: Handler 容器初始化
*
* ChannelInboundHandler channelRead0() 方法,这个方法会自动被 Netty 去调用执行
* 不管是这里初始化的那个 Handler, 里面都有一个 handleRequest 的方法
* channelRead0() 的底层,最终调用的就是 Hnadler.handleRequest() 方法
* 将来我们提交 Job 的时候,最终,由 WebMonitorEndpoint(DispatcherRestEndpoint) 接收到,
* 跳转到使用 JobSubmitHandler 来执行 最终执行请求的就是 JobSubmitHandler.handleRequest()
* 这些 Handler 的作用,其实就对应到 Flink web 页面的 rest 服务, Handler == Servlet 如 bigdata02:port/list
*/
ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);
-> new JobSubmitHandler(将来客户端提交应用程序上来,由 JobManager 中的 Netty 服务端的 JobSubmitHandler 来执行处理)
->(2.1.2) 启动 Netty 服务端,添加所有初始化好的handler -> bootstrap = new ServerBootstrap();
-> bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(initializer);
->(2.1.3) 启动 startInternal -> WebMonitorEndpoint.startInternal
选举通用套路
/*************************************************
* TODO
* 注释: 选举 ZooKeeperLeaderElectionService
* this 很重要,哪个对象进行选举操作就是那个对象 这里是WebMonitorEndpoint
* 选举成功后会调用该对象的grantLeaderShip方法
* 不管你在那个地方见到这种格式的代码:leaderElectionService.start(this);
* 一定要记住,最终,
* 1、参与选举的 某个获胜的角色会调用:
* leaderElectionService.isLeader() ==> leaderContender.grantLeaderShip()
* 2、参与选举的 某个失败的角色会调用: leaderElectionService.notLeader()
*/
leaderElectionService.start(this);
-> ZooKeeperLeaderElectionService.start(){
/*************************************************
* TODO
* 注释: Fink 的 选举,和 HBase 一样都是通过 ZooKeeper 的 API 框架 Curator 实现的
* 1、leaderLatch.start(); 事实上就是举行选举
* 2、leaderLatch.addListener 添加了监听器 当选举结束的时候:
* 如果监听到成功了: isLeader()
* 如果监听到失败了: notLeader()
*/
leaderLatch.addListener(this);
leaderLatch.start();
}
-> isLeader(){
/*************************************************
* TODO
* 注释: 分配 LeaderShip
* leaderContender = JobManagerRunnerImpl
* leaderContender = ResourceManager
* leaderContender = DefaultDispatcherRunner
* leaderContender = WebMonitorEndpoint
* 当前是WebMonitorEndpoint,后续 哪个组件进行选举就执行那个组件的grantLeadership方法
*
* leaderElectionService.start(this);
* leaderContender = this
*/
leaderContender.grantLeadership(issuedLeaderSessionID);
}
--> WebMonitorEndpoint.grantLeadership(){
* TODO
* 注释: 确认 Leader
*/
leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl());
}
2.2 创建 StandaloneResourceManager 实例对象和SlotManager
DefaultDispatcherResourceManagerComponentFactory.create()
resourceManager = resourceManagerFactory.createResourceManager -> ResourceManagerFactory.createResourceManager
--> (2.2.1) 创建SlotManager
createResourceManagerRuntimeServices -> ResourceManagerRuntimeServices.fromConfiguration
-> SlotManager slotManager = new SlotManagerImpl
--> (2.2.2) 创建 ResourceManager 实例
createResourceManager -> StandaloneResourceManagerFactory.createResourceManager
-> new StandaloneResourceManager -> super -> ResourceManager -> super -> FencedRpcEndpoint -super
-> RpcEndpoint(){
this.rpcService = checkNotNull(rpcService, "rpcService");
this.endpointId = checkNotNull(endpointId, "endpointId");
/*************************************************
* TODO
* 注释: 启动 ResourceManager 的 RPCServer 服务
* 这里启动的是 ResourceManager 的 Rpc 服务端。
* 接收 TaskManager启动好了而之后, 进行注册和心跳,来汇报 Taskmanagaer 的资源情况
* 通过动态代理的形式构建了一个Server、
*
* --> AkkaRpcService.startServer
*/
this.rpcServer = rpcService.startServer(this);
/*************************************************
* TODO
* 注释: 线程池的初始化
*/
this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}
-> rpcService.startServer(this); -> AkkaRpcService.startServer(){
/*************************************************
* TODO 通过代理的方式来获取一个 ActorRef 对象
* 注释: RpcServer 实现类
* JDK 提供的动态代理: Proxy InvocationHandler
*/
akkaInvocationHandler = new AkkaInvocationHandler(akkaAddress, hostname, actorRef, configuration.getTimeout(),
configuration.getMaximumFramesize(), actorTerminationFuture, captureAskCallstacks);
// 获取 RpcServer 对象, 启动 RpcServer
RpcServer server = (RpcServer) Proxy
.newProxyInstance(classLoader, implementedRpcGateways.toArray(new Class<?>[ implementedRpcGateways.size()]), akkaInvocationHandler);
}
ResourceManager为RpcEndpoint ,执行完构造方法后执行onStart方法
ResourceManager.onStart -> startResourceManagerServices
-> leaderElectionService.start(this) -> ResourceManager.grantLeadership
-> tryAcceptLeadership -> startServicesOnLeadership(){
/*************************************************
* TODO
* 注释: 开启心跳服务
*/
startHeartbeatServices();
/*************************************************
* TODO
* 注释: 启动 SlotManagerImpl
* 这个里面只是开启了两个定时任务而已
*/
slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
}
2.2.2.1 startHeartbeatServices(); 心跳服务
/*************************************************
* TODO
* 注释: 当前 ResourceManager 启动了两个心跳服务:
* 1、taskManagerHeartbeatManager 这个心跳管理器 关心点的是: taskManager 的死活
* 2、jobManagerHeartbeatManager 这个心跳管理器 关心点的是: jobManager 的死活
* taskManager 集群的资源提供者,任务执行者,从节点
* jobManager 每一个job会启动的一个主控程序
* 不管是集群的从节点执行心跳,还是每一个job会启动的一个主控程序 都是向 ResourceManager 去汇报
* -
* 在 ResourceManager 启动的最后,会启动两个心跳管理器,分别用来接收:
* 1、TaskManager 的心跳
* 2、JobMaster 的心跳
*/
private void startHeartbeatServices() {
/*************************************************
* TODO
* 注释: 用来收听: TaskManager 的心跳
*/
taskManagerHeartbeatManager = heartbeatServices
.createHeartbeatManagerSender(resourceId, new TaskManagerHeartbeatListener(), getMainThreadExecutor(), log);
/*************************************************
* TODO
* 注释: 用来收听: JobManager 的心跳
*/
jobManagerHeartbeatManager = heartbeatServices
.createHeartbeatManagerSender(resourceId, new JobManagerHeartbeatListener(), getMainThreadExecutor(), log);
}
createHeartbeatManagerSender -> HeartbeatManagerSenderImpl -> this
-> mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS); -> run()
public void run() {
/*************************************************
* TODO
* 注释: 在 Flink 的心跳机制中,跟其他的 集群不一样:
* 1、ResourceManager 发送心跳给 从节点 Taskmanager
* 2、从节点接收到心跳之后,返回响应
*/
// TODO 注释: 实现循环执行
if(!stopped) {
/*************************************************
* TODO
* 注释: 遍历每一个 TaskExecutor 出来,然后发送 心跳请求
* 每一次 TaskEXecutor 来 RM 注册的时候, 在注册成功之后,就会给这个 TaskEXecutor 生成一个
* Target, 最终,这个 Target 被封装在 : Monitor,
* 最终,每个 TaskEXecutor 对应的一个唯一的 Monitor 就被保存在 heartbeatTargets map 中
* RM 在启动的时候,就已经启动了: TaskManagerHeartbeamManager
* 这个组件的内部: 启动了一个 HearBeatManagerSenderImpl 对象。
* 内部通过一种特别的机制,实现了一个 每隔 10s 调度一次 该组建的额 run 运行
* 最终的效果;
* RM 启动好了之后,就每隔10s 钟,向所有的已注册的 TaskEXecutor 发送心跳请求
* 如果最终,发现某一个 TaskExecutor 的上一次心跳时间,举例现在超过 50s
* 则认为该 TaskExecutor 宕机了。 RM 要执行针对这个 TaskExecutor 的注销
*/
log.debug("Trigger heartbeat request.");
for(HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
// TODO 注释: ResourceManager 给 目标发送(TaskManager 或者 JobManager) 心跳
requestHeartbeat(heartbeatMonitor);
}
/*************************************************
* TODO
* 注释: 实现循环发送心跳的效果
* 1、心跳时间:10s钟
* 2、心跳超时时间:50s钟
*/
getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
}
--> requestHeartbeat -> heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
HeartbeatManagerImpl.requestHeartbeat()
public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {
if(!stopped) {
// TODO 注释: 接收到 ResourceID(当前Active ResourceManager) 的心跳请求
log.debug("Received heartbeat request from {}.", requestOrigin);
/*************************************************
* TODO 马中华 https://blog.csdn.net/zhongqi2513
* 注释: 执行心跳汇报
*/
final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);
if(heartbeatTarget != null) {
if(heartbeatPayload != null) {
heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
}
heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
}
}
}
-> heartbeatTarget.receiveHeartbeat()
public void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload) {
if(!stopped) {
log.debug("Received heartbeat from {}.", heartbeatOrigin);
reportHeartbeat(heartbeatOrigin);
if(heartbeatPayload != null) {
heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
}
}
}
实现了循环发送和收到心跳
2.2.2.2 启动slotManager
slotManager.start(){
/*************************************************
* TODO
* 注释: 开启第一个定时任务: checkTaskManagerTimeouts, 检查 TaskManager 的心跳
* taskManagerTimeout = resourcemanager.taskmanager-timeout = 30000
*/
taskManagerTimeoutCheck = scheduledExecutor
.scheduleWithFixedDelay(() -> mainThreadExecutor.execute(() -> checkTaskManagerTimeouts()), 0L, taskManagerTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
/*************************************************
* TODO
* 注释: 开启第二个定时任务: checkSlotRequestTimeouts, 检查 SplotRequest 超时处理
* slotRequestTimeout = slot.request.timeout = 5L * 60L * 1000L
*/
slotRequestTimeoutCheck = scheduledExecutor
.scheduleWithFixedDelay(() -> mainThreadExecutor.execute(() -> checkSlotRequestTimeouts()), 0L, slotRequestTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
}
后续的DefaultDispatcherResourceManagerComponentFactory.create().resourceManager.start();
-> RpcEndpoint.start -> rpcServer.start(); -> AkkaInvocationHandler.start(){
/*************************************************
* TODO
* 注释: 发送 START 消息
* 只要发送了 START 这个消息,也就意味着: ResourceManager 已经成功启动好了。 !
*/
rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
}
2.3 创建和启动Dispatcher
里面会创建和启动jobMaster
DefaultDispatcherResourceManagerComponentFactory.create()->
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner
-> DefaultDispatcherRunnerFactory.createDispatcherRunner -> create
-> DispatcherRunnerLeaderElectionLifecycleManager.createFor
-> new DispatcherRunnerLeaderElectionLifecycleManager
-> leaderElectionService.start(dispatcherRunner) -> DefaultDispatcherRunner.grantLeadership
-> startNewDispatcherLeaderProcess -> newDispatcherLeaderProcess::start -> AbstractDispatcherLeaderProcess.start
-> startInternal -> SessionDispatcherLeaderProcess.onStart -> createDispatcherIfRunning
-> createDispatcher -> DefaultDispatcherGatewayServiceFactory.create
-> SessionDispatcherFactory.createDispatcher(创建 Dispatcher)
-> new StandaloneDispatcher -> super -> public Dispatcher()
Dispatcher构造方法执行完成后会执行onStart方法
dispatcherBootstrap.initialize(把所有中断的 job 恢复执行) -> launchRecoveredJobGraphs
-> runRecoveredJob -> runJob( 客户端正常提交一个 job 的时候,最终由 集群主节点中的 Dispatcher 接收到来继续提交执行)
2.3.1 创建jobMaster
Dispatcher.runJob.createJobManagerRunner() -> createJobManagerRunner -> new JobManagerRunnerImpl
-> createJobMasterService -> new JobMaster
2.3.2 启动jobMaster
Dispatcher.runJob.startJobManagerRunner -> jobManagerRunner.start()-> JobManagerRunnerImpl.start()
-> leaderElectionService.start -> JobManagerRunnerImpl.grantLeadership
-> verifyJobSchedulingStatusAndStartJobManager -> startJobMaster
-> jobMasterService.start -> JobMaster.start -> startJobExecution
-> startJobMasterServices()
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
/*TODO 真正启动JobMaster服务*/
startJobMasterServices();
/*TODO 重置和启动调度器 */
resetAndStartScheduler();
}
------------------------------JobMaster--------------------------------
JobMaster.startJobMasterServices
private void startJobMasterServices() throws Exception {
/*TODO 启动心跳服务:taskmanager、resourcemanager*/
startHeartbeatServices();
// start the slot pool make sure the slot pool now accepts messages for this leader
/*TODO 启动 slotpool*/
slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
scheduler.start(getMainThreadExecutor());
//TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
// try to reconnect to previously known leader
reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
// job is ready to go, try to establish connection with resource manager
// - activate leader retrieval for the resource manager
// - on notification of the leader, the connection will be established and
// the slot pool will start requesting slots
/**
* TODO 重点: 启动后 slot pool 开始向 slot manager 请求 slot
* TODO 与ResourceManager建立连接,slotpool开始 向slotmanager 请求资源(slot)
* -> StandaloneLeaderRetrievalService
* -> ResourceManagerLeaderListener.notifyLeaderAddress
/*************************************************
* TODO
* 注释: 工作准备就绪,请尝试与资源管理器建立连接
* 注册 start() 方法的参数:
* 1、ResourceManagerLeaderListener 是 LeaderRetrievalListener 的子类
* 2、NodeCacheListener 是 curator 提供的监听器,当指定的 zookeeper znode 节点数据发生改变,则会接收到通知
* 回调 nodeChanged() 方法
* 3、在 nodeChanged() 会调用对应的 LeaderRetrievalListener 的 notifyLeaderAddress() 方法
* 4、resourceManagerLeaderRetriever 的实现类是: LeaderRetrievalService的实现类:ZooKeeperLeaderRetrievalService
* 5、resourceManagerLeaderRetriever 进行监听,当发生变更的时候,就会回调:ResourceManagerLeaderListener 的 notifyLeaderAddress 方法
*/
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}
dispatcher.start(); Dispatcher 也是一个 RpcEndpoint 启动起来了之后,给自己发送一个 Hello 消息证明启动
参考 resourcemanager.start
2.3.2 slotpool开始 向ResourceManager请求资源(slot)
JobMaster.startJobMasterServices
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
-> ResourceManagerLeaderListener.notifyLeaderAddress -> notifyOfNewResourceManagerLeader -> reconnectToResourceManager
-> tryConnectToResourceManager -> connectToResourceManager -> resourceManagerConnection.start();
RegisteredRpcConnection.start() {
checkState(!closed, "The RPC connection is already closed");
checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
/*TODO 创建注册对象*/
final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
/**
* TODO 开始注册,注册成功之后,调用
* jobMaster向ResourceManger 注册成功时调用 JobMaster.ResourceManagerConnection.onRegistrationSuccess()
* TaskExecutor向ResourceManger 注册成功时调用 TaskExecutorToResourceManagerConnection.onRegistrationSuccess
* -> TaskExecutor.ResourceManagerRegistrationListener.onRegistrationSuccess
* 哪个注册完成后调用哪个的onRegistrationSuccess
*/
newRegistration.startRegistration();
} else {
// concurrent start operation
newRegistration.cancel();
}
}
--> RegisteredRpcConnection.createNewRegistration(创建注册对象)-> generateRegistration
-> JobMaster.ResourceManagerConnection.generateRegistration
-> 通过ResourceManagerGateway 向ResourceManager注册 ResourceManager.registerJobManager
-->RegisteredRpcConnection.newRegistration.startRegistration(开始注册)
-> register -> invokeRegistration
注册完成后调用JobMaster.ResourceManagerConnection.onRegistrationSuccess()
-> establishResourceManagerConnection ->
/*TODO slotpool连接到ResourceManager,请求资源*/
->slotPool.connectToResourceManager(resourceManagerGateway);
-> SlotPoolImpl.connectToResourceManager -> requestSlotFromResourceManager -> resourceManagerGateway.requestSlot(通过resourceManagerGateway rpc远程调用 ResourceManager的requestSlot方法) -> ResourceManager.requestSlot
-> slotManager.registerSlotRequest(slotRequest)/**ResourceManager内部的 slotManager去向 Yarn的ResourceManager申请资源*/
-> SlotManagerImpl.registerSlotRequest()-> internalRequestSlot -> allocateSlot(向 TaskManager 申请 Slot)
-> gateway.requestSlot
-> allocateSlot -> TaskSlotTableImpl.allocateSlot
-> offerSlotsToJobManager(提供一个 Slot 给 JobMaster)-> internalOfferSlotsToJobManager
-> jobMasterGateway.offerSlots
-> CompletableFuture.completedFuture(slotPool.offerSlots(taskManagerLocation, rpcTaskManagerGateway, slots));
3. TaskManager
TaskManager 是 Flink 的 worker 节点,它负责 Flink 中本机 slot 资源的管理以及具体 task 的执行。
TaskManager 上的基本资源单位是 slot,一个作业的 task 最终会部署在一个 TM 的 slot 上运行,TM
会负责维护本地的 slot 资源列表,并来与 Flink Master 和 JobManager 通信。
根据以上的脚本启动分析:TaskManager 的启动主类: TaskManagerRunner
详情参考上篇