Source Code Analysis Behind the update Command in Heron CLI

Overview

Looking back over my earlier Heron posts, I noticed that most of them cover using Heron and running experiments with it, with relatively little analysis of its internals. And although Heron is a very new generation of stream processing platform, it remains rather quiet in China amid the current Flink craze.

From the simulated streaming of Spark Streaming, to the true stream processing of Storm and Flink, to Twitter's self-reinvention in open-sourcing Heron. As one of the earlier people in China to work with Heron (Heron was open-sourced in 2015; I started building experimental environments and doing related research in 2017, when you could not even find a Chinese article on setting up the environment — as far as I know mine are still the only ones; if you have seen others, please let me know), I have noticed that beyond Heron's official paper and articles, it has gradually drawn some attention. From finding no material at all in China to the handful of articles now appearing on CSDN, I have been something of a witness to that change. So I plan to write a series of posts on Heron from the theory and source-code perspectives, to do what little I can for this open-source system. Let's get started on this installment.

Heron CLI (command line interface) provides the following topology management commands; the corresponding enum can be found in the scheduler-related source code.

/**
 * This enum defines commands invoked from heron client
 */
public enum Command {
    // TODO(mfu): Move ACTIVATE & DEACTIVATE out? They are non-related to Scheduling
    SUBMIT,
    KILL,
    ACTIVATE,
    DEACTIVATE,
    UPDATE,
    RESTART;

    public static Command makeCommand(String commandString) {
        return Command.valueOf(commandString.toUpperCase());
    }
}
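
For example, the sub-command string taken from the CLI is mapped onto the enum case-insensitively:

    // "update" on the command line becomes Command.UPDATE
    Command command = Command.makeCommand("update");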

The Heron CLI update command allows users to dynamically change a component's parallelism while the topology is running. The command format is as follows:

heron update local/ads/PROD my-topology \
    --component-parallelism=my-spout:2 \
    --component-parallelism=my-bolt:4

For details on how to use the update command, see the Heron documentation. Here we take apart the implementation behind it and see whether it can offer us some ideas of our own.

ApiServer

In Heron, the ApiServer handles every command a user submits through the Heron CLI, which of course includes update. So that is where we start reading the source.

package com.twitter.heron.apiserver.actions;

public class TopologyRuntimeAction implements Action {
    private final Config config;
    private final Command command;

    TopologyRuntimeAction(Config config, Command command) {
        this.config = config;
        this.command = command;
    }

    @Override
    public void execute() {
        final RuntimeManagerMain runtimeManagerMain = new RuntimeManagerMain(config, command);
        runtimeManagerMain.manageTopology();
    }
}

As shown above, the ApiServer's action class builds a Command object from the user's input in its constructor (the parsing of the command and the invocation of this action class are omitted here). Then, in the execute method, it constructs a RuntimeManagerMain from the Config object (the static configuration captured when the user specified parameters and submitted the topology) and the Command object, and calls manageTopology(). RuntimeManagerMain is the core class for managing a topology at runtime: all management and dynamic adjustment of a running topology go through its methods. Let's look at the manageTopology method it calls.

RuntimeManagerMain

The constructor invoked by the action class above merely initializes the config and command fields, nothing more:

    public RuntimeManagerMain(Config config, Command command) {
        // initialize the options
        this.config = config;
        this.command = command;
    }

Now let's look at the manageTopology method:

    /**
     * Manage a topology
     * 1. Instantiate necessary resources
     * 2. Validate whether the runtime management is legal
     * 3. Complete the runtime management for a specific command
     */
    public void manageTopology()
            throws TopologyRuntimeManagementException, TMasterException, PackingException {
        String topologyName = Context.topologyName(config);
        // 1. Do prepare work
        // create an instance of state manager
        String statemgrClass = Context.stateManagerClass(config);
        IStateManager statemgr;
        try {
            statemgr = ReflectionUtils.newInstance(statemgrClass);
        } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
            throw new TopologyRuntimeManagementException(String.format(
                    "Failed to instantiate state manager class '%s'",
                    statemgrClass), e);
        }

        // Put it in a try block so that we can always clean resources
        try {
            // initialize the statemgr
            statemgr.initialize(config);

            // TODO(mfu): timeout should read from config
            SchedulerStateManagerAdaptor adaptor = new SchedulerStateManagerAdaptor(statemgr, 5000);

            validateRuntimeManage(adaptor, topologyName);

            // 2. Try to manage topology if valid
            // invoke the appropriate command to manage the topology
            LOG.log(Level.FINE, "Topology: {0} to be {1}ed", new Object[]{topologyName, command});

            // build the runtime config
            Config runtime = Config.newBuilder()
                    .put(Key.TOPOLOGY_NAME, Context.topologyName(config))
                    .put(Key.SCHEDULER_STATE_MANAGER_ADAPTOR, adaptor)
                    .build();

            // Create a ISchedulerClient basing on the config
            ISchedulerClient schedulerClient = getSchedulerClient(runtime);

            callRuntimeManagerRunner(runtime, schedulerClient);
        } finally {
            // 3. Do post work basing on the result
            // Currently nothing to do here

            // 4. Close the resources
            SysUtils.closeIgnoringExceptions(statemgr);
        }
    }

As the comments indicate, there are a few main steps:

1. Preparation. This includes fetching the topology name and reading the statemgrClass name from the config object, then instantiating the statemgr object from statemgrClass via reflection. This object persists and maintains Heron's state metadata — in this deployment, that is ZooKeeper. The statemgr is then initialized and connected using the config object (with a 5000 ms timeout), and finally the topology name is used to validate that the topology is in a legal state for the update operation.

2. Managing the topology according to the command. First a runtime object of class Config is created; this holds the dynamic runtime configuration (unlike config, the entries in runtime can change). As we will see, the runtime and config objects passed in here thread through the entire process.

Once runtime is built, the getSchedulerClient() method below is called to obtain the schedulerClient object. Why do we need a scheduler object at all? If you have read Heron's scheduler code, you know that each scheduler implements the concrete topology-update procedure differently, so completing the update command requires calling the specific scheduler's update method. Hence a scheduler object must be constructed.

    protected ISchedulerClient getSchedulerClient(Config runtime)
            throws SchedulerException {
        return new SchedulerClientFactory(config, runtime).getSchedulerClient();
    }

As you can see, a factory pattern is used to build the scheduler client. The factory's getSchedulerClient method is as follows:

    /**
     * Implementation of getSchedulerClient - Used to create objects
     * Currently it creates either HttpServiceSchedulerClient or LibrarySchedulerClient
     *
     * @return the created ISchedulerClient; null if an instance could not be created
     */
    public ISchedulerClient getSchedulerClient() throws SchedulerException {
        LOG.fine("Creating scheduler client");
        ISchedulerClient schedulerClient;

        if (Context.schedulerService(config)) {
            // get the instance of the state manager
            SchedulerStateManagerAdaptor statemgr = Runtime.schedulerStateManagerAdaptor(runtime);

            Scheduler.SchedulerLocation schedulerLocation =
                    statemgr.getSchedulerLocation(Runtime.topologyName(runtime));

            if (schedulerLocation == null) {
                throw new SchedulerException("Failed to get scheduler location from state manager");
            }

            LOG.log(Level.FINE, "Scheduler is listening on location: {0} ", schedulerLocation.toString());

            schedulerClient =
                    new HttpServiceSchedulerClient(config, runtime, schedulerLocation.getHttpEndpoint());
        } else {
            // create an instance of scheduler
            final IScheduler scheduler = LauncherUtils.getInstance()
                    .getSchedulerInstance(config, runtime);

            LOG.fine("Invoke scheduler as a library");
            schedulerClient = new LibrarySchedulerClient(config, runtime, scheduler);
        }

        return schedulerClient;
    }

Clearly the flow splits into two branches on the if condition. Which configuration item in config does this if statement test? It is the following entry in scheduler.yaml:

heron.scheduler.is.service: false

The default value is false, i.e. the scheduler is not started as a service. If set to true, Heron keeps the scheduler running as an HTTP service, and whenever the scheduler is needed it is simply invoked via the service's URL — that is the essence of the true branch. Since we keep the default of false, the scheduler instance is released after the topology is submitted, so a new scheduler object must be created here from the configuration in order to call its update method later. The concrete process of LauncherUtils.getInstance().getSchedulerInstance(config, runtime) is shown below: it reflectively creates the specified scheduler instance from the class name and initializes it. Easy enough to follow.

    /**
     * Creates and initializes scheduler instance
     *
     * @return initialized scheduler instances
     */
    public IScheduler getSchedulerInstance(Config config, Config runtime)
            throws SchedulerException {
        String schedulerClass = Context.schedulerClass(config);
        IScheduler scheduler;
        try {
            // create an instance of scheduler
            scheduler = ReflectionUtils.newInstance(schedulerClass);
        } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
            throw new SchedulerException(String.format("Failed to instantiate scheduler using class '%s'",
                    schedulerClass));
        }

        scheduler.initialize(config, runtime);
        return scheduler;
    }
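
Both the state manager and the scheduler are instantiated through this same reflective route. Heron's real ReflectionUtils has more checks, but a minimal sketch of what such a reflective factory boils down to (illustrative only, not Heron's exact code) looks like this:

    /**
     * Loads the class by name and invokes its no-arg constructor.
     * Throws the same checked exceptions the callers above catch.
     */
    @SuppressWarnings("unchecked")
    public static <T> T newInstance(String className)
            throws ClassNotFoundException, InstantiationException, IllegalAccessException {
        Class<T> clazz = (Class<T>) Class.forName(className);
        return clazz.newInstance();
    }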

The final step of this part uses the scheduler object to create a LibrarySchedulerClient, which is exactly where scheduler.onUpdate will later be invoked. The class wraps the scheduler's corresponding method calls:

    @Override
    public boolean restartTopology(Scheduler.RestartTopologyRequest restartTopologyRequest) {
        // ...
    }

    @Override
    public boolean killTopology(Scheduler.KillTopologyRequest killTopologyRequest) {
        // ...
    }

    @Override
    public boolean updateTopology(Scheduler.UpdateTopologyRequest updateTopologyRequest) {
        boolean ret = false;
        try {
            scheduler.initialize(config, runtime);
            ret = scheduler.onUpdate(updateTopologyRequest);
        } finally {
            SysUtils.closeIgnoringExceptions(scheduler);
        }
        return ret;
    }

After that, the callRuntimeManagerRunner(runtime, schedulerClient) method is invoked:

    protected void callRuntimeManagerRunner(Config runtime, ISchedulerClient schedulerClient)
            throws TopologyRuntimeManagementException, TMasterException, PackingException {
        // create an instance of the runner class
        RuntimeManagerRunner runtimeManagerRunner =
                new RuntimeManagerRunner(config, runtime, command, schedulerClient);

        // invoke the appropriate handlers based on command
        runtimeManagerRunner.call();
    }

This method creates a RuntimeManagerRunner to carry out the specific command. (In the ApiServer analysis above, the command object was built, but we had not yet seen how the update method actually gets dispatched — this is where it happens.)

    public void call()
            throws TMasterException, TopologyRuntimeManagementException,
            PackingException, UpdateDryRunResponse {
        // execute the appropriate command
        String topologyName = Context.topologyName(config);
        switch (command) {
            case ACTIVATE: ...
            case DEACTIVATE: ...
            case RESTART: ...
            case KILL: ...
            case UPDATE: updateTopologyHandler(topologyName, config.getStringValue(NEW_COMPONENT_PARALLELISM_KEY));
                break;
            default:
                LOG.severe("Unknown command for topology: " + command);
        }
    }

Here we focus on the branch for update:

    @VisibleForTesting
    void updateTopologyHandler(String topologyName, String newParallelism)
            throws TopologyRuntimeManagementException, PackingException, UpdateDryRunResponse {
        // 1. Fetch the topology's basic information
        LOG.fine(String.format("updateTopologyHandler called for %s with %s",
                topologyName, newParallelism));
        SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
        TopologyAPI.Topology topology = manager.getTopology(topologyName);
        Map<String, Integer> changeRequests = parseNewParallelismParam(newParallelism);
        PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName);

        // 2. Detect changes and build the new packing plan
        if (!changeDetected(currentPlan, changeRequests)) {
            throw new TopologyRuntimeManagementException(
                    String.format("The component parallelism request (%s) is the same as the "
                            + "current topology parallelism. Not taking action.", newParallelism));
        }

        PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
                topology);

        // 3. Dry-run mode
        if (Context.dryRun(config)) {
            PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
            PackingPlan oldPlan = deserializer.fromProto(currentPlan);
            PackingPlan newPlan = deserializer.fromProto(proposedPlan);
            throw new UpdateDryRunResponse(topology, config, newPlan, oldPlan, changeRequests);
        }

        // 4. Build the update request and perform the topology update
        Scheduler.UpdateTopologyRequest updateTopologyRequest =
                Scheduler.UpdateTopologyRequest.newBuilder()
                        .setCurrentPackingPlan(currentPlan)
                        .setProposedPackingPlan(proposedPlan)
                        .build();

        LOG.fine("Sending Updating topology request: " + updateTopologyRequest);
        if (!schedulerClient.updateTopology(updateTopologyRequest)) {
            throw new TopologyRuntimeManagementException(String.format(
                    "Failed to update topology with Scheduler, updateTopologyRequest=%s",
                    updateTopologyRequest));
        }

        // Clean the connection when we are done.
        LOG.fine("Scheduler updated topology successfully.");
    }

The comments above divide the process into phases. The main steps are:

  1. Fetch the topology's basic information: retrieve the adaptor from runtime (this is the ZooKeeper connection object; keeping it in runtime lets it be reused until the connection expires instead of rebuilding it each time, improving responsiveness), fetch the full topology by topologyName, parse the parallelism changes specified in the update command (a sketch of this parsing follows the list), and fetch the current packing plan.
  2. Verify that the requested component parallelism actually differs from the current packing plan.
  3. Build the new packing plan (proposedPackingPlan) from the requested parallelism. If --dry-run was specified, the handler short-circuits here by throwing an UpdateDryRunResponse that carries both the old and new plans back to the user.
  4. Create the updateTopologyRequest object from the new packing plan.
  5. Finally, call schedulerClient.updateTopology() with the updateTopologyRequest to carry out the actual update.
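
The parsing in step 1 is simple string handling. A minimal sketch of what parseNewParallelismParam plausibly does — the method name comes from the handler above, but the body below is my reconstruction, not verbatim Heron code:

    import java.util.HashMap;
    import java.util.Map;

    // "my-spout:2,my-bolt:4" -> {my-spout=2, my-bolt=4}
    static Map<String, Integer> parseNewParallelismParam(String newParallelism) {
        Map<String, Integer> changes = new HashMap<>();
        for (String componentChange : newParallelism.split(",")) {
            String[] parts = componentChange.split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException(
                        "Invalid parallelism change: " + componentChange);
            }
            changes.put(parts[0], Integer.valueOf(parts[1]));
        }
        return changes;
    }

changeDetected then simply compares this map against the per-component parallelism recorded in the current packing plan.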

This process matters a lot (to me, anyway); I will explain it further in later posts.

And with that, haven't we come right back to the updateTopology method of LibrarySchedulerClient analyzed above? Here it is again (identical to before):

    @Override
    public boolean updateTopology(Scheduler.UpdateTopologyRequest updateTopologyRequest) {
        boolean ret = false;
        try {
            scheduler.initialize(config, runtime);
            ret = scheduler.onUpdate(updateTopologyRequest);
        } finally {
            SysUtils.closeIgnoringExceptions(scheduler);
        }
        return ret;
    }

In other words, it executes the onUpdate method of a concrete scheduler — in my current setup, that is AuroraScheduler.onUpdate. Let's set that aside for a moment and come back to it later.

Remember that all of this was step 2 of the overall flow? [facepalm] Let's quickly cover the two remaining wrap-up steps.

3. Post-work based on the result. According to the code, nothing happens here yet.

4. Cleanup: closing the resources, namely statemgr's ZooKeeper connection.

AuroraScheduler

So far we have walked through the overall flow, but not yet the onUpdate method of a concrete scheduler. Let's do that now. AuroraScheduler's onUpdate is as follows:

    @Override
    public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
        try {
            updateTopologyManager.updateTopology(
                    request.getCurrentPackingPlan(), request.getProposedPackingPlan());
        } catch (ExecutionException | InterruptedException e) {
            LOG.log(Level.SEVERE, "Could not update topology for request: " + request, e);
            return false;
        }
        return true;
    }

It simply calls updateTopologyManager.updateTopology(), passing in the current packing plan and the proposed one. The updateTopologyManager is created in the scheduler's initialize method:

    @Override
    public void initialize(Config mConfig, Config mRuntime) {
        this.config = Config.toClusterMode(mConfig);
        this.runtime = mRuntime;
        try {
            this.controller = getController();
        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
            LOG.severe("AuroraController initialization failed " + e.getMessage());
        }
        this.updateTopologyManager =
                new UpdateTopologyManager(config, runtime, Optional.<IScalable>of(this));
    }

Note in initialize above that the scheduler passes itself in as Optional.<IScalable>of(this); this is how UpdateTopologyManager can later call back into the scheduler's addContainers/removeContainers methods. Now let's look at the process inside updateTopologyManager.updateTopology(currentPackingPlan, proposedPackingPlan):

    /**
     * Scales the topology out or in based on the proposedPackingPlan
     *
     * @param existingProtoPackingPlan the current plan. If this isn't what's found in the state
     *                                 manager, the update will fail
     * @param proposedProtoPackingPlan packing plan to change the topology to
     */
    public void updateTopology(final PackingPlans.PackingPlan existingProtoPackingPlan,
                               final PackingPlans.PackingPlan proposedProtoPackingPlan)
            throws ExecutionException, InterruptedException, ConcurrentModificationException {
        String topologyName = Runtime.topologyName(runtime);
        SchedulerStateManagerAdaptor stateManager = Runtime.schedulerStateManagerAdaptor(runtime);
        Lock lock = stateManager.getLock(topologyName, IStateManager.LockName.UPDATE_TOPOLOGY);

        if (lock.tryLock(5, TimeUnit.SECONDS)) {
            try {
                PackingPlans.PackingPlan foundPackingPlan = getPackingPlan(stateManager, topologyName);

                if (!deserializer.fromProto(existingProtoPackingPlan)
                        .equals(deserializer.fromProto(foundPackingPlan))) {
                    throw new ConcurrentModificationException(String.format(
                            "The packing plan in state manager is not the same as the submitted existing "
                                    + "packing plan for topology %s. Another actor has changed it and has likely"
                                    + "performed an update on it. Failing this request, try again once other "
                                    + "update is complete", topologyName));
                }

                updateTopology(existingProtoPackingPlan, proposedProtoPackingPlan, stateManager);
            } finally {
                lock.unlock();
            }
        } else {
            throw new ConcurrentModificationException(String.format(
                    "The update lock can not be obtained for topology %s. Another actor is performing an "
                            + "update on it. Failing this request, try again once current update is complete",
                    topologyName));
        }
    }

The distributed lock here is unmistakable: Heron uses a ZooKeeper-backed distributed lock to keep this update process consistent across the cluster. Breaking this part down:

1. Fetch the topology's basic information, again including topologyName and the stateManager object.

2. Acquire the distributed lock and perform the update. The lock key corresponds to topologyName, the lock name is the constant UPDATE_TOPOLOGY, and the acquisition attempt times out after 5 seconds. The critical section proceeds as follows (a Curator-based analogy of this lock pattern follows the bullet list):

  • (1) Use stateManager to fetch the packing plan currently saved in ZooKeeper for this topologyName, deserialize it, and compare it with the existingProtoPackingPlan passed into the method. This is a validation step: it guards against the plan being updated diverging from the plan maintained in ZooKeeper.
  • (2) Call the overloaded method of the same name to perform the actual update.
  • (3) Release the lock.
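
The lock itself comes from Heron's IStateManager implementation, but the pattern is the classic ZooKeeper lock recipe. As an analogy only — this is not Heron's code — the same guard could be written with Apache Curator's InterProcessMutex:

    import java.util.concurrent.TimeUnit;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;

    void updateUnderLock(CuratorFramework client, String topologyName) throws Exception {
        // one lock node per topology, mirroring getLock(topologyName, LockName.UPDATE_TOPOLOGY)
        InterProcessMutex lock =
                new InterProcessMutex(client, "/locks/updatetopology/" + topologyName);
        if (lock.acquire(5, TimeUnit.SECONDS)) {  // bounded wait, like tryLock(5, SECONDS)
            try {
                // validate the stored packing plan, then apply the update
            } finally {
                lock.release();
            }
        } else {
            // another actor holds the lock; fail fast, as UpdateTopologyManager does
        }
    }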

Now let's look at the overloaded method being called:

    private void updateTopology(final PackingPlans.PackingPlan existingProtoPackingPlan,
                                final PackingPlans.PackingPlan proposedProtoPackingPlan,
                                SchedulerStateManagerAdaptor stateManager)
            throws ExecutionException, InterruptedException {
        // 1. Fetch and deserialize the topology's basic information
        String topologyName = Runtime.topologyName(runtime);
        PackingPlan existingPackingPlan = deserializer.fromProto(existingProtoPackingPlan);
        PackingPlan proposedPackingPlan = deserializer.fromProto(proposedProtoPackingPlan);

        Preconditions.checkArgument(proposedPackingPlan.getContainers().size() > 0, String.format(
                "proposed packing plan must have at least 1 container %s", proposedPackingPlan));
        
        // 2. Validate and compute how many containers the update adds and removes
        ContainerDelta containerDelta = new ContainerDelta(
                existingPackingPlan.getContainers(), proposedPackingPlan.getContainers());
        int newContainerCount = containerDelta.getContainersToAdd().size();
        int removableContainerCount = containerDelta.getContainersToRemove().size();

        String message = String.format("Topology change requires %s new containers and removing %s "
                        + "existing containers, but the scheduler does not support scaling, aborting. "
                        + "Existing packing plan: %s, proposed packing plan: %s",
                newContainerCount, removableContainerCount, existingPackingPlan, proposedPackingPlan);
        Preconditions.checkState(newContainerCount + removableContainerCount == 0
                || scalableScheduler.isPresent(), message);

        // 3. Check whether the topology is currently in a running state
        TopologyAPI.Topology topology = getTopology(stateManager, topologyName);
        boolean initiallyRunning = topology.getState() == TopologyAPI.TopologyState.RUNNING;

        // 4. Start the update
        // (1) Pause (deactivate) the topology and sleep
        if (initiallyRunning) {
            // Update the topology since the state should have changed from RUNNING to PAUSED
            // Will throw exceptions internally if tmaster fails to deactivate
            deactivateTopology(stateManager, topology, proposedPackingPlan);
        }

        // (2) Call the scheduler's addContainers/removeContainers to adjust the topology
        Set<PackingPlan.ContainerPlan> updatedContainers =
                new HashSet<>(proposedPackingPlan.getContainers());
        // request new resources if necessary. Once containers are allocated we should make the changes
        // to state manager quickly, otherwise the scheduler might penalize for thrashing on start-up
        if (newContainerCount > 0 && scalableScheduler.isPresent()) {
            Set<PackingPlan.ContainerPlan> containersToAdd = containerDelta.getContainersToAdd();
            Set<PackingPlan.ContainerPlan> containersAdded =
                    scalableScheduler.get().addContainers(containersToAdd);
            // Update the PackingPlan with new container-ids
            if (containersAdded != null) {
                updatedContainers.removeAll(containersToAdd);
                updatedContainers.addAll(containersAdded);
            }
        }

        // (3) Rebuild a new packing plan (updatedPackingPlan) from the updated containers
        PackingPlan updatedPackingPlan =
                new PackingPlan(proposedPackingPlan.getId(), updatedContainers);
        PackingPlanProtoSerializer serializer = new PackingPlanProtoSerializer();
        PackingPlans.PackingPlan updatedProtoPackingPlan = serializer.toProto(updatedPackingPlan);
        LOG.fine("The updated Packing Plan: " + updatedProtoPackingPlan);

        // (4) Serialize and update the packing plan in the state manager
        // update packing plan to trigger the scaling event
        logInfo("Update new PackingPlan: %s",
                stateManager.updatePackingPlan(updatedProtoPackingPlan, topologyName));

        // 5. Reactivate the topology
        // reactivate topology
        if (initiallyRunning) {
            // wait before reactivating to give the tmaster a chance to receive the packing update and
            // delete the packing plan. Instead we could message tmaster to invalidate the physical plan
            // and/or possibly even update the packing plan directly
            SysUtils.sleep(Duration.ofSeconds(10));
            // Will throw exceptions internally if tmaster fails to deactivate
            reactivateTopology(stateManager, topology, removableContainerCount);
        }

        if (removableContainerCount > 0 && scalableScheduler.isPresent()) {
            scalableScheduler.get().removeContainers(containerDelta.getContainersToRemove());
        }
    }

The detailed steps are annotated in the code above. In step 5, note that after the update is applied, the flow sleeps for 10 seconds between deactivating and reactivating the topology, giving the tmaster time to receive the packing-plan update and refresh the physical plan.
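
One more detail worth pulling out: the ContainerDelta in step 2 is conceptually just two set differences over the container plans. A minimal sketch, assuming ContainerPlan implements equals/hashCode:

    import java.util.HashSet;
    import java.util.Set;

    // containersToAdd    = proposed \ existing
    // containersToRemove = existing \ proposed
    static <T> Set<T> difference(Set<T> left, Set<T> right) {
        Set<T> result = new HashSet<>(left);
        result.removeAll(right);
        return result;
    }

With that, newContainerCount and removableContainerCount are simply the sizes of the two differences, and the Preconditions.checkState above enforces that a scheduler without IScalable support can only handle updates that keep the container set unchanged.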

Conclusion

This concludes our analysis of the implementation behind the Heron CLI update command. Some details were not covered; please consult the source code, and I will fill in some gaps in later posts. Thanks for reading, and corrections are welcome.

Proficient use is only the first level of mastering a technology; examining how a feature or a component is actually implemented, from the bottom up, gives a much clearer grip on it. And what counts as "the bottom"? Source code and architecture are the two angles I study from. Open-source projects also contain plenty worth studying and digging into — the ZooKeeper-based distributed lock above is a good example, and I may dissect that topic later when I get the chance. This analysis of the update command likewise lays the groundwork for what comes next.

Writing these posts takes effort; please credit the source when reposting.

Reposted from blog.csdn.net/yitian_z/article/details/103145445