Ray: A Distributed Framework for Emerging AI Applications

Ray是UC Berkeley RISELab新推出的高性能分布式执行框架,目前还处于实验室阶段,具有比Spark更优秀的性能,有望在将来取代Spark。本篇博客是对该论文的简单翻译,如有翻译不妥的地方,欢迎指正。


0 简介

下一代AI应用程序将不断与环境交互,并从这些交互中学习。这些应用程序在性能和灵活性方面提出了新的和苛刻的系统要求。在本文中,我们考虑这些要求并提出Ray,一个分布式系统来解决它们。Ray实现了一个动态任务图计算模型,支持任务并行和基于actor的编程模型。为了满足人工智能应用的性能要求,我们提出了一种架构,使用共享式存储系统和新颖的自下而上的分布式调度器,在逻辑上集中了系统的控制状态。在我们的实验中,我们展示了亚毫秒的远程任务延迟和线性吞吐量,每秒超过180万个任务。我们凭经验验证了Ray加速了具有挑战性的基准测试,同时也是适合新兴的一类强化学习应用和算法的高性能表现。

1 介绍

人工智能目前正在成为一系列真实世界应用的主力技术。但是,迄今为止,这些应用程序主要基于一个相当局限的监督式学习模式,在这种模式下,一个模型在线训练,并被部署来在线服务预测。随着领域的成熟,有必要考虑比标准监督式学习更广泛的设置。机器学习应用程序不是做出和服务于单一预测,而是必须越来越多地在动态环境中运行,对环境变化作出反应,并采取一系列行动以实现一个目标。这些更广泛的要求自然地在强化学习(RL)的范式内进行,该学习在不确定的环境中学习持续运行。基于RL的应用程序已经取得显著的结果,例如Google的AlphaGo击败人类世界冠军,并正在寻找到自动驾驶汽车,无人机和机器人操纵方式。

有三个特点将RL应用与传统的监督学习应用区分开来。首先,他们经常严重依赖模拟来探索状态和发现行动的后果。模拟器可以编码计算机游戏的规则,诸如机器人的物理系统的牛顿动态,或者虚拟环境的混合动力学。这通常需要大量的计算;例如,一个现实的应用程序可能会执行数以百万计的模拟。其次,RL应用程序的图计算是异构的并且是动态演化的。仿真花费的时间从几毫秒到几分钟,仿真的结果可以确定未来仿真的参数。第三,许多RL应用程序,例如机器人控制或自动驾驶,需要迅速采取行动以应对不断变化的环境。此外,要选择最佳的操作,应用程序可能需要实时执行更多的模拟。总之,我们需要一个支持异构和动态图计算的计算框架,同时以毫秒级别的延迟处理每秒数百万个任务。

现有的集群计算框架没有充分满足这些要求。MapReduce,Apache Spark,Dryad,Dask和CIEL不支持通用RL应用程序所需的吞吐量和等待时间,而TensorFlow,Naiad,MPI和Canary通常假设静态计算图。

在本文中,我们提出了一个满足这些要求的集群计算框架Ray。为了支持这些应用程序强加的异构和动态工作负载,Ray实现了一个动态的任务图计算模型,类似于CIEL。但是,除了CIEL提供的任务并行抽象外,Ray还在此执行模型之上提供了一个actor编程抽象。Actor抽象使得Ray能够支持有状态的组件,比如第三方模拟器。

为了在支持动态计算图的同时实现严格的性能目标,Ray采用了可横向扩展的新型分布式体系结构。架构基于两个关键的想法。首先,我们将系统的所有控制状态存储在一个全局控制存储器中,这使得系统中的所有其他组件都是无状态的。因此,每个组件都可以轻松地水平扩展,并在发生故障时重新启动。反过来,全局控制存储可以通过共享进行扩展,并通过复制实现容错。

其次,我们介绍一种新的自下而上的分布式调度器,其任务由工作节点和驱动程序提交给本地调度器(每个节点有一个本地调度器)。本地调度程序可以选择本地调度任务或将任务转发到已备份的全局调度程序。这通过允许本地决策来减少任务延迟,并且通过减轻全局调度器的负担来增加系统吞吐量。我们做出如下贡献:

  • 我们为新兴AI应用指定系统需求:
    • (a)异构并行计算
    • (b)动态任务图
    • (c)高吞吐量和低延迟调度
    • (d)透明容错
  • 除了任务并行编程抽象之外,我们还提供动态任务图计算模型之上的actor抽象。

  • 我们提出了一个可水平扩展的体系结构来满足上述要求,并构建实现此体系结构的集群计算系统Ray。

2 动机和需求

这里写图片描述
这里写图片描述

虽然Ray可以支持各种工作负载,因为它提供了任务并行和参与者抽象,但我们专注于本文中的加强学习(RL)工作负载,因为它们是新兴AI应用的代表,也是Ray设计的主要驱动因素。在这里,我们考虑一个简单的RL应用程序来说明Ray的关键要求。

