跳到主要内容

并发 DAG

并发执行 DAG

在 MR3 中,处于会话模式的 DAGAppMaster 不仅可以按顺序执行多个 DAG,还可以并发执行。以下是两个示例:

  • 当用户向 MR3SessionClient 提交一系列 DAG 时,DAGAppMaster 可以从传入流中一次执行一个 DAG。如果多个这样的用户连接到同一个 MR3SessionClient,DAGAppMaster 会执行相应数量的并发 DAG。
  • 当用户一次向 MR3SessionClient 提交许多独立的 DAG 时,DAGAppMaster 可以尝试启动尽可能多的 DAG,以尽快完成整个作业。

在这两种情况下,DAGAppMaster 都应该管理多个并发 DAG,从而部分扮演原始 Hadoop MapReduce 中 JobTracker 的角色。以下示例显示了一个 DAGAppMaster 执行来自四个并发流的 DAG:

concurrentdag

最大化集群利用率/吞吐量

结合 ContainerGroup 的使用,MR3 的这一特性帮助我们将启动 Yarn 容器的开销和 ContainerWorker 的空闲时间降至最低,从而最大化整体集群利用率和吞吐量,特别是在并发用户环境中。在有足够数量的活动 DAG 的情况下,来自同一 ContainerGroup 的 Vertex(无论来自同一 DAG 还是不同 DAG)可以共享 ContainerWorker,因此 ContainerWorker 即使在很短的时间内也不太可能保持空闲。对于为许多用户服务的长时间运行的 DAGAppMaster,启动 Yarn 容器的开销实际上消失了。

在以下实验中,我们运行 16 个客户端向 DAGAppMaster 提交相同的 DAG(它们是 TPC-DS 基准测试中查询 18 的实例)。每个客户端按顺序提交 10 个 DAG,因此 DAGAppMaster 总共接收 160 个相同的 DAG。在第一次运行中,没有 DAG 共享相同的 ContainerGroup,因此没有 ContainerWorker 在 DAG 之间共享,导致在 DAGAppMaster 的生命周期内分配了 4137 个 ContainerWorker。每个 DAG 创建大约 4137 / 160 = 26 个 ContainerWorker。

concurrentdagexp1

在第二次运行中,所有 DAG 共享相同的 ContainerGroup,因此所有 ContainerWorker 都被共享,导致在 DAGAppMaster 的生命周期内只分配了 196 个 ContainerWorker。总执行时间也从 4056 秒减少到 3282 秒。执行时间的减少归因于省去了创建 ContainerWorker 的成本。

concurrentdagexp2

DAGAppMaster 中并发 DAG 的最大数量可以通过 mr3-site.xml 中的键 mr3.am.max.num.concurrent.dags 来指定。