Skip to content

Commit

Permalink
[Feature][Flink] Support custom Yarn user submission of applications (D…
Browse files Browse the repository at this point in the history
…ataLinkDC#3428)

Co-authored-by: zackyoungh <[email protected]>
  • Loading branch information
zackyoungh and zackyoungh authored Apr 26, 2024
1 parent 2ba1347 commit d30910f
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,21 @@
import org.dinky.data.annotations.SupportDialect;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.exception.NotSupportExplainExcepition;
import org.dinky.data.result.SelectResult;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.job.JobResult;

import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.cache.Cache;
import cn.hutool.cache.impl.TimedCache;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.ReflectUtil;
import lombok.AllArgsConstructor;

@AllArgsConstructor
public abstract class BaseTask {

private static final Cache<String, SelectResult> results = new TimedCache<>(TimeUnit.MINUTES.toMillis(10));
final TaskDTO task;

public abstract JobResult execute() throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,9 @@ public class CustomerConfigureOptions {
.defaultValue("/opt/dinky/sql-exec/")
.withDescription(
"The dinky configuration directory. It is used to mount the ConfigMap to the Flink container.");

public static final ConfigOption<String> YARN_APPLICATION_USER = key("yarn.application.user")
.stringType()
.noDefaultValue()
.withDescription("A custom user for your YARN application.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;

public abstract class YarnGateway extends AbstractGateway {
Expand Down Expand Up @@ -174,7 +175,28 @@ private void initYarnClient() {

yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConfiguration);
yarnClient.start();

synchronized (YarnGateway.class) {
String hadoopUserName;
try {
hadoopUserName = UserGroupInformation.getLoginUser().getUserName();
} catch (Exception e) {
hadoopUserName = "hdfs";
}

// 设置 yarn 提交的用户名
String yarnUser = configuration.get(CustomerConfigureOptions.YARN_APPLICATION_USER);
if (StrUtil.isNotBlank(yarnUser)) {
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser(yarnUser));
}
try {
yarnClient.start();
} finally {
if (StrUtil.isNotBlank(yarnUser)) {
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser(hadoopUserName));
}
}
}
}

private Path getYanConfigFilePath(String path) {
Expand Down Expand Up @@ -478,7 +500,9 @@ public String getLatestJobManageHost(String appId, String oldJobManagerHost) {
return null;
}

/** Creates a ZooKeeper path of the form "/a/b/.../z". */
/**
* Creates a ZooKeeper path of the form "/a/b/.../z".
*/
private static String generateZookeeperPath(String... paths) {
final String result = Arrays.stream(paths)
.map(YarnGateway::trimSlashes)
Expand Down

0 comments on commit d30910f

Please sign in to comment.