RL系统由与环境重复交互的代理组成(参见Figure 1(a))。Agent的目标是学习最大化回报的策略。策略是从环境状态到采取行动的映射。环境,状态,Agent,行动和回报的定义是特定于应用程序的 (Table 1)。

Figure 2展示了agent学习策略所使用的伪代码示例。典型的程序包括两个步骤:(1)评估当前的政策;(2)改进政策。为了评估策略,伪代码调用rollout(环境,策略)来生成一组rollout,其中rollout是通过使用environment.step(action)与环境交互收集的状态和回报的轨迹。根据当前策略和环境状态通过policy.compute(state)计算操作。随着轨迹的产生,train_policy()使用已完成的轨迹通过policy.update(轨迹)改进当前的策略。重复这个过程直到策略收敛。

虽然很简单,但该应用程序说明了新兴AI应用程序的关键要求。 我们将这些要求分为三类。

灵活性。 系统的灵活性通常根据其可支持的工作负载的多样性来衡量。我们考虑灵活性的两个方面:并发执行任务的异构性和执行图计算的一般性和动态性。
并行,异构的任务。 并行任务可能在三个方面是异构的:

  • 功能。以机器的情况,评估环境的状态(例如,environment.step(action))涉及处理多个传感器(例如视频,麦克风和雷达)的输入。这需要并行运行多个任务,每个都执行不同的计算(见Figure 1(b))。
  • 持续时间。计算轨迹花费的时间可能会有很大差异(参照rollout(policy, environment))。例如,在游戏的情况下,可能只需要几个动作(动作)就会输掉游戏,或者可能需要数百动作来获胜。
  • 资源类型。通过评估策略(例如,policy.compute(状态))来计算动作在很多情况下通过深度神经网络来实现,所述深度神经网络通常需要使用GPU。另一方面,大多数其他应用的计算也需使用CPU。

请注意,这些需求并非对当今许多流行的群集计算框架实现的批量同步并行(BSP)模型满足。使用BSP,同一阶段内的所有任务通常执行相同的计算(尽管在不同的数据分区上)并且花费大致相同的时间量。

动态任务图。考虑train_policy()函数。尽管未在Figure 2中显示,但只要部分rollouts完成(而不是等待所有)并启动新的rollouts以维护执行rollouts(如图Figure 1(c)所示)。这使得执行图形具有动态性,因为我们无法预测rollouts的完成顺序或哪些rollouts将用于特定策略更新。

性能。 在机器人与物理环境进行交互的情况下,我们需要推断环境的状态并在几毫秒内计算新的行为。同样,模拟也可能需要几毫秒的量级。因此,我们需要能够在不到一毫秒的时间内安排任务。否则,调度开销可能会很大。鉴于拥有数万个内核的集群很常见,我们需要能够每秒安排数十万甚至数百万个任务。考虑一个由100台服务器组成的集群,每台服务器都有32个内核,假设每个任务需要5ms执行。为了充分利用这个集群,我们需要安排640K任务/秒。

易于开发。 由于编写并行应用程序并不重要,因为ML开发人员更倾向于专注于其应用程序而不是系统编程,所以简化开发对于此类系统的成功至关重要。

确定性重新运行和容错。确定性地重新运行作业的能力大大简化了调试。透明的容错功能可以避免用户明确处理故障。它还使用户能够使用便宜的可抢占资源(例如AWS上的现货实例),从而在公共云中运行时节省大量成本。

现有算法的简单并行化。这涉及提供一个简单的API并支持现有的语言,工具和库。首先,我们需要为Python提供支持,因为Python是AI开发人员的首选语言。其次,我们需要提供与广泛的可用第三方库的紧密集成。这些库包括模拟器,如OpenAI,DeepMind实验室,Mujoco物理模拟器以及TensorFlow,Theano,PyTorch和Caffe等深度学习框架。 正如我们将看到的,这需要用类似actor的抽象来扩展任务并行模型以包装这些第三方服务。

3 编程和计算模型

这里写图片描述
这里写图片描述

3.1 编程模型和API

Ray的核心是提供一个任务并行编程模型。Table 2显示了Ray的API。当调用远程函数时,将立即返回表示任务结果的future。可以使用ray:get()来检索future,将来可以作为参数传递到另一个远程函数。这允许用户在捕获数据依赖性时表达并行性。
远程函数对不可变对象进行操作,并且预期无状态和副作用:它们的输出完全由它们的输入决定。这意味着幂等性,它通过在失败时重新执行函数来简化容错。
为了满足第2节给出的异构性,灵活性和开发简便性的要求,我们用四种方法来增强任务并行编程模型。

首先,为了处理具有不同持续时间的并发任务,我们引入了ray:wait()。这个调用需要一系列futures,并在超时后或至少有k个可用时返回其结果可用的子集。相反,ray:get()会阻塞除非所有的future可用。 这对于RL应用来说是非常有益的,因为仿真可能具有广泛不同的持续时间,但由于引入的不确定性而使得容错复杂化。

