MasterControl
MasterControl 是一个实用工具,允许用户连接到 DAGAppMaster 并管理 DAG。MasterControl 在 DAGAppMaster 以 Yarn 或 Kubernetes 模式运行时工作。
在 Hadoop 上使用 MasterControl
在 Hadoop 上,MasterControl 应由 DAGAppMaster 的所有者执行。执行的脚本是mr3/master-control.sh。它接受一个带有 Yarn ApplicationID 的命令。
mr3/master-control.sh ...
Usage:
getDags <AppID> : 获取所有运行中/已完成的 DAG
getRunningDags <AppID> : 获取所有运行中的 DAG
killDag <AppId> <DAGID> : 发送杀死 DAG 的请求
ackDagFinished <AppID> <DAGID> : 确认 DAG 完成
stopContainerWorkers <AppID> : 优雅地停止所有运行中的 ContainerWorkers
closeDagAppMaster <AppID> : 优雅地停止 DAGAppMaster
命令stopContainerWorkers等待所有当前 DAG 完成,然后终止所有运行中的 ContainerWorkers。命令closeDagAppMaster等待所有当前 DAG 完成,然后终止当前的 DAGAppMaster。对于其他命令,我们在下面展示示例。
我们列出 Yarn 应用程序application_1742983838780_0079中所有运行中的 DAG。MasterControl 打印所有运行中 DAG 的 ID 和名称。
mr3/master-control.sh getDags application_1742983838780_0079
...
Lists of running/finished DAGs in application_1742983838780_0079:
dag_1742983838780_0079_1521 hive_20250406140922_d51821dd-9187-4106-ba11-9b0dad91b32d:1521
dag_1742983838780_0079_1538 hive_20250406141409_2c9d9b24-ac71-4ff2-9d4b-3c0ff2e787f6:1538
master-control.sh可能会因IllegalAccessError失败:
java.lang.IllegalAccessError: class com.datamonad.mr3.client.DAGClientHandlerProtocolRPC$GetAllDagsRequestProto tried to access private field com.google.protobuf.AbstractMessage.memoizedSize (com.datamonad.mr3.client.DAGClientHandlerProtocolRPC$GetAllDagsRequestProto and com.google.protobuf.AbstractMessage are in unnamed module of loader 'app')
此错误发生是因为 HivePlus 使用 Protobuf 3,而类路径中包含了 Protobuf 2 jar 文件。为了避免此错误,请移除类路径中的 Protobuf 2 jar 文件。
我们杀死运行中的 DAGdag_1742983838780_0079_1521。
mr3/master-control.sh killDag application_1742983838780_0079 dag_1742983838780_0079_1521
...
Sent a request to kill DAG dag_1742983838780_0079_1521.
过了一会儿,DAGdag_1742983838780_0079_1521被杀死,不再出现在运行中 DAG 的列表中。
在 Kubernetes 上使用 MasterControl
在 Kubernetes 上,我们应在 HiveServer2 Pod 内执行 MasterControl(那里已经挂载了env.sh并设置了环境变量CLIENT_TO_AM_TOKEN_KEY)。第一步是从 HiveServer2 的日志文件中获取 ApplicationID(在 Kubernetes 外部)。在以下示例中,我们获取 HiveServer2 Pod 的名称,将其作为参数用于kubectl logs,并获取 ApplicationIDapplication_2407_0000。
kubectl get pods -n hivemr3 | grep hiveserver2
hiveserver2-595f4c56c4-rxbgx 1/1 Running 0 4m39s
kubectl logs -n hivemr3 hiveserver2-595f4c56c4-rxbgx | grep ApplicationID
2025-04-06T08:03:31,266 INFO [main] client.MR3Client$: Starting DAGAppMaster with ApplicationID application_9348_0000 in session mode
接下来检查 HiveServer2 Pod 内是否已设置环境变量CLIENT_TO_AM_TOKEN_KEY。
kubectl exec -it -n hivemr3 hiveserver2-595f4c56c4-rxbgx -- /bin/bash -c 'export PS1="$ "; exec /bin/bash'
printenv | grep CLIENT_TO_AM_TOKEN_KEY
CLIENT_TO_AM_TOKEN_KEY=b93f7b72-bad3-4259-9040-fe57fb339050
现在用户可以使用 ApplicationID 执行master-control.sh。
./master-control.sh
Usage:
getDags <AppID> : 获取所有运行中/已完成的 DAG
getRunningDags <AppID> : 获取所有运行中的 DAG
killDag <AppId> <DAGID> : 发送杀死 DAG 的请求
ackDagFinished <AppID> <DAGID> : 确认 DAG 完成
stopContainerWorkers <AppID> : 优雅地停止所有运行中的 ContainerWorkers
closeDagAppMaster <AppID> : 优雅地停止 DAGAppMaster
updateResourceLimit <AppID> <Max memory in GB> <Max CPU cores> : 更新资源限制
updateAutoScaling <AppID> <autoScaleOutThresholdPercent> <autoScaleInThresholdPercent> <autoScaleInMinHosts> <autoScaleOutNumInitialContainers> : 更新自动扩缩容参数。
./master-control.sh getDags application_9348_0000
...Lists of running/finished DAGs in application_9348_0000:
Lists of running/finished DAGs in application_9348_0000:
dag_9348_0000_5 tpcds-query12
./master-control.sh killDag application_9348_0000 dag_9348_0000_5
Sent a request to kill DAG dag_9348_0000_5.
在 Kubernetes 上,用户可以使用命令updateResourceLimit来更新(增加或减少)所有 ContainerWorker Pod 的总资源限制。此命令覆盖mr3-site.xml中配置键mr3.k8s.worker.total.max.memory.gb和mr3.k8s.worker.total.max.cpu.cores的设置。如果当前 ContainerWorker Pod 消耗的资源超过新限制,MR3 通过停止年轻的 ContainerWorker Pod 来返回多余的资源。为了不中断活动 DAG 的执行,MR3 会优雅地停止这些 ContainerWorker Pod,它们将继续正常运行,直到所有活动 DAG 完成。
./master-control.sh updateResourceLimit application_9348_0000 128 32
...Sent a request to update the resource limit for application_9348_0000: 128 32
在 Kubernetes 上,用户可以使用命令updateAutoScaling来更新自动扩缩容的配置。
autoScaleOutThresholdPercent指定ScaleOutThreshold =mr3.auto.scale.out.threshold.percent。autoScaleInThresholdPercent指定ScaleInThreshold =mr3.auto.scale.in.threshold.percent。autoScaleInMinHosts指定AutoScaleInMinHosts =mr3.auto.scale.in.min.hosts。autoScaleOutNumInitialContainers指定mr3.auto.scale.out.num.initial.containers。
重新运行 Beeline
运行 HivePlus 时,如果用户执行了命令closeDagAppMaster且当前 DAGAppMaster 正在优雅地停止自己,则 Beeline 连接会失败并显示以下错误,无法执行查询。
Caused by: org.apache.hadoop.ipc.RemoteException(com.datamonad.mr3.api.common.MR3Exception): DAGAppMaster.gracefulShutdown() already called and cannot take a new DAG
这是正常行为,因为 DAGAppMaster 在优雅地停止自己时会拒绝接受新的 DAG。由于相同的 Beeline 连接无法执行更多查询,因此只需重启 Beeline 即可解决问题。