diff --git a/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java b/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java index 896d978997..5b34a446d4 100644 --- a/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java +++ b/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java @@ -23,18 +23,14 @@ import org.dinky.data.annotations.SupportDialect; import org.dinky.data.dto.TaskDTO; import org.dinky.data.exception.NotSupportExplainExcepition; -import org.dinky.data.result.SelectResult; import org.dinky.data.result.SqlExplainResult; import org.dinky.job.JobResult; import java.util.List; import java.util.Set; -import java.util.concurrent.TimeUnit; import com.fasterxml.jackson.databind.node.ObjectNode; -import cn.hutool.cache.Cache; -import cn.hutool.cache.impl.TimedCache; import cn.hutool.core.text.StrFormatter; import cn.hutool.core.util.ClassUtil; import cn.hutool.core.util.ReflectUtil; @@ -42,8 +38,6 @@ @AllArgsConstructor public abstract class BaseTask { - - private static final Cache results = new TimedCache<>(TimeUnit.MINUTES.toMillis(10)); final TaskDTO task; public abstract JobResult execute() throws Exception; diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/CustomerConfigureOptions.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/CustomerConfigureOptions.java index 72b932dd49..7cc151a646 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/CustomerConfigureOptions.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/CustomerConfigureOptions.java @@ -56,4 +56,9 @@ public class CustomerConfigureOptions { .defaultValue("/opt/dinky/sql-exec/") .withDescription( "The dinky configuration directory. It is used to mount the ConfigMap to the Flink container."); + + public static final ConfigOption YARN_APPLICATION_USER = key("yarn.application.user") + .stringType() + .noDefaultValue() + .withDescription("A custom user for your YARN application."); } diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java index 05c51d9d22..52b9d9aa40 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java @@ -92,6 +92,7 @@ import cn.hutool.core.io.FileUtil; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.ReUtil; +import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpUtil; public abstract class YarnGateway extends AbstractGateway { @@ -174,7 +175,28 @@ private void initYarnClient() { yarnClient = YarnClient.createYarnClient(); yarnClient.init(yarnConfiguration); - yarnClient.start(); + + synchronized (YarnGateway.class) { + String hadoopUserName; + try { + hadoopUserName = UserGroupInformation.getLoginUser().getUserName(); + } catch (Exception e) { + hadoopUserName = "hdfs"; + } + + // 设置 yarn 提交的用户名 + String yarnUser = configuration.get(CustomerConfigureOptions.YARN_APPLICATION_USER); + if (StrUtil.isNotBlank(yarnUser)) { + UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser(yarnUser)); + } + try { + yarnClient.start(); + } finally { + if (StrUtil.isNotBlank(yarnUser)) { + UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser(hadoopUserName)); + } + } + } } private Path getYanConfigFilePath(String path) { @@ -478,7 +500,9 @@ public String getLatestJobManageHost(String appId, String oldJobManagerHost) { return null; } - /** Creates a ZooKeeper path of the form "/a/b/.../z". */ + /** + * Creates a ZooKeeper path of the form "/a/b/.../z". + */ private static String generateZookeeperPath(String... paths) { final String result = Arrays.stream(paths) .map(YarnGateway::trimSlashes)