其次,为了处理资源异构任务,我们使开发人员能够指定资源需求,以便Ray调度程序可以高效地管理资源。为远程函数指定的资源仅在函数执行期间分配。

第三,为了提高灵活性,我们启用了嵌套远程功能,这意味着远程功能可以调用其他远程功能。这对于实现高可伸缩性(见第4节)也很关键,因为它使多个进程能够并行调用远程功能(否则驱动程序将成为任务调用的瓶颈)。

最后,也是最重要的一点,为了便于开发和提高效率,我们通过使用Actor抽象来增强我们的编程模型。我们在开发无状态任务时遇到的一个限制是无法包含第三方模拟器,这些模拟器不公开其内部状态。为了解决这个限制,Ray以actor的形式为有状态组件提供基本支持。 在Ray中,actor是一个有状态的进程,它公开了一组可以作为远程函数调用并且可以连续执行这些方法的方法。

3.2 计算模型

Ray采用动态任务图计算模型,在该模型中,系统在输入可用时自动触发远程函数和actor模型方法的执行。在本节中,我们将描述如何从用户程序(Figure 3(a))构建图计算(Figure (b))。该程序使用Table 2中的API来实现Figure 2中的伪代码。

首先忽略角色,图计算中有两种类型的节点:数据对象和远程函数调用或任务。还有两种类型的边缘:数据边缘和控制边缘。数据边缘捕获数据对象和任务之间的依赖关系。更准确地说,如果数据对象D是任务T的输出,则我们添加从T到D的数据边缘。同样,如果D是T的输入,我们添加从D到T的数据边缘。控制边捕获嵌套远程函数产生的计算依赖性(第3.1节):如果任务T1调用任务T2,则我们添加从T1到T2的控制边。

Actor方法调用也被表示为图计算中的节点。它们与具有一个关键区别的任务完全相同。为了在同一个actor上的后续方法调用中捕获状态依赖,我们添加第三种类型的边缘:一个有状态的边缘。如果方法Mj在同一个actor上的方法Mi之后被调用,那么我们添加一个从Mi到Mj的有状态边。因此,在同一个actor对象上调用的所有方法形成一个由状态边连接的链(Figure 3(b))。该链捕获这些方法被调用的顺序。

有状态的边帮助我们将actor嵌入到另一个无状态的任务图中,因为它们捕获连续方法调用之间共享actor内部状态的隐式数据依赖关系。有状态的边缘也使我们能够保持血统。和其他数据流系统一样,我们追踪数据血缘以实现重建。通过在血缘图中明确包含有状态边,我们可以轻松地重建丢失的数据,无论是由远程函数还是由actor方法产生(第4.2.3节)。

4 系统结构

这里写图片描述
这里写图片描述
这里写图片描述

4.1 应用层

应用程序层由三部分组成:

  • 驱动程序:执行用户程序的进程。
  • Worker(工作对象):执行由驱动程序或其他工作对象调用的任务(远程函数)的无状态进程。系统层自动启动工作对象并分配任务。当声明远程函数时,该函数会自动发布给所有工作对象。worker连续执行任务。
  • Actor:一个有状态的进程,当它被调用时执行它暴露的方法。与worker不同,actor由worker或driver明确实例化。和worker一样,actor连续执行方法。

请注意,worker是没有状态的,因为它们不会在任务中保持本地状态。假设执行确定性任务,调用具有相同参数的相同远程函数将返回相同的结果,而不管它是否在同一个worker上执行。 相比之下,actor是一个有状态的过程,方法调用的结果可能依赖于该actor执行的先前方法。

4.2 系统层

系统层使我们能够满足性能和容错目标,如第2节所述,采用一种架构,其中每个组件均可水平扩展和容错。该层由三个主要组件组成:全局控制存储区,分布式调度程序和分布式对象存储区。

4.2.1 全局控制存储区

其核心是我们的系统利用全局控制存储(GCS),该存储将所有最新的元数据和控制状态信息存储在系统中。这包括:

  • (1)每个任务的规范
  • (2)每个远程函数的代码
  • (3)图计算
  • (4)所有对象的当前位置
  • (5)每个调度事件。GCS还提供发布订阅基础设施以促进组件之间的通信。

通过以集中方式存储和管理整个控制状态,GCS使每个其他组件都成为无状态。这不仅简化了对容错的支持(即失败时,组件重新启动并从GCS读取最新状态),而且还可以轻松地对每个其他组件进行横向扩展,因为组件的副本或碎片共享的所有状态可通过GCS访问。

为了扩展GCS,我们使用分片。由于我们可以将伪随机ID实际上与GCS中的每个数据条目(例如对象,任务,函数)相关联,因此跨多个分片平衡负载相对容易。为了提供容错功能,我们为每个分片使用动态复制副本。

集中系统控制信息使我们能够在GCS之上轻松构建调试,分析和可视化工具。我们迄今为止制作的简约工具在我们的开发中已经证明是有用的。

4.2.2 自下而上的分布式调度器

