跳到主要内容

Worker 调度

在多个客户端之间共享和回收 ContainerWorker

MR3 允许多个客户端通过创建公共 ContainerGroup 来共享 ContainerWorker。属于同一 ContainerGroup 的所有 MR3 ContainerWorker 具有共同的特性,可以执行由拥有该 ContainerGroup 的任何客户端提交的任何 DAG 发起的任何 TaskAttempt。由于客户端可能以不规则间隔提交复杂程度不同的 DAG,因此使用公共 ContainerGroup 可以比为每个单独的客户端创建不同的 ContainerGroup 实现更高的资源利用率。需要注意的是,一个 ContainerGroup 可以由多个客户端共享。这是因为 MR3 会合并具有相同属性集的同名 ContainerGroup。

为了进一步提高资源利用率,MR3 允许 ContainerWorker 在不同的 ContainerGroup 之间回收。如果两个 ContainerGroup 提供相同的运行时环境(就计算资源而言),则它们是兼容的。因此,我们可以安全地在兼容的 ContainerGroup 之间迁移 ContainerWorker,只需重新初始化即可。需要注意的是,资源管理器(如 Yarn 或 Kubernetes)并不感知(也不关心)MR3 ContainerWorker 所有权的变化。MR3 在内部维护 ContainerKind 以跟踪兼容 ContainerGroup 的集合。对于每个新的 ContainerGroup,MR3 会检查兼容性后将其添加到现有的 ContainerKind 或创建新的 ContainerKind。

回收 ContainerWorker 的机制对于 Spark on MR3 特别有用。事实上,开发 Spark on MR3 的主要动机是能够在线程池中回收计算资源。

MR3 的 WorkerScheduler

MR3 的 WorkerScheduler 负责管理同一 ContainerKind 中 ContainerGroup 之间 ContainerWorker 的迁移。它提供两种策略:FIFO 调度和公平调度,它们指定何时停止 ContainerWorker 以及将 ContainerWorker 迁移到哪里。迁移到另一个 ContainerGroup 后,已停止的 ContainerWorker 可以重新初始化自身并恢复 TaskAttempt 的执行。

  • 在 FIFO 调度下,ContainerWorker 仅在自愿停止时停止,即当它没有更多要执行的 TaskAttempt 且没有更多中间数据要传输时。然后 WorkerScheduler 将已停止的 ContainerWorker 迁移到最老的需要新 ContainerWorker 的 ContainerGroup。因此,我们可以将 FIFO 调度视为为最老的 ContainerGroup 分配最高优先级。

  • 在公平调度下,WorkerScheduler 尝试为所有 ContainerGroup 维护相同数量的 ContainerWorker。它会定期更新每个 ContainerGroup 所属的 ContainerWorker 数量。然后,如果 ContainerGroup 拥有的 ContainerWorker 多于平均值,则请求该 ContainerGroup 停止一些 ContainerWorker,而 ContainerWorker 少于平均值的 ContainerGroup 被允许获取已停止的 ContainerWorker。由于 ContainerWorker 可能保留要传输的中间数据,因此不会立即停止请求。相反,它会通过等待所有运行的 DAG 完成来优雅地停止。

默认情况下,MR3 不回收 ContainerWorker。为了使用 FIFO 调度和公平调度,请将配置键 mr3.container.scheduler.scheme 分别设置为 fifofair。在公平调度下,配置键 mr3.check.memory.usage.event.interval.secs 指定更新每个 ContainerGroup 所属 ContainerWorker 数量的时间间隔(以秒为单位)。