配置 Tez 运行时
Tez 运行时的行为由类路径中的配置文件 tez-site.xml 指定。MR3 从原始 Tez 继承了许多 Tez 运行时配置键。例如,tez.runtime.io.sort.mb 指定排序输出所需的内存量,tez.runtime.shuffle.merge.percent 指定每次合并的 shuffle 数据比例。
MR3 还引入了特定于 MR3 新功能的其他配置键,并且可能以不同方式解释现有配置键。
以下是 Tez 运行时的重要配置键以及 MR3 中引入的新配置键的说明。
VertexManager
| Name | Default value | Description |
|---|---|---|
| tez.shuffle-vertex-manager.enable.auto-parallel | false | true:为 ShuffleVertexManager 启用自动并行。false:禁用自动并行。更多详情请参阅 自动并行。 |
| tez.shuffle-vertex-manager.auto-parallel.min.num.tasks | 20 | 触发自动并行的最小 Task 数。例如,如果值设置为 20,则仅考虑具有至少 20 个 Task 的 Vertex。用户可以通过将此配置键设置为较大值来有效地禁用自动并行。 |
| tez.shuffle-vertex-manager.auto-parallel.max.reduction.percentage | 10 | 应用自动并行后可保留的 Task 百分比。例如,如果值设置为 10,则 Task 数最多可减少 100 - 10 = 90%,从而仅保留 10% 的 Task。 |
| tez.shuffle-vertex-manager.use-stats-auto-parallelism | false | true:应用自动并行时分析输入统计信息。false:不使用输入统计信息。 |
| tez.shuffle.vertex.manager.auto.parallelism.min.percent | 20 | 规范化输入统计信息时的下限。例如,如果值设置为 20,则输入统计信息在 20 到 100 之间规范化。即,大小为零的输入被规范化为 20,而最大输入大小被映射到 100。 |
Runtime
| Name | Default value | Description |
|---|---|---|
| tez.runtime.use.free.memory.fetched.input | false | true:如果可用内存大小超过分配给单个 Task 的内存大小,则使用 MemoryFetchedInput(用于无序数据)和 InMemoryMapOutput(用于有序数据),而不是溢出到本地磁盘。false:获取器不考虑可用内存大小。 |
| tez.runtime.free.memory.factor.for.fetched.input | 2 | 用于计算每个 LogicalInput 存储 shuffle 输入数据的可用内存总量的倍数。 |
| tez.runtime.shuffle.unordered.memory.streaming | false | true:获取器不将无序数据写入本地磁盘。false:获取器可能将无序数据写入本地磁盘。 |
| tez.runtime.use.free.memory.writer.output | false | true:如果可用内存大小超过 tez.runtime.free.memory.writer.output.threshold.mb 指定的内存阈值,则 Task 将输出存储在内存中而不是写入本地磁盘。false:Task 将输出写入本地磁盘。如果设置为 true,则在 hive-site.xml 中将 hive.mr3.delete.vertex.local.directory 设置为 true。仅在流水线 shuffle 时有效。 |
| tez.runtime.free.memory.writer.output.threshold.mb | 6144 | 当 tez.runtime.use.free.memory.writer.output 设置为 true 时,用于在内存中写入输出的可用内存阈值(MB)。 |
| tez.runtime.pipelined-shuffle.enabled | false | true:使用流水线 shuffle。false:不使用流水线 shuffle。如果设置为 true/false,则另一个配置键 tez.runtime.enable.final-merge.in.output 会自动设置为 false/true。使用带有流水线 shuffle 的推测执行不推荐。 |
| tez.runtime.pipelined.sorter.use.soft.reference | false | true:对在同一 ContainerWorker 中运行的 TaskAttempt 分配的 ByteBuffer 使用软引用。这些软引用被重用。false:不使用软引用。 |
| tez.runtime.transfer.data-via-events.enabled | true | true:直接将无序数据嵌入消息中(类型为 DataMovementEvent)。false:不嵌入。仅对具有单个输出分区的 Vertex 有效。 |
| tez.runtime.transfer.data-via-events.max-size | 2048 | 可直接嵌入消息的无序数据的最大大小,从而避免创建获取器。 |
Shuffle handlers
| Name | Default value | Description |
|---|---|---|
| tez.shuffle.connection-keep-alive.enable | false | true:保持连接以便重用。false:不重用。 |
| tez.shuffle.max.threads | 0 | 每个 shuffle 处理程序的线程数。默认值为 0,将线程数设置为核心数的 2 倍。 |
| tez.shuffle.listen.queue.size | 128 | 监听队列的大小。可以设置为 /proc/sys/net/core/somaxconn 中的值。 |
| tez.shuffle.port | 13563 | shuffle 处理程序的端口号。如果 ContainerWorker 无法为 shuffle 处理程序获取端口号,它会选择一个随机端口号。 |
| tez.shuffle.mapoutput-info.meta.cache.size | 1000 | mapper 输出元数据的缓存大小。 |
ShuffleServer and fetchers
| Name | Default value | Description |
|---|---|---|
| tez.am.shuffle.auxiliary-service.id | mapreduce_shuffle | 外部 shuffle 服务的服务 ID。设置为 tez_shuffle 以使用 MR3 shuffle 处理程序。在 Kubernetes 和独立模式下必须设置为 tez_shuffle。 |
| tez.shuffle.skip.verify.request | false | true:MR3 shuffle 处理程序跳过检查 shuffle 请求的有效性。false:MR3 shuffle 处理程序检查 shuffle 请求的有效性。仅对 MR3 shuffle 处理程序有效(使用 tez_shuffle)。 |
| tez.runtime.shuffle.keep-alive.enabled | false | true:在获取器中保持连接以便重用。false:不重用。 |
| tez.runtime.shuffle.connect.timeout | 27500 | 尝试连接到 shuffle 服务或内置 shuffle 处理程序以报告 fetch 失败之前的最大时间(毫秒)。更多详情请参阅 容错。 |
| tez.runtime.shuffle.parallel.copies | 20 | 每个 LogicalInput 的最大获取器数。请注意,单个 RuntimeTask 可以创建多个 LogicalInput。 |
| tez.runtime.shuffle.total.parallel.copies | 40 | 每个 ContainerWorker 的最大获取器数。 |
| tez.runtime.shuffle.fetch.max.task.output.at.once | 20 | 每个获取请求要获取的最大任务输出文件数。较大的值可能导致 HTTP 400 错误。 |
| tez.runtime.shuffle.ranges.scheme | first | first:ShuffleServer 随机选择要 shuffle 的 LogicalInput。max(实验性):ShuffleServer 选择具有最多待处理输入的 LogicalInput。 |
| tez.runtime.optimize.local.fetch | true | true:直接从本地磁盘读取存储的无序数据。false:通过获取器读取存储在本地磁盘的无序数据。使用内存到内存 shuffle 时自动设置为 false。 |
| tez.runtime.optimize.local.fetch.ordered | true | true:直接从本地磁盘读取存储的有序数据。false:通过获取器读取存储在本地磁盘的有序数据。使用内存到内存 shuffle 时自动设置为 false。 |
Backpressure and speculative fetching
| Name | Default value | Description |
|---|---|---|
| tez.shuffle.max.block.requests.thread.multiple | 2 | 用于计算在触发背压之前每个 ContainerWorker 允许的最大活动 shuffle 请求(或 Netty 通道)数的倍数。 |
| tez.runtime.shuffle.speculative.fetch.wait.millis | 30000 | 获取器触发推测获取之前的经过时间阈值(毫秒)。 |
| tez.runtime.shuffle.stuck.fetcher.threshold.millis | 3000 | 获取器在触发背压并阻止进一步连接到 shuffle 处理程序之前的经过时间阈值(毫秒)。 |
| tez.runtime.shuffle.stuck.fetcher.release.millis | 15000 | 解除背压后恢复联系之前被阻止的获取器的经过时间阈值(毫秒)。 |
| tez.runtime.shuffle.max.speculative.fetch.attempts | 2 | 每个获取尝试的最大推测获取器数。 |
Celeborn
以下配置键在 tez.celeborn.enabled 设置为 true 且 MR3 使用 Celeborn 作为远程 shuffle 服务时有效。形式为 tez.celeborn.XXX.YYY 的配置键会自动转换为 celeborn.XXX.YYY 并传递给 Celeborn 客户端。
| Name | Default value | Description |
|---|---|---|
| tez.celeborn.XXX.YYY | - | 转换为 celeborn.XXX.YYY 以供 Celeborn 读取。 |
| tez.runtime.celeborn.fetch.split.threshold | 1073741824 | 获取器可以从 Celeborn workers 接收的最大数据大小(字节)。默认值为 1GB。 |
| tez.runtime.celeborn.unordered.fetch.spill.enabled | true | true:Reducer 先将 mapper 的输出写入本地磁盘,然后再处理。false:Reducer 直接处理通过无序边获取的 mapper 输出,而无需写入本地磁盘。 |
| tez.runtime.celeborn.client.fetch.throwsFetchFailure | true | true:每当发生获取失败时抛出异常,从而触发 Task/Vertex 重新运行。false:不抛出异常。 |