许多现有的集群计算框架(如Apache Spark,CIEL,Dryad和Hadoop)都实现了集中式调度程序。虽然这简化了设计,但它损害了可扩展性。

有几种方法可以提高调度可扩展性:

-(1)批量调度,其中调度程序批量向worker节点提交任务以分摊与任务提交相关的固定开销(例如Drizzle)。
-(2)分层调度,其中全局调度器将任务图分割成跨每个节点的本地调度器(例如,Canary)。
-(3)并行调度,其中多个全局调度器在所有worker节点(例如,Sparrow)上同时调度任务。
不幸的是,这些方法都不符合Ray的要求。批量调度仍然需要全局调度器来处理每个任务,这限制了其可伸缩性,分层调度假定任务图是事先已知的(即图是静态的),并行调度假定每个全局调度器调度独立作业。相比之下,我们需要高度可扩展的调度程序来处理动态任务图,该动态任务图可能由单个作业生成。

像现有的分层调度解决方案一样,我们采用全局调度器和各节点本地调度器。但是,与以前的解决方案不同,节点上创建的任务首先提交给节点的本地调度程序,而不是全局调度程序(Figure 5)。本地调度程序在本地调度任务,除非节点超载,或者它不能满足任务的要求(例如,缺少GPU),或者任务的输入是远程的。如果本地调度程序不安排任务,它将任务发送到全局调度程序。为了确定负载,本地调度程序检查其任务队列的当前长度。如果长度超过某个可配置的阈值,则认为本地节点过载。这个阈值的设置,当所有任务被移交给全局调度器时,可以使调度策略保持跨越连续集中,当所有任务在本地处理时,分散到各地。

每个本地调度程序定期向包含其负载信息的GCS发送心跳(例如每100ms)。 GCS记录此信息并将其转发给全局调度程序。接收到任务后,全局调度程序使用来自后续节点的最新负载信息以及任务输入的位置和大小(来自GCS的对象元数据)来决定将任务分配给哪个节点。如果全局调度程序成为瓶颈,我们可以实例化更多副本,并让每个本地调度程序随机选择一个副本来发送其任务。这使我们的调度程序体系结构高度可扩展。

4.2.3 内存分布式对象存储

为了尽量减少任务延迟,我们实施了内存分布式存储系统来存储每项任务的输入和输出。这使workers和actors能够有效地共享数据。在每个节点上,我们通过共享内存实现对象存储。这允许在同一节点上运行的任务之间共享零拷贝数据。另外,我们使用Apache Arrow,这是一种高效的内存布局,正在成为数据分析中事实上的标准。

如果任务的输入不是本地的,则在执行之前将输入复制到同一节点上的本地对象存储。任务还将所有输出写入本地对象存储。由于任务只能在本地内存中读写数据,因此复制消除了由于热数据对象而导致的潜在瓶颈并最大限度地减少了任务执行时间。这增加了计算绑定工作负载的吞吐量,这是许多AI应用程序共享的配置文件。

现有的集群计算框架(如Apache Spark和Dryad),其对象存储局限于不可变数据,系统简化设计,通过避免需要复杂的一致性协议(避免并发更新),简化支持容错。

为了简单起见,我们的对象存储不支持分布式对象,也就是说,每个对象都适合单个节点。像大矩阵或树这样的分布式对象可以作为future的集合在更高的层次(例如,应用层)上实现。

对象重建。组件故障可能导致对象丢失,Ray通过血统重新执行恢复。Ray通过在执行过程中记录GCS中的任务依赖关系来跟踪血缘。这与Apache Spark和CIEL等其他集群计算系统采用的解决方案类似。此外,与这些系统一样,Ray假定对象是不可变的,而运算符(即远程函数和actor方法)是确定性的。但是,与这些系统不同,Ray增加了对有状态操作者(actor)重建的支持。通过将状态边直接集成到图计算中,我们可以为远程函数和actor使用相同的重构机制。

为了重建一个丢失的对象,我们沿着数据和有状态边向后执行,直到找到其输入全部出现在对象存储区中的任务。然后我们重复以这些输入为根的计算子图。考虑图Figure 3(b)中的例子,并假设rollout12已经丢失。通过沿着数据和有状态边向后走,我们到达没有输入的A10。因此,为了重建rollout12,我们需要通过执行A10来重新实例化actor,然后按顺序执行方法A11和A12。

注意,对于其血缘包括有状态边缘的任何对象,重构将需要重新实例化该actor(例如A10)并重新执行可能长的有状态边缘链(例如,A11; A12等)。由于actor通常用于包装具有有限生命周期的第三方模拟器,因此我们预计这些链条是有界的。然而,我们也发现actor对管理更一般的状态很有用。为了在这种情况下提高恢复时间,我们定期检查actor的状态,并允许actor从检查点恢复。

为了低延迟,我们将对象完全保存在内存中,并使用最近最少使用的释放策略将它们从内存中释放。

4.3 将所有聚集一起

