推测执行
分布式计算中的慢任务
MR3 实现了一种推测执行机制,以减轻慢任务的影响,即运行时间比同一 Vertex 发起的兄弟 Task 长得多的 Task。慢任务问题在分布式计算中很普遍,由于其本质,不太可能出现完美的解决方案。然而,成熟的分布式系统应该以某种方式解决此问题,因为慢任务会产生有害影响:
- 即使只有一个慢任务也可能大大增加 DAG 的执行时间,从而破坏用户体验。
- 当慢任务运行时,整个系统可能几乎停滞,从而在很长时间内浪费硬件资源。
因此,MR3 用一种推测执行机制来解决慢任务问题。
触发推测执行
MR3 在检测到运行时间(未完成)远长于其兄弟 Task 的 Task 时触发推测执行。具体来说,它开始监视 TaskAttempt 以进行推测执行,并按如下方式决定创建新的 TaskAttempt:
- 对于每个 Vertex,MR3 计算已完成的 Task 数量并测量其执行时间。
- 当已完成的 Task 数量达到配置键
mr3.am.task.concurrent.run.threshold.percent指定的百分比时,MR3 计算它们的平均执行时间并开始监视剩余 TaskAttempt 以进行推测执行。例如,如果mr3.am.task.concurrent.run.threshold.percent设置为 95.0 且 Vertex 有 100 个 Task,MR3 记录前 95 个已完成 Task 的平均执行时间,并开始监视剩余 5 个 Task 的 TaskAttempt。 - 为了避免过早创建用于推测执行的 TaskAttempt,平均执行时间会进行调整,使其不小于配置键
mr3.am.task.concurrent.run.min.threshold.ms(以毫秒为单位)指定的时间。例如,当mr3.am.task.concurrent.run.min.threshold.ms设置为 10000 时,如果平均执行时间小于 10 秒,则调整为 10 秒。 - 如果 Task 数量或配置键
mr3.am.task.concurrent.run.threshold.percent指定的百分比太小,MR3 可能会监视所有 TaskAttempt 以进行推测执行。例如,如果 Vertex 只有一个 Task 且mr3.am.task.concurrent.run.threshold.percent设置为 95.0,MR3 会监视该 Vertex 的唯一 Task 以进行推测执行(因为 1 * 95 / 100 四舍五入为零)。在这种情况下,平均执行时间由配置键mr3.am.task.concurrent.run.min.threshold.ms设置,因为在开始推测执行之前没有 Task 运行。 - 如果 TaskAttempt 的运行时间长于从以下步骤得出的阈值:1)上一步获得的平均执行时间和 2)配置键
mr3.am.task.concurrent.run.multiplier指定的乘数,MR3 会为同一个 Task 创建一个新的 TaskAttempt,而不杀死现有的 TaskAttempt。例如,如果平均执行时间为 10 秒且mr3.am.task.concurrent.run.multiplier设置为 2.0,则阈值为 10 * 2.0 = 20 秒。现在我们看到同一个 Task 有两个并发的 TaskAttempt 在运行。 - 如果新的 TaskAttempt 也运行时间长于阈值,MR3 会为同一个 Task 创建第三个 TaskAttempt。这样,MR3 可以按固定间隔为同一个 Task 创建一系列 TaskAttempt。
- 同一个 Task 的 TaskAttempt 数量受配置键
mr3.am.task.max.failed.attempts的限制。例如,当mr3.am.task.max.failed.attempts设置为 5 时,一个 Task 最多可以创建 5 个独立的 TaskAttempt。 - Task 在其任何 TaskAttempt 完成时立即完成。所有其他 TaskAttempt 立即被杀死。
MR3 以配置键 mr3.task.am.heartbeat.duration.interval.ms 指定的粒度更新每个 TaskAttempt 的持续时间。具体来说,它在从 ContainerWorker 收到心跳时更新 TaskAttempt 的持续时间。心跳的使用是必要的,因为心跳是 TaskAttempt 实际在 ContainerWorker 中运行的唯一证据。(如果 TaskAttempt 长时间没有响应心跳,它会自动超时杀死。)因此,用户不应将 mr3.task.am.heartbeat.duration.interval.ms 设置为过大的值。
在创建新的 TaskAttempt 时,MR3 可能会抢占后代 Vertex 的现有 TaskAttempt 以避免死锁。用户可以通过将配置键 mr3.am.task.concurrent.run.threshold.percent 设置为 100(这是默认值)来禁用推测执行。
示例
作为示例,以下两个图表显示了一个包含 10 个 Task 的 Vertex 的进度。我们假设前 10 个 TaskAttempt 同时启动,并使用以下设置:
mr3.am.task.concurrent.run.threshold.percent设置为 90.0。mr3.am.task.concurrent.run.min.threshold.ms设置为 10000(相当于 10 秒)。mr3.am.task.concurrent.run.multiplier设置为 2.0。mr3.am.task.max.failed.attempts设置为 3。
在第一种情况下,我们使用配置键 mr3.am.task.concurrent.run.min.threshold.ms。

- 前 9 个 Task 在不到 10 秒的时间内成功。因此,它们的平均执行时间调整为 10 秒。
- 推测执行的阈值设置为 10 * 2.0 = 20 秒。
- Task 10 的第一个 TaskAttempt 在 20 秒内未完成,因此 MR3 判断它是一个慢任务,并在开始后 20 秒创建 Task 10 的第二个 TaskAttempt。
- Task 10 的第二个 TaskAttempt 在 20 秒内也未完成,因此 MR3 在开始后 40 秒创建 Task 10 的第三个 TaskAttempt。需要注意的是,Task 10 的 TaskAttempt 总数不超过配置键
mr3.am.task.max.failed.attempts指定的限制。 - Task 10 的第三个 TaskAttempt 在 10 秒内完成(在开始后 50 秒),MR3 杀死第一个和第二个 TaskAttempt。
在第二种情况下,我们不使用配置键 mr3.am.task.concurrent.run.min.threshold.ms。

- 前 9 个 Task 平均在 100 秒内成功,大于 10 秒。
- 推测执行的阈值设置为 100 * 2.0 = 200 秒。
- Task 10 的第一个 TaskAttempt 在 200 秒内未完成,因此 MR3 判断它是一个慢任务,并在开始后 200 秒创建 Task 10 的第二个 TaskAttempt。
- Task 10 的第二个 TaskAttempt 在 200 秒内也未完成,因此 MR3 在开始后 400 秒创建 Task 10 的第三个 TaskAttempt。
- Task 10 的第三个 TaskAttempt 在 100 秒内完成(在开始后 500 秒),MR3 杀死第一个和第二个 TaskAttempt。
结合在单个 ContainerWorker 中使用多个 shuffle handler,推测执行使 MR3 能够消除 Fetch 延迟,这是慢任务最常见的来源。更多详情请参阅消除 Fetch 延迟。