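Overview
Looking back at my earlier blog posts on Heron, I noticed that most of them were about using and experimenting with Heron, with little analysis of how it works internally. Although Heron is one of the newest generation of stream processing platforms, it still gets little attention amid the current Flink boom in China.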
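Stream processing has moved from Spark Streaming's micro-batch approximation of streams, to true stream processing in Storm and Flink, and then to Heron, Twitter's reinvention of its own system. I was among the first group of people in China to work with Heron (Twitter announced Heron in 2015 and open-sourced it in 2016; I started building an experimental environment and studying it in 2017, when not even a setup guide could be found in Chinese, and apart from my own I still have not seen one; if you have, please let me know). Beyond Heron's official paper and articles, it has gradually started to attract some attention. Having gone from finding no material in China at all to seeing a handful of articles on CSDN, I have become a witness to that change. So I plan to write a series of articles on Heron at the level of theory and source code, as a small contribution to this open-source system. Let's begin.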
The Heron CLI (command line interface) provides the following topology management commands; the corresponding enum can be found in the scheduler-related source code:
/***
* This enum defines commands invoked from heron client
*/
public enum Command {
// TODO(mfu): Move ACTIVATE & DEACTIVATE out? They are non-related to Scheduling
SUBMIT,
KILL,
ACTIVATE,
DEACTIVATE,
UPDATE,
RESTART;
public static Command makeCommand(String commandString) {
return Command.valueOf(commandString.toUpperCase());
}
}
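To make makeCommand's behavior concrete, here is a trimmed, self-contained copy of the enum; the CommandDemo class and its main method are mine, for illustration only. Because makeCommand upper-cases the string and then calls Enum.valueOf, the lookup is case-insensitive, and an unknown command string fails with an IllegalArgumentException rather than returning null. One deliberate deviation: I pass Locale.ROOT to toUpperCase to avoid locale-dependent case mapping, whereas the original calls toUpperCase() with no argument.

```java
import java.util.Locale;

// Self-contained copy of the Command enum above.
enum Command {
  SUBMIT,
  KILL,
  ACTIVATE,
  DEACTIVATE,
  UPDATE,
  RESTART;

  // Case-insensitive lookup: "update", "Update" and "UPDATE" all map to UPDATE.
  // Enum.valueOf throws IllegalArgumentException for names that do not match.
  public static Command makeCommand(String commandString) {
    return Command.valueOf(commandString.toUpperCase(Locale.ROOT));
  }
}

// Hypothetical demo class, not part of Heron.
class CommandDemo {
  public static void main(String[] args) {
    System.out.println(Command.makeCommand("update")); // prints UPDATE
    try {
      Command.makeCommand("scale");
    } catch (IllegalArgumentException e) {
      System.out.println("unknown command rejected");
    }
  }
}
```

If a command is created through makeCommand, an unrecognized string therefore fails at parse time, before it could ever reach a default branch of a switch over Command.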
The Heron CLI's update command lets users change a component's parallelism dynamically while the topology is running. The command looks like this:
heron update local/ads/PROD my-topology \
--component-parallelism=my-spout:2 \
--component-parallelism=my-bolt:4
For details on how to use update, see the Heron documentation. Here I break down the implementation behind the update command to see whether it offers any ideas worth borrowing.
ApiServer
In Heron, the ApiServer handles every command that users submit through the Heron CLI, update included, so that is where we start reading the source.
package com.twitter.heron.apiserver.actions;
public class TopologyRuntimeAction implements Action {
private final Config config;
private final Command command;
TopologyRuntimeAction(Config config, Command command) {
this.config = config;
this.command = command;
}
@Override
public void execute() {
final RuntimeManagerMain runtimeManagerMain = new RuntimeManagerMain(config, command);
runtimeManagerMain.manageTopology();
}
}
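In the ApiServer, the constructor stores the command object built from the user's input (the command parsing and the code that invokes this class are omitted here). The execute method then uses the Config object (the configuration the user specified when launching the topology, holding all static settings from submission time) together with the command to construct a RuntimeManagerMain and call its manageTopology() method. RuntimeManagerMain is the core class for managing topologies at runtime: every management action and dynamic adjustment applied to a running topology is handled by its methods. Let's look at the manageTopology method it calls.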
RuntimeManagerMain
When the action class constructs this object, it calls the constructor below, which only initializes config and command and does nothing else:
public RuntimeManagerMain(Config config, Command command) {
// initialize the options
this.config = config;
this.command = command;
}
Next, the manageTopology method:
/**
* Manage a topology
* 1. Instantiate necessary resources
* 2. Validate whether the runtime management is legal
* 3. Complete the runtime management for a specific command
*/
public void manageTopology()
throws TopologyRuntimeManagementException, TMasterException, PackingException {
String topologyName = Context.topologyName(config);
// 1. Do prepare work
// create an instance of state manager
String statemgrClass = Context.stateManagerClass(config);
IStateManager statemgr;
try {
statemgr = ReflectionUtils.newInstance(statemgrClass);
} catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
throw new TopologyRuntimeManagementException(String.format(
"Failed to instantiate state manager class '%s'",
statemgrClass), e);
}
// Put it in a try block so that we can always clean resources
try {
// initialize the statemgr
statemgr.initialize(config);
// TODO(mfu): timeout should read from config
SchedulerStateManagerAdaptor adaptor = new SchedulerStateManagerAdaptor(statemgr, 5000);
validateRuntimeManage(adaptor, topologyName);
// 2. Try to manage topology if valid
// invoke the appropriate command to manage the topology
LOG.log(Level.FINE, "Topology: {0} to be {1}ed", new Object[]{topologyName, command});
// build the runtime config
Config runtime = Config.newBuilder()
.put(Key.TOPOLOGY_NAME, Context.topologyName(config))
.put(Key.SCHEDULER_STATE_MANAGER_ADAPTOR, adaptor)
.build();
// Create a ISchedulerClient basing on the config
ISchedulerClient schedulerClient = getSchedulerClient(runtime);
callRuntimeManagerRunner(runtime, schedulerClient);
} finally {
// 3. Do post work basing on the result
// Currently nothing to do here
// 4. Close the resources
SysUtils.closeIgnoringExceptions(statemgr);
}
}
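The comments outline several main steps:
1. Preparation. Get the topology name and read the state manager class name (statemgrClass) from the config object, then instantiate statemgr reflectively from that class name. This object persists and maintains Heron's state metadata; in my setup it is backed by ZooKeeper. statemgr is then initialized (connected) with config and wrapped in a SchedulerStateManagerAdaptor with a 5000 ms timeout. Finally, validateRuntimeManage checks, by topology name, that the topology is in a valid state for the requested operation (here, update).
2. Manage the topology according to the command. First a Config object named runtime is created. This is the runtime configuration: unlike config, which holds the static settings from submission time, runtime carries dynamic, runtime-only entries such as the state manager adaptor. As we will see, runtime and config are both passed on from here and carried through the whole process.
Once runtime is built, getSchedulerClient() below is called to obtain a schedulerClient. Why is a scheduler needed at all? If you have read Heron's scheduler code, you know that each scheduler carries out a topology update differently, so the update command can only be completed by calling the update method of the specific scheduler in use. That is why a scheduler client has to be constructed.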
protected ISchedulerClient getSchedulerClient(Config runtime)
throws SchedulerException {
return new SchedulerClientFactory(config, runtime).getSchedulerClient();
}
A factory is used here to construct the scheduler client. The factory's getSchedulerClient method is:
/**
* Implementation of getSchedulerClient - Used to create objects
* Currently it creates either HttpServiceSchedulerClient or LibrarySchedulerClient
*
* @return getSchedulerClient created. return null if failed to create ISchedulerClient instance
*/
public ISchedulerClient getSchedulerClient() throws SchedulerException {
LOG.fine("Creating scheduler client");
ISchedulerClient schedulerClient;
if (Context.schedulerService(config)) {
// get the instance of the state manager
SchedulerStateManagerAdaptor statemgr = Runtime.schedulerStateManagerAdaptor(runtime);
Scheduler.SchedulerLocation schedulerLocation =
statemgr.getSchedulerLocation(Runtime.topologyName(runtime));
if (schedulerLocation == null) {
throw new SchedulerException("Failed to get scheduler location from state manager");
}
LOG.log(Level.FINE, "Scheduler is listening on location: {0} ", schedulerLocation.toString());
schedulerClient =
new HttpServiceSchedulerClient(config, runtime, schedulerLocation.getHttpEndpoint());
} else {
// create an instance of scheduler
final IScheduler scheduler = LauncherUtils.getInstance()
.getSchedulerInstance(config, runtime);
LOG.fine("Invoke scheduler as a library");
schedulerClient = new LibrarySchedulerClient(config, runtime, scheduler);
}
return schedulerClient;
}
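The branching in this factory method can be reduced to the following sketch. Everything in it is hypothetical and simplified: SchedulerClient, HttpClientStub and LibraryClientStub stand in for ISchedulerClient, HttpServiceSchedulerClient and LibrarySchedulerClient, the scheduler location is modeled as an optional endpoint string, and a missing location raises an unchecked IllegalStateException instead of Heron's SchedulerException. It shows the two paths: reuse a long-running scheduler service over HTTP, or build a scheduler in-process as a library.

```java
import java.util.Optional;

// Hypothetical stand-in for ISchedulerClient.
interface SchedulerClient {
  String describe();
}

// Stand-in for HttpServiceSchedulerClient: talks to a scheduler service at an endpoint.
class HttpClientStub implements SchedulerClient {
  private final String endpoint;

  HttpClientStub(String endpoint) { this.endpoint = endpoint; }

  public String describe() { return "http:" + endpoint; }
}

// Stand-in for LibrarySchedulerClient: calls a scheduler created in-process.
class LibraryClientStub implements SchedulerClient {
  public String describe() { return "library"; }
}

class SchedulerClientFactorySketch {
  // isService mirrors heron.scheduler.is.service; schedulerEndpoint mirrors the
  // location a running scheduler service published to the state manager.
  static SchedulerClient create(boolean isService, Optional<String> schedulerEndpoint) {
    if (isService) {
      String endpoint = schedulerEndpoint.orElseThrow(() ->
          new IllegalStateException("Failed to get scheduler location from state manager"));
      return new HttpClientStub(endpoint);
    }
    // Default (false): no scheduler process is running, so build one locally.
    return new LibraryClientStub();
  }

  public static void main(String[] args) {
    System.out.println(create(false, Optional.empty()).describe());         // library
    System.out.println(create(true, Optional.of("host:1234")).describe()); // http:host:1234
  }
}
```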
getSchedulerClient() clearly splits into two branches on an if condition. Which config entry does the condition check? It is the following entry in scheduler.yaml:
heron.scheduler.is.service: false
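It defaults to false, meaning the scheduler is not started as a service. If set to true, Heron keeps the scheduler running as an HTTP service, and whenever the scheduler is needed it is simply called through the service's URL; that is what the if branch does. With the default false, the scheduler instance is released once the topology has been submitted, so a new scheduler object has to be created from the configuration so that its update method can be called later. LauncherUtils.getInstance().getSchedulerInstance(config, runtime) works as follows: it reflectively instantiates the configured scheduler class and initializes it, which is easy to follow.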
/**
* Creates and initializes scheduler instance
*
* @return initialized scheduler instances
*/
public IScheduler getSchedulerInstance(Config config, Config runtime)
throws SchedulerException {
String schedulerClass = Context.schedulerClass(config);
IScheduler scheduler;
try {
// create an instance of scheduler
scheduler = ReflectionUtils.newInstance(schedulerClass);
} catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
throw new SchedulerException(String.format("Failed to instantiate scheduler using class '%s'",
schedulerClass));
}
scheduler.initialize(config, runtime);
return scheduler;
}
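ReflectionUtils.newInstance itself is not shown here. A minimal equivalent, assuming the target class has a public no-argument constructor, can be written with standard Java reflection; the helper name newInstanceOf and the class ReflectionSketch are hypothetical. It uses getDeclaredConstructor().newInstance() instead of the deprecated Class.newInstance(), which is why it declares the broader ReflectiveOperationException rather than the three exceptions caught in the Heron code.

```java
import java.util.List;

class ReflectionSketch {
  // Instantiate a class by fully qualified name via its no-arg constructor,
  // the same idea as turning a scheduler class name from config into an IScheduler.
  // The cast is unchecked; a wrong type surfaces as ClassCastException at the call site.
  @SuppressWarnings("unchecked")
  static <T> T newInstanceOf(String className) throws ReflectiveOperationException {
    return (T) Class.forName(className).getDeclaredConstructor().newInstance();
  }

  public static void main(String[] args) throws ReflectiveOperationException {
    List<String> list = newInstanceOf("java.util.ArrayList");
    list.add("ok");
    System.out.println(list); // [ok]
    try {
      newInstanceOf("com.example.NoSuchScheduler"); // hypothetical missing class
    } catch (ClassNotFoundException e) {
      System.out.println("missing class: " + e.getMessage());
    }
  }
}
```

This is also why a typo in the scheduler class name in the config only shows up at runtime, as a ClassNotFoundException wrapped in SchedulerException.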
The last step of this branch wraps the scheduler in a LibrarySchedulerClient, which is where scheduler.onUpdate is eventually invoked. The class delegates to the corresponding scheduler methods:
@Override
public boolean restartTopology(Scheduler.RestartTopologyRequest restartTopologyRequest) {
// ...
}
@Override
public boolean killTopology(Scheduler.KillTopologyRequest killTopologyRequest) {
// ...
}
@Override
public boolean updateTopology(Scheduler.UpdateTopologyRequest updateTopologyRequest) {
boolean ret = false;
try {
scheduler.initialize(config, runtime);
ret = scheduler.onUpdate(updateTopologyRequest);
} finally {
SysUtils.closeIgnoringExceptions(scheduler);
}
return ret;
}
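The point of updateTopology here is that a library-mode scheduler lives only for the duration of one call: initialize it, run onUpdate, and always close it in finally, even if onUpdate throws. Below is a self-contained sketch of that pattern. FakeScheduler and closeQuietly are hypothetical stand-ins for IScheduler and SysUtils.closeIgnoringExceptions, and the request is reduced to a String.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for IScheduler that records its lifecycle calls.
class FakeScheduler implements AutoCloseable {
  final List<String> calls = new ArrayList<>();
  private final boolean failUpdate;

  FakeScheduler(boolean failUpdate) { this.failUpdate = failUpdate; }

  void initialize() { calls.add("initialize"); }

  boolean onUpdate(String request) {
    calls.add("onUpdate");
    if (failUpdate) {
      throw new IllegalStateException("update failed");
    }
    return true;
  }

  @Override
  public void close() { calls.add("close"); }
}

class LibraryClientSketch {
  // Like SysUtils.closeIgnoringExceptions: close and swallow any exception.
  static void closeQuietly(AutoCloseable c) {
    try {
      c.close();
    } catch (Exception ignored) {
      // intentionally ignored: cleanup must not mask the real result
    }
  }

  // Same shape as LibrarySchedulerClient.updateTopology.
  static boolean updateTopology(FakeScheduler scheduler, String request) {
    boolean ret = false;
    try {
      scheduler.initialize();
      ret = scheduler.onUpdate(request);
    } finally {
      closeQuietly(scheduler); // runs on success and on exception
    }
    return ret;
  }

  public static void main(String[] args) {
    FakeScheduler ok = new FakeScheduler(false);
    System.out.println(updateTopology(ok, "req") + " " + ok.calls);
    // true [initialize, onUpdate, close]
  }
}
```

Since there is no catch block, an exception from onUpdate still propagates to the caller; finally only guarantees cleanup. Note also that on the library path the scheduler is initialized twice: once in getSchedulerInstance and again inside updateTopology.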
Next, callRuntimeManagerRunner(runtime, schedulerClient) is called:
protected void callRuntimeManagerRunner(Config runtime, ISchedulerClient schedulerClient)
throws TopologyRuntimeManagementException, TMasterException, PackingException {
// create an instance of the runner class
RuntimeManagerRunner runtimeManagerRunner =
new RuntimeManagerRunner(config, runtime, command, schedulerClient);
// invoke the appropriate handlers based on command
runtimeManagerRunner.call();
}
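This method mainly creates a RuntimeManagerRunner that executes the specific command. (So far the ApiServer has only built the command object without showing how it leads to the update method; this is where that happens.)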
public void call()
throws TMasterException, TopologyRuntimeManagementException,
PackingException, UpdateDryRunResponse {
// execute the appropriate command
String topologyName = Context.topologyName(config);
switch (command) {
case ACTIVATE: ...
case DEACTIVATE: ...
case RESTART: ...
case KILL: ...
case UPDATE: updateTopologyHandler(topologyName, config.getStringValue(NEW_COMPONENT_PARALLELISM_KEY));
break;
default:
LOG.severe("Unknown command for topology: " + command);
}
}
Here we focus on the UPDATE branch:
@VisibleForTesting
void updateTopologyHandler(String topologyName, String newParallelism)
throws TopologyRuntimeManagementException, PackingException, UpdateDryRunResponse {
// 1. Fetch basic topology information
LOG.fine(String.format("updateTopologyHandler called for %s with %s",
topologyName, newParallelism));
SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
TopologyAPI.Topology topology = manager.getTopology(topologyName);
Map<String, Integer> changeRequests = parseNewParallelismParam(newParallelism);
PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName);
// 2. Check for changes and build the new packing plan
if (!changeDetected(currentPlan, changeRequests)) {
throw new TopologyRuntimeManagementException(
String.format("The component parallelism request (%s) is the same as the "
+ "current topology parallelism. Not taking action.", newParallelism));
}
PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
topology);
// 3. Dry-run mode: report the proposed plan instead of applying it
if (Context.dryRun(config)) {
PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
PackingPlan oldPlan = deserializer.fromProto(currentPlan);
PackingPlan newPlan = deserializer.fromProto(proposedPlan);
throw new UpdateDryRunResponse(topology, config, newPlan, oldPlan, changeRequests);
}
// 4. Build the update request and update the topology
Scheduler.UpdateTopologyRequest updateTopologyRequest =
Scheduler.UpdateTopologyRequest.newBuilder()
.setCurrentPackingPlan(currentPlan)
.setProposedPackingPlan(proposedPlan)
.build();
LOG.fine("Sending Updating topology request: " + updateTopologyRequest);
if (!schedulerClient.updateTopology(updateTopologyRequest)) {
throw new TopologyRuntimeManagementException(String.format(
"Failed to update topology with Scheduler, updateTopologyRequest="
+ updateTopologyRequest));
}
// Clean the connection when we are done.
LOG.fine("Scheduler updated topology successfully.");
}
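The comments divide this method into parts. Its main steps are:
- Fetch the topology's basic information: retrieve the adaptor from runtime (the ZooKeeper connection wrapper; keeping it in runtime lets later code reuse the connection until it expires instead of reconnecting, which keeps latency low), fetch the full topology by topologyName, parse the parallelism changes given in the update command, and fetch the current packing plan.
- Check whether the requested parallelism differs from the component parallelism in the current packing plan; if nothing changes, throw an exception and take no action.
- Build a new packing plan (proposedPackingPlan) from the requested parallelism.
- In dry-run mode, stop here and throw an UpdateDryRunResponse carrying the old and new plans, so the caller can show the proposed change without applying it.
- Build an UpdateTopologyRequest from the current and proposed packing plans.
- Finally, call schedulerClient.updateTopology() with the updateTopologyRequest to carry out the actual update.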
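parseNewParallelismParam and changeDetected are not shown in the code above, so the following is only my assumption of what they do. It assumes the repeated --component-parallelism flags reach this method as one comma-separated string such as "my-spout:2,my-bolt:4", and it models the current packing plan as a plain map from component name to parallelism instead of a PackingPlans.PackingPlan. The class ParallelismSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class ParallelismSketch {
  // Parse "comp:n[,comp:n...]" into a component -> parallelism map.
  static Map<String, Integer> parseNewParallelismParam(String newParallelism) {
    Map<String, Integer> changes = new HashMap<>();
    for (String pair : newParallelism.split(",")) {
      if (pair.trim().isEmpty()) {
        continue; // tolerate empty segments, e.g. a trailing comma
      }
      String[] kv = pair.split(":", 2);
      if (kv.length != 2) {
        throw new IllegalArgumentException("Expected <component>:<parallelism>, found: " + pair);
      }
      try {
        changes.put(kv[0].trim(), Integer.parseInt(kv[1].trim()));
      } catch (NumberFormatException e) {
        throw new IllegalArgumentException("Invalid parallelism in: " + pair, e);
      }
    }
    return changes;
  }

  // A change exists if any requested component's parallelism differs from the current one.
  // In this sketch an unknown component also counts as a change.
  static boolean changeDetected(Map<String, Integer> current, Map<String, Integer> requested) {
    for (Map.Entry<String, Integer> e : requested.entrySet()) {
      if (!e.getValue().equals(current.get(e.getKey()))) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    Map<String, Integer> current = new HashMap<>();
    current.put("my-spout", 1);
    current.put("my-bolt", 4);
    Map<String, Integer> requested = parseNewParallelismParam("my-spout:2,my-bolt:4");
    System.out.println(changeDetected(current, requested)); // true: my-spout 1 -> 2
    System.out.println(changeDetected(current, parseNewParallelismParam("my-bolt:4"))); // false
  }
}
```

The second call corresponds to the "same as the current topology parallelism. Not taking action." error path in updateTopologyHandler.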
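This process matters a great deal (to me, at least); I will explain it in a later article.
At this point the code has come back to LibrarySchedulerClient.updateTopology, analyzed above (repeated here for convenience):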
@Override
public boolean updateTopology(Scheduler.UpdateTopologyRequest updateTopologyRequest) {
boolean ret = false;
try {
scheduler.initialize(config, runtime);
ret = scheduler.onUpdate(updateTopologyRequest);
} finally {
SysUtils.closeIgnoringExceptions(scheduler);
}
return ret;
}
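In other words, it runs the onUpdate method of a concrete scheduler; in my setup that is AuroraScheduler.onUpdate. Let's set it aside for now and return to it later.
Remember that all of this is still step 2 of the overall flow? [facepalm] First, the two remaining wrap-up steps:
3. Post-processing based on the result. In the current code this step does nothing.
4. Cleanup: close resources, which here means the statemgr's ZooKeeper connection.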
AuroraScheduler
So far we have walked through the overall flow, but not the onUpdate method of a specific scheduler. Let's do that now. AuroraScheduler's onUpdate method is:
@Override
public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
try {
updateTopologyManager.updateTopology(
request.getCurrentPackingPlan(), request.getProposedPackingPlan());
} catch (ExecutionException | InterruptedException e) {
LOG.log(Level.SEVERE, "Could not update topology for request: " + request, e);
return false;
}
return true;
}
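It simply calls updateTopologyManager.updateTopology(), passing the current packing plan (currentPackingPlan) and the new one (proposedPackingPlan). updateTopologyManager is created in the scheduler's initialize method, which passes the scheduler itself in as the IScalable: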
@Override
public void initialize(Config mConfig, Config mRuntime) {
this.config = Config.toClusterMode(mConfig);
this.runtime = mRuntime;
try {
this.controller = getController();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
LOG.severe("AuroraController initialization failed " + e.getMessage());
}
this.updateTopologyManager =
new UpdateTopologyManager(config, runtime, Optional.<IScalable>of(this));
}
Now let's look at updateTopologyManager.updateTopology(currentPackingPlan, proposedPackingPlan):
/**
* Scales the topology out or in based on the proposedPackingPlan
*
* @param existingProtoPackingPlan the current plan. If this isn't what's found in the state
* manager, the update will fail
* @param proposedProtoPackingPlan packing plan to change the topology to
*/
public void updateTopology(final PackingPlans.PackingPlan existingProtoPackingPlan,
final PackingPlans.PackingPlan proposedProtoPackingPlan)
throws ExecutionException, InterruptedException, ConcurrentModificationException {
String topologyName = Runtime.topologyName(runtime);
SchedulerStateManagerAdaptor stateManager = Runtime.schedulerStateManagerAdaptor(runtime);
Lock lock = stateManager.getLock(topologyName, IStateManager.LockName.UPDATE_TOPOLOGY);
if (lock.tryLock(5, TimeUnit.SECONDS)) {
try {
PackingPlans.PackingPlan foundPackingPlan = getPackingPlan(stateManager, topologyName);
if (!deserializer.fromProto(existingProtoPackingPlan)
.equals(deserializer.fromProto(foundPackingPlan))) {
throw new ConcurrentModificationException(String.format(
"The packing plan in state manager is not the same as the submitted existing "
+ "packing plan for topology %s. Another actor has changed it and has likely "
+ "performed an update on it. Failing this request, try again once other "
+ "update is complete", topologyName));
}
updateTopology(existingProtoPackingPlan, proposedProtoPackingPlan, stateManager);
} finally {
lock.unlock();
}
} else {
throw new ConcurrentModificationException(String.format(
"The update lock can not be obtained for topology %s. Another actor is performing an "
+ "update on it. Failing this request, try again once current update is complete",
topologyName));
}
}
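This code clearly relies on a distributed lock: Heron uses a ZooKeeper-based distributed lock to keep such updates consistent when several actors might update the same topology. Breaking it down:
1. Get the topology's basic information: again the topologyName and the stateManager object.
2. Acquire the distributed lock and perform the update. The lock is keyed by topologyName and uses the lock name IStateManager.LockName.UPDATE_TOPOLOGY. tryLock waits up to 5 seconds to acquire it; if the lock cannot be obtained in that time, a ConcurrentModificationException is thrown. While holding the lock, the code does the following:
- (1) Using stateManager, fetch the packing plan currently stored in ZooKeeper for topologyName, deserialize it, and compare it with the existing packing plan passed into the method. This validation guards against updating from a plan that no longer matches the one ZooKeeper maintains, for example because another actor updated the topology in the meantime.
- (2) Call the overloaded updateTopology method to perform the actual update.
- (3) Release the lock in the finally block.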
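The lock-then-verify pattern can be reproduced locally. In this sketch a java.util.concurrent.locks.ReentrantLock stands in for the ZooKeeper-backed lock returned by stateManager.getLock, and an AtomicReference<String> stands in for the packing plan stored in the state manager; PlanStore and updatePlan are hypothetical names. It shows the two failure modes of the original: the lock is not acquired within the timeout, or the stored plan no longer matches the plan the caller based its update on.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class PlanStore {
  private final Lock lock = new ReentrantLock();
  // Stand-in for the packing plan kept in the state manager.
  final AtomicReference<String> storedPlan = new AtomicReference<>("plan-v1");

  void updatePlan(String existingPlan, String proposedPlan, long timeoutSeconds)
      throws InterruptedException {
    // Wait at most timeoutSeconds for the lock instead of blocking forever.
    if (!lock.tryLock(timeoutSeconds, TimeUnit.SECONDS)) {
      throw new ConcurrentModificationException("update lock not obtained; another update is running");
    }
    try {
      // Verify that the caller's view of the plan is still current.
      if (!storedPlan.get().equals(existingPlan)) {
        throw new ConcurrentModificationException("stored plan changed since the request was built");
      }
      storedPlan.set(proposedPlan);
    } finally {
      lock.unlock(); // always release, even when the check fails
    }
  }

  public static void main(String[] args) throws InterruptedException {
    PlanStore store = new PlanStore();
    store.updatePlan("plan-v1", "plan-v2", 5);
    System.out.println(store.storedPlan.get()); // plan-v2
    try {
      store.updatePlan("plan-v1", "plan-v3", 5); // stale "existing" plan
    } catch (ConcurrentModificationException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

A ReentrantLock only coordinates threads inside one JVM, while the Heron lock has to coordinate separate client processes, which is why it lives in ZooKeeper rather than in memory.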
Here is the overloaded method it calls:
private void updateTopology(final PackingPlans.PackingPlan existingProtoPackingPlan,
final PackingPlans.PackingPlan proposedProtoPackingPlan,
SchedulerStateManagerAdaptor stateManager)
throws ExecutionException, InterruptedException {
// 1. Get topology info and deserialize both packing plans
String topologyName = Runtime.topologyName(runtime);
PackingPlan existingPackingPlan = deserializer.fromProto(existingProtoPackingPlan);
PackingPlan proposedPackingPlan = deserializer.fromProto(proposedProtoPackingPlan);
Preconditions.checkArgument(proposedPackingPlan.getContainers().size() > 0, String.format(
"proposed packing plan must have at least 1 container %s", proposedPackingPlan));
// 2. Compute container additions/removals and check that the scheduler can scale
ContainerDelta containerDelta = new ContainerDelta(
existingPackingPlan.getContainers(), proposedPackingPlan.getContainers());
int newContainerCount = containerDelta.getContainersToAdd().size();
int removableContainerCount = containerDelta.getContainersToRemove().size();
String message = String.format("Topology change requires %s new containers and removing %s "
+ "existing containers, but the scheduler does not support scaling, aborting. "
+ "Existing packing plan: %s, proposed packing plan: %s",
newContainerCount, removableContainerCount, existingPackingPlan, proposedPackingPlan);
Preconditions.checkState(newContainerCount + removableContainerCount == 0
|| scalableScheduler.isPresent(), message);
// 3. Check the topology state (is it currently RUNNING?)
TopologyAPI.Topology topology = getTopology(stateManager, topologyName);
boolean initiallyRunning = topology.getState() == TopologyAPI.TopologyState.RUNNING;
// 4. Start the update
// (1) Pause (deactivate) the topology
// deactivate and sleep
if (initiallyRunning) {
// Update the topology since the state should have changed from RUNNING to PAUSED
// Will throw exceptions internally if tmaster fails to deactivate
deactivateTopology(stateManager, topology, proposedPackingPlan);
}
// (2) Request new containers via the scheduler's addContainers() if needed, then swap in their actual ids (removeAll/addAll)
Set<PackingPlan.ContainerPlan> updatedContainers =
new HashSet<>(proposedPackingPlan.getContainers());
// request new resources if necessary. Once containers are allocated we should make the changes
// to state manager quickly, otherwise the scheduler might penalize for thrashing on start-up
if (newContainerCount > 0 && scalableScheduler.isPresent()) {
Set<PackingPlan.ContainerPlan> containersToAdd = containerDelta.getContainersToAdd();
Set<PackingPlan.ContainerPlan> containersAdded =
scalableScheduler.get().addContainers(containersToAdd);
// Update the PackingPlan with new container-ids
if (containersAdded != null) {
updatedContainers.removeAll(containersToAdd);
updatedContainers.addAll(containersAdded);
}
}
// (3) Build a new packing plan (updatedPackingPlan) from the updated container set
PackingPlan updatedPackingPlan =
new PackingPlan(proposedPackingPlan.getId(), updatedContainers);
PackingPlanProtoSerializer serializer = new PackingPlanProtoSerializer();
PackingPlans.PackingPlan updatedProtoPackingPlan = serializer.toProto(updatedPackingPlan);
LOG.fine("The updated Packing Plan: " + updatedProtoPackingPlan);
// (4) Serialize it and write the packing plan to the state manager
// update packing plan to trigger the scaling event
logInfo("Update new PackingPlan: %s",
stateManager.updatePackingPlan(updatedProtoPackingPlan, topologyName));
// 5. Reactivate the topology
// reactivate topology
if (initiallyRunning) {
// wait before reactivating to give the tmaster a chance to receive the packing update and
// delete the packing plan. Instead we could message tmaster to invalidate the physical plan
// and/or possibly even update the packing plan directly
SysUtils.sleep(Duration.ofSeconds(10));
// Will throw exceptions internally if tmaster fails to reactivate
reactivateTopology(stateManager, topology, removableContainerCount);
}
if (removableContainerCount > 0 && scalableScheduler.isPresent()) {
scalableScheduler.get().removeContainers(containerDelta.getContainersToRemove());
}
}
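The individual steps are annotated in the code. In step 5, note that between deactivation and reactivation the code sleeps for 10 s, giving the tmaster time to receive the new packing plan and rebuild the physical plan. Note also that surplus containers are removed only after the topology has been reactivated.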
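ContainerDelta's implementation is not shown above. Assuming containers are matched by container id, the delta amounts to two set differences, and the "swap in actual ids" part of step (2) is a removeAll/addAll on the proposed set. The sketch below models a container as just its integer id (a simplification of PackingPlan.ContainerPlan); ContainerDeltaSketch, checkScalable and reconcile are hypothetical names. checkScalable mirrors the Preconditions.checkState guard: any addition or removal requires a scalable scheduler.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

class ContainerDeltaSketch {
  final Set<Integer> toAdd = new TreeSet<>();
  final Set<Integer> toRemove = new TreeSet<>();

  ContainerDeltaSketch(Set<Integer> existing, Set<Integer> proposed) {
    toAdd.addAll(proposed);
    toAdd.removeAll(existing);    // in proposed but not in existing -> add
    toRemove.addAll(existing);
    toRemove.removeAll(proposed); // in existing but not in proposed -> remove
  }

  // Mirrors checkState(newCount + removableCount == 0 || scalableScheduler.isPresent()).
  void checkScalable(boolean schedulerIsScalable) {
    if (toAdd.size() + toRemove.size() > 0 && !schedulerIsScalable) {
      throw new IllegalStateException("Topology change requires scaling, but the scheduler does not support it");
    }
  }

  // Replace the planned ids of new containers with the ids the scheduler actually allocated.
  static Set<Integer> reconcile(Set<Integer> proposed, Set<Integer> toAdd, Set<Integer> added) {
    Set<Integer> updated = new TreeSet<>(proposed);
    if (added != null) {
      updated.removeAll(toAdd);
      updated.addAll(added);
    }
    return updated;
  }

  public static void main(String[] args) {
    Set<Integer> existing = new TreeSet<>(Arrays.asList(1, 2, 3));
    Set<Integer> proposed = new TreeSet<>(Arrays.asList(1, 2, 4, 5));
    ContainerDeltaSketch delta = new ContainerDeltaSketch(existing, proposed);
    delta.checkScalable(true);
    System.out.println(delta.toAdd + " " + delta.toRemove); // [4, 5] [3]
    // Pretend the scheduler allocated containers 6 and 7 for the two planned ones.
    Set<Integer> added = new TreeSet<>(Arrays.asList(6, 7));
    System.out.println(reconcile(proposed, delta.toAdd, added)); // [1, 2, 6, 7]
  }
}
```

In the real method the order matters: additions happen while the topology is paused and before the plan is written, whereas removals wait until after reactivation, presumably so the topology is already running on the new plan before the old containers are torn down.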
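Closing Remarks
This completes the analysis of the implementation behind the Heron CLI's update command. Some details may not be covered; please refer to the source, and I will add more later. Thanks for reading, and corrections are welcome.
Being able to use a technology proficiently is the first level of mastery; looking at how a feature or component is actually implemented, from the bottom up, gives a much clearer understanding. What counts as "the bottom"? I see source code and architecture as the two angles for learning. The source code of open-source projects also has a lot worth studying in depth; for example, the ZooKeeper-based distributed lock above is a great learning opportunity, and I may break it down in a future post. This analysis of the update command also lays the groundwork for later articles.
Writing this took real effort; please credit the source if you repost it.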