Figure 6展示了Ray如何通过一个简单的例子进行端对端的工作,该例子添加了两个对象a和b,它们可以是标量或矩阵,并返回结果c。远程函数add()在初始化时自动向GCS注册,并分发给系统中的每个worker(Figure 6(a)中的步骤0)。

Figure 6(a)显示了driver程序调用add.remote(a; b)触发的逐步操作,其中a和b分别存储在节点N1和N2上。driver程序向本地调度器提交add(a,b)(步骤1),该调度器将其转发给全局调度器(步骤2).接下来,全局调度器查找add(a,b)参数的位置在GCS中(步骤3)并且决定在存储参数b的节点N2上调度任务(步骤4)。节点N2处的本地调度器检查本地对象库是否包含add(a,b)的参数(步骤5)。由于本地存储没有对象a,因此它在GCS中查找a的位置(步骤6)。了解a存储在N1中,N2的对象存储在本地复制(步骤7)。由于add()的所有参数现在都存储在本地,本地调度程序在本地worker上调用add()(步骤8),它通过共享内存访问参数(步骤9)。

Figure 6(b)分别显示了执行N1时的ray.get()和N2时的add()触发的逐步操作。在ray.get(idc)的调用之后,驱动程序使用add()返回的future idc(步骤1)检查本地对象存储库中的值c。由于本地对象存储不存储c,它会在GCS中查找它的位置。此时,c没有条目,因为c尚未创建。结果,N1的对象存储注册了一个回调对象表,在创建c的入口时触发(步骤2)。同时,在N2中,add()完成其执行,将结果c存储在本地对象存储中(步骤3),然后将c的条目添加到GCS(步骤4)。结果,GCS用c的输入触发对N1的对象存储的回调(步骤5)。接下来,N1从N2复制c(步骤6),并将c返回给ray.get()(步骤7),最终完成该任务。

虽然这个例子涉及大量的RPC,但在很多情况下,这个数字要小得多,因为大多数任务都是在本地调度的,并且GCS响应由全局和本地调度器缓存。

5 实现

Ray在4万代码行(LoC)中实现,在系统层的C ++中为72%,在应用程序层中的Python为28%。对象存储库和我们的零拷贝序列化库已作为可独立于Ray使用的独立项目进行了分解。

自下而上的分布式调度程序(第4.2.2节)为3.2KLoC,并将在我们改进Ray的调度策略时进行重大开发。在本节中,我们重点讨论实现实时AI应用程序所实现的性能目标的实现细节:

-(a)调度程序性能。
-(b)对象存储库性能。
-(c)端到端系统性能。

自下而上的分布式调度器。我们将本地和全局调度程序实现为事件驱动的单线程过程。在内部,本地调度程序维护本地对象元数据的缓存状态,等待输入的任务以及准备派发给worker的任务。随着对象依赖关系变得可用,任务已准备好用于分派。Worker可用性触发了在节点容量限制下尽可能多的任务分派。

本地调度程序定期向全局调度程序发送心跳(每100ms),通过发布订户机制经过GCS,其中包含调度队列长度和资源可用性。这使全局调度程序能够平衡各节点间的负载。

对象存储。Ray的对象存储也被实现为单线程事件循环。它使用共享内存,因此同一节点上的worker无需复制即可读取数据。对象是不可变的。一个对象只有在worker完成创建后才可见。为了最大限度地减少对象创建开销,应用商店预分配一个大型内存映射文件池。我们使用类似SIMD的内存副本来最大化将数据从worker复制到对象存储的共享内存的吞吐量。我们也并行计算一个对象的内容哈希,它用于检测非确定性计算。Ray使用Apache Arrow在序列化/反序列化Python对象时实现高性能。

全局控制存储。我们使用每个分片的一个Redis键值存储(Redis中可以轻松地与其他键值存储交换)实施Ray的全局控制存储(GCS)。我们通过对象和任务ID对GCS表进行分片以扩展,并且我们复制每个分片以进行容错。随着我们扩大实验范围,我们在多个节点上分配碎片。尽管我们的GCS实现使用多个Redis服务器,但我们的性能和容错要求也可以通过像RAMCloud这样的现有系统来满足。

最后,Ray的监视器跟踪系统组件的存活性并反映GCS中的组件故障。必要时,失败群集节点上的任务和对象被标记为丢失,并且随后使用血缘信息重建对象。

6 评估

这里写图片描述
这里写图片描述
这里写图片描述
这里写图片描述
这里写图片描述
这里写图片描述
这里写图片描述
在本节中,我们将展示三个关键点。首先,我们考察整个系统的可扩展性以及各个组件的性能(第6.1节)。其次,我们展示了鲁棒性和容错性(第6.2节)。第三,我们证明了Ray在强化学习应用方面非常合适,无论是在性能还是开发方面(第6.3节)。所有实验均在亚马逊网络服务上运行。具体的实例类型如下所示。

6.1 可扩展性和性能

