跳到主要内容

背压

Shuffle handler 上的背压

在 MR3 中,当 shuffle handler 无法跟上来自远程节点的传入 fetch 请求时,就会发生背压。为了更有效地处理同时的 fetch 请求,ContainerWorker 根据配置键 mr3.use.daemon.shufflehandler 将 shuffle 负载分配到多个 shuffle handler。每个 shuffle handler 又创建多个线程,由 tez-site.xml 中的配置键 tez.shuffle.max.threads 控制。然而,过多的 fetch 请求仍然可能导致 shuffle handler 响应缓慢,甚至在个别 shuffle handler 上长时间停滞。最坏的情况下,执行时间较长的 Task 可能会成为慢任务,可能触发推测执行。

对于背压,shuffle handler(从远程节点上的 fetcher 提供 shuffle 请求)和 ShuffleServer(收集并转发 shuffle 请求)都参与其中。

MR3 中的背压和推测获取

MR3 实现了一种通过在 ShuffleServer 端控制 fetcher 的创建来对 shuffle handler 进行背压的机制,而不是在 shuffle handler 端限制传入 shuffle 请求的速率。这种方法是必要的,因为 shuffle handler 的状态(例如某些 shuffle handler 是否停滞)不会传播到 ShuffleServer。Shuffle handler 还通知 ShuffleServer 正在接收超过阈值的 shuffle 请求的 ContainerWorker 列表。

  • 当 ContainerWorker 中活动的 shuffle 请求(即向 fetcher 传输数据的连接)总数超过阈值时,所有 ShuffleServer 停止创建向该 ContainerWorker 发送 shuffle 请求的新 fetcher。当活动的 shuffle 请求总数降至另一个阈值以下时(设置为前一个阈值的三分之二以防止抖动),ShuffleServer 恢复创建 fetcher。

  • 当联系远程 shuffle handler 的 fetcher 在一段时间内没有取得进展时,其相应的 ShuffleServer 暂时阻止与该 shuffle handler 的进一步连接。需要注意的是,ShuffleServer 仍然可以创建联系同一 ContainerWorker 内其他 shuffle handler 的 fetcher。

作为背压处理逻辑的一部分,MR3 还实现了一种称为推测获取的机制。当现有 fetcher 在一段时间内未能取得进展时,会启动推测 fetcher。推测获取解决了一种情况:即使为请求服务的 shuffle handler 已恢复,fetcher 仍然卡住。通过触发推测获取,MR3 通常可以避免更昂贵的回退机制——推测执行,这需要启动新的 TaskAttempt。

MR3 使用以下配置键(均在 tez-site.xml 中)来控制背压和推测获取。

  • 背压

    • tez.shuffle.max.block.requests.thread.multiple 指定触发背压并阻止与 ContainerWorker 的进一步连接的活动的 shuffle 请求(或 Netty 通道)总数阈值。具体来说,阈值计算为:tez.shuffle.max.block.requests.thread.multiple * shuffle handler 总数 = tez.shuffle.max.block.requests.thread.multiple * mr3.use.daemon.shufflehandler * tez.shuffle.max.threads
    • tez.runtime.shuffle.stuck.fetcher.threshold.millis 指定 fetcher 在触发背压并阻止与 shuffle handler 的进一步连接之前的经过时间阈值。默认值为 2500。
    • tez.runtime.shuffle.stuck.fetcher.release.millis 指定取消背压并恢复创建与之前阻止的 shuffle handler 联系的 fetcher 的经过时间阈值。默认值为 10000。
  • 推测获取

    • tez.runtime.shuffle.speculative.fetch.wait.millis 指定 fetcher 在触发推测获取之前的经过时间阈值。默认值为 12500。
    • tez.runtime.shuffle.max.speculative.fetch.attempts 指定每次获取尝试的最大推测 fetcher 数量。默认值为 2,因此每次获取尝试最多可以创建 3 个 fetcher。

MR3 认为 fetcher 在没有收到中间数据的实际有效载荷时没有取得进展,无论连接是否已建立和验证。

下图显示了使用背压和推测获取的默认值时 fetcher 的状态转换。Fetcher 在其执行完成之前可以处于以下五种状态之一:

  • Normal:fetcher 正常运行。
  • Stuck:fetcher 在 tez.runtime.shuffle.stuck.fetcher.threshold.millis 指定的时间阈值内没有取得进展。
  • Recovered:fetcher 不再参与背压,因为它已收到中间数据。
  • Speculative:fetcher 不再参与背压,因为经过时间已超过 tez.runtime.shuffle.stuck.fetcher.release.millis 指定的阈值。
  • Retry:fetcher 在 tez.runtime.shuffle.speculative.fetch.wait.millis 指定的时间阈值后仍未完成。

fetcher.state

在状态 SpeculativeRetry 下,MR3 可能会创建推测 fetcher(在配置键 tez.runtime.shuffle.max.speculative.fetch.attempts 指定的限制内)。