端到端的可扩展性。全局控制存储(GCS)的主要优势之一是可以横向扩展系统。我们在这一节评估这种能力。在Figure 7中,我们测试了一个困难的并行工作负载,增加了x轴上的簇大小。我们观察到逐渐增加的任务吞吐量接近完美的线性。Ray在60个节点上每秒吞吐量超过100万个任务,并且在100个节点上继续线性扩展到每秒180万个任务以上。最右边的数据点显示Ray可以在不到一分钟(54秒)内处理1亿个磁道任务。可变性(用黑色误差线显示)很小。正如预期的那样,随着任务持续时间的增加,吞吐量会按比例降低平均任务持续时间,但整体可扩展性保持线性。

全球调度程序的主要职责是在整个系统中保持均衡的负载。在Figure 8中,单个节点上提交的100K个任务在可用资源之间进行重新平衡。请注意,负载源的节点处理更多任务,因为它在将任务转发到全局调度程序之前最大化本地节点的利用率。

对象存储的性能。我们对比对象存储性能的两个度量标准:Figure 9中的IOPS(针对小对象)和写吞吐量(针对大对象)。随着对象大小的增加,单个客户端的写入吞吐量达到15GB / s。对于较大的对象,从客户端复制对象支配对象创建的时间。对于较小的对象,完成时间由客户端和对象存储之间的序列化开销和IPC决定。对象存储峰值为18K IOPS,相当于每次操作花费56µs。

6.2 容错

从对象故障中恢复。在Figure 10中,我们演示了Ray从worker节点故障中透明恢复并弹性扩展的能力。驱动程序提交几轮任务,其中每个任务都依赖于前一轮中的任务。当Worker节点被杀死(在25s,50s,100s)时,幸存的本地调度器会自动触发重建丢失的对象。在重建期间,Driver最初提交的任务不运行,因为它们的依赖不能得到满足。但是,总体任务吞吐量保持稳定,充分利用可用资源,直到重建丢失的依赖关系。此外,随着更多节点在210s时加回系统,Ray能够完全恢复到其初始吞吐量。

从actor失败中恢复。接下来,我们演示Ray的透明恢复失去的actor的能力。通过将每个参与者的方法调用编码到依赖关系图中,我们可以重用与Figure 10中相同的对象重构机制。Figure 11a中的工作负载演示了没有中间参与者状态被保存的极端情况。之前对每个失去actor的方法调用必须连续重新执行(t = 210-330s)。失去的actor会自动在可用节点间重新分配,吞吐量在重建后会完全恢复。

为了改善长期存活actor的重建时间,我们提供了中间角色状态的透明检查点。Figure 11b显示了相同的工作负载,但每10个方法调用每个角色的自动检查点任务。初始吞吐量与没有检查点的吞吐量相当。节点失效后,大部分重建都是通过执行检查点任务来完成的,从而重建actor的状态(t = 210-270s)。结果,只有500个任务需要重新执行,而新方法调用分别停顿了60秒,而重新执行了10K次,而没有检查点的120秒。未来,我们希望进一步减少actor重建时间,例如,通过允许用户注释为只读方法。

来自GCS备份的开销。为了使GCS容错,我们复制每个数据库碎片。当客户端写入GCS的其中一个分片时,它会将写入复制到所有副本。对于通过减少GCS碎片数量来人为地减少使GCS成为瓶颈的工作负载,双向复制的开销不到10%。在大多数实际的工作量中,减速是无法察觉的。

6.3 强化学习应用

鉴于第2节中描述的强化学习应用程序的多样化和苛刻的要求,今天的强化学习算法在特殊用途ad-hoc系统之上实现,这些特殊用途ad-hoc系统通常需要大量的工程设计开发工作,而不会推广到其他算法。

在本节中,我们在Ray中实现了两种强化学习算法,并表明我们能够匹配或优于专门为这些算法构建的专用系统的性能。此外,使用Ray在集群上分发这些算法需要在算法的串行实现中仅更改几行代码。

另外,我们在一个对延迟敏感的设置中测试Ray,在该设置中,Ray用于在不同的实时要求下控制模拟机器。

6.3.1 进化策略

为了在大规模的RL工作负载上评估Ray,我们实现了演化策略(ES)算法,并与参考实现进行了比较,该参考实现是为该算法构建的专用系统。它使用一系列Redis服务器作为消息总线,并依靠低级多处理库来共享数据。

如Figure 12所示,在Ray之上的一个简单的实现是可扩展的,扩展到8192个物理内核,而专用系统在1024个内核之后停止运行。Ray实施的中位时间为3.7分钟,是最佳公布结果(10分钟)的两倍。Ray的实现也大大简化了开发。使用Ray并行化串行实现需要修改7行代码。相比之下,参考实现需要数百行代码来开发用于在工作人员之间传递任务和数据的自定义协议,并且不能容易地适应不同的算法或通信模式。我们在B.1节中包含说明这一点的伪代码。

6.3.2 近端策略优化

为了在单节点和小型集群RL工作负载上评估Ray,我们在Ray中实现了近端策略优化(PPO),并与使用OpenMPI通信原语的高度优化的参考实现进行了比较。所有实验均使用p2.16xlarge(GPU)和m4.16xlarge(高CPU)实例,每个实例都有32个物理核心。
Ray的API可以轻松利用异构资源将成本降低4.5倍。Ray任务和参与者可以指定不同的资源需求,从而允许在廉价的高CPU实例上调度纯CPU任务。相比之下,MPI应用程序通常具有对称体系结构,其中所有进程运行相同的代码并需要相同的资源,在这种情况下,防止仅使用CPU的机器进行横向扩展。
如Figure 13所示,Ray实现在所有实验(部分D中列出的超参数)中用一部分GPU执行优化的MPI实现。和ES一样,我们能够使用Ray对PPO进行并行化,并且对串行程序的结构进行最小限度的更改。

6.3.3 控制模拟机器

我们展示Ray可以通过实时控制模拟机器来满足软实时要求。Ray驱动程序运行模拟的机器,并以固定的时间步长从1毫秒变为30毫秒,以模拟不同的实时需求。驱动程序提交的任务将使用线下训练的策略来计算要采取的操作。但是,只有在相关时间段内driver收到这些动作(否则先前的动作重复),才会采取行动。真实机器的延迟预算约为10毫秒,我们发现,即使我们运行模拟的速度比实时更快(使用3毫秒的时间步长),Ray也能够产生稳定的行为。表3显示了没有足够快到达机器的任务的比例。
这里写图片描述

7 相关工作

动态任务图。 Ray与CIEL密切相关。它们都支持具有嵌套任务的动态任务图,实现future抽象,并提供基于线性的容错。但是,它们在两个重要方面有所不同。首先,Ray通过actor抽象扩展任务模型。其次,Ray采用完全分布式的控制平面和调度器,而不是依靠单个主控。此外,Ray还添加了ray:wait()方法,采用内存中(而不是基于文件)对象存储,并扩展了现有的编程语言(Python),而CIEL提供了自己的脚本语言(Skywriting )。 Ray还与Dask密切相关,Dask支持动态任务图形,包括一个等待原语,并在Python环境中使用future抽象。但是,Dask使用集中调度程序,不提供类似actor的抽象,也不提供容错功能。

数据流系统。流行的数据流系统(如MapReduce,Spark和Dryad)广泛采用分析和ML工作负载,但其计算模型更具限制性。Spark和MapReduce实现了BSP执行模型,该模型假定同一阶段内的任务执行相同的计算并花费大致相同的时间。Dryad放宽了这一限制,但缺乏对动态任务图的支持。此外,这些系统都没有提供actor抽象,也没有实现分布式可扩展控制平面和调度器。最后,Naiad是一个数据流系统,可为某些工作负载提供改进的可扩展性,但仅支持静态任务图。

**Actor系统。**Orleans提供了一个虚拟的基于actor的抽象。Actor是永久的,它们的状态持续在调用之中。为了扩展,Orleans还允许actor的多个实例在actor以不可变状态操作或没有状态时并行运行。这些无状态的actor可以担任Ray的任务。但是,与Ray不同的是,Orleans开发者必须明确检查点角色状态和中间响应。此外,Orleans还提供了至少一次的语义。相比之下,Ray提供了透明容错和一次语义,因为每个方法调用都记录在GCS中,并且参数和结果都是不可变的。我们发现实际上这些限制不会影响我们应用程序的性能。

Erlang和C ++ Actor Framework(CAF)是另外两个基于actor的系统,它们还要求应用程序明确处理容错。而且,Erlang的全局状态存储不适合共享大型对象,如ML模型,而CAF不支持数据共享。

全局控制状态和调度。在软件定义网络(SDN),分布式文件系统(例如GFS),资源管理(例如Omega)和分布式框架(例如MapReduce,BOOM)中,先前已经提出了逻辑集中控制平面的概念,Ray从这些开拓性努力中汲取灵感,但提供了重大改进。与耦合控制平面数据和计算的SDN,BOOM和GFS相比,Ray将控制平面信息(例如,GCS)的存储与逻辑实现(例如调度器)分开解耦。这允许存储层和计算层独立扩展,这对于实现我们的可扩展性目标至关重要。Omega使用分布式体系结构,调度程序通过全局共享状态进行协调。对于这种体系结构,Ray添加了全局调度程序来平衡跨本地调度程序的负载,并针对ms级而非二级任务调度。

Ray实现了一个可水平扩展的独特的自下而上分布式调度程序,并且可以处理动态构建的任务图。与Ray不同,大多数现有的集群计算系统使用集中式调度程序体系结构。虽然Sparrow是分散式的,但它的调度程序会做出独立决定,限制可能的调度策略,并且所有任务都由同一个全局调度程序处理。Mesos实现了一个两级分层调度器,但是它的顶级调度器可能是一个瓶颈。Canary通过让每个调度程序实例处理任务图的一部分来实现令人印象深刻的性能,但不处理动态计算图。

机器学习框架。 TensorFlow和MXNet针对深度学习工作负载,并有效利用CPU和GPU。虽然它们对由线性代数运算的静态DAG组成的工作负载实现了出色的性能,但它们对更一般的工作负载的支持有限。TensorFlow Fold通过其内部C ++ API为动态任务图和MXNet提供了一些支持,但它们都不支持在执行期间修改DAG以响应任务进度、任务完成时间或故障的能力。原则上TensorFlow和MXNet通过允许程序员模拟低级消息传递和同步原语来实现通用性,但是这种情况下的陷阱和用户体验与MPI的类似。OpenMPI可以实现高性能,但编程相对困难,因为它需要显式协调来处理异构和动态任务图。此外,它迫使程序员明确处理容错。

8 讨论和经验

自从几个月前我们发布Ray以来,已有超过一百万的人下载并使用它。在这里,我们讨论了我们开发和使用Ray的经验,以及我们从早期用户那里收到的一些反馈。

API。在设计API时,我们强调极简主义。最初我们从基本任务抽象开始。后来,我们添加了wait()基元来适应具有不同持续时间的rollouts,并且actor抽象可以容纳第三方模拟器,并分摊昂贵的初始化开销。虽然由此产生的API相对初级,但它已被证明功能强大且易于使用。实际上,有些团队报告指示开发人员首先编写串行实现,然后使用Ray将其并行化。

为了说明这一点,接下来我们简要描述我们的两种其他算法的经验:异步优势Actor Critic(A3C)和超参数搜索.A3C是一种最先进的RL算法,利用异步策略更新来显著提高训练时间以前的算法。为了扩展这个算法,我们使用一个简单的分层方案,其中A3C的多个实例被并行训练并周期性地聚合以形成改进的模型。在Ray中实现分层A3C非常简单,需要20行Python代码才能扩展非分层版本。此外,这种简单的扩展将相同硬件的性能提高了30%。

我们能够使用Ray在大约30行Python代码中实现最先进的超参数搜索算法。Ray对嵌套任务的支持非常关键,因为多个实验必须并行运行,并且每个实验通常在内部使用并行。 wait()原型允许我们按照完成的顺序处理实验结果并自适应地启动新实验。Actor抽象允许我们暂停并恢复基于其他实验进展的有状态实验(参见第B.3节)。相反,大多数现有的实现必须等待所有的实验完成,这导致资源利用效率低下。

Ray的API工作仍然在进行中。基于早期的用户反馈,我们正在考虑增强API以包含更高级别的基元,例如简单聚合和映射。这些也可以通知Ray系统层的调度决策(4.2节)。

限制。鉴于工作负载普遍性,专业优化很难。例如,我们必须在不完全知道图计算的情况下做出调度决策。Ray中的计划优化可能需要更复杂的运行时概要分析。此外,为每个任务存储线性血缘,需要实施垃圾回收策略来减少GCS中的存储成本,这是我们正在积极开发的一项研究。

容错。我们经常被问到AI应用程序是否真的需要容错功能。毕竟,由于许多人工智能算法的统计性质,人们可以简单地忽略失败的展示。根据我们的经验,我们的答案是不合格的“是”。首先,忽略失败的能力使得应用程序更容易编写和推理。其次,我们通过确定性重新运行,能够大大简化调试过程,因为它使我们能够轻松地重现大多数错误。这一点尤其重要,因为由于它们的随机性,人工智能算法出人意料地难以调试。第三,容错功能有助于节省资金,因为它允许我们使用AWS上的现货实例等廉价资源运行。而且,随着工作负载的扩大,我们期望容错变得更加重要。当然,这是以一些开销为代价的。但是,我们发现这种开销对于我们的目标工作负载来说是最小的。

GCS和水平伸缩性。 GCS极大地简化了Ray开发和调试。所有其他组件的基本故障处理和水平缩放都需要不到一周时间才能实施。GCS使我们能够在调试时查询整个系统状态(而不必手动公开内部组件状态)。这帮助我们发现了许多错误并且了解系统行为。

GCS有助于Ray的水平可伸缩性。在6.1节中报告的实验中,只要GCS成为瓶颈,我们就可以通过添加更多的分片来扩展结果。通过简单地添加更多副本,GCS还可以使全局调度程序进行扩展。虽然目前我们正在手动配置GCS分片和全局调度器的数量,但我们计划在未来开发自适应算法。由于这些优势,我们认为集中控制状态将成为未来分布式系统的关键设计组件。

9 结论

新兴的AI应用程序提出了挑战性的计算需求。为了满足这些需求,Ray引入了全局控制存储和自下而上的分布式调度程序。这个架构实现动态任务图执行,而且反过来支持任务并行和actor编程模型。这种编程灵活性对于RL工作负载尤为重要,因为RL工作负载在资源需求、持续时间和功能方面产生各种任务。我们的评估表明,每秒可执行1M任务的线性可伸缩性,透明的容错能力以及对若干当代RL工作负载的显着性能改进。 因此,Ray为开发未来的AI应用程序提供了灵活性、性能和易用性。

猜你喜欢

转载自blog.csdn.net/u011254180/article/details/79327639