
Commit

Fix the issue that the status of a Flink job submitted to YARN cannot be retrieved after Kerberos authentication is enabled on the YARN web UI; perform Kerberos authentication when calling the Flink HTTP API. Related issue: #3470
ze.miao committed Dec 6, 2024
1 parent 8cbd5bb commit ad8733c
Showing 2 changed files with 220 additions and 56 deletions.
138 changes: 138 additions & 0 deletions dinky-gateway/src/main/java/org/dinky/gateway/yarn/RequestKerberosUrlUtils.java
@@ -0,0 +1,138 @@
package org.dinky.gateway.yarn;

import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.config.Lookup;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.impl.auth.SPNegoSchemeFactory;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import java.io.IOException;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

public class RequestKerberosUrlUtils {
    public static Logger logger = LoggerFactory.getLogger(RequestKerberosUrlUtils.class);
    private String principal;
    private String keyTabLocation;

    public RequestKerberosUrlUtils() {}

    public RequestKerberosUrlUtils(String principal, String keyTabLocation) {
        this.principal = principal;
        this.keyTabLocation = keyTabLocation;
    }

    public RequestKerberosUrlUtils(String principal, String keyTabLocation, boolean isDebug) {
        this(principal, keyTabLocation);
        if (isDebug) {
            System.setProperty("sun.security.spnego.debug", "true");
            System.setProperty("sun.security.krb5.debug", "true");
        }
    }

    public RequestKerberosUrlUtils(String principal, String keyTabLocation, String krb5Location, boolean isDebug) {
        this(principal, keyTabLocation, isDebug);
        // System.setProperty("java.security.krb5.conf", krb5Location);
    }

    private static HttpClient buildSpnegoHttpClient() {
        // Register the SPNEGO auth scheme; the Kerberos credentials themselves come from the
        // JAAS login performed in callRestUrl(), so the credentials provider stays empty.
        Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
                .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(true))
                .build();

        BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(new AuthScope(null, -1, null), new Credentials() {
            @Override
            public Principal getUserPrincipal() {
                return null;
            }

            @Override
            public String getPassword() {
                return null;
            }
        });

        CloseableHttpClient httpClient = HttpClientBuilder.create()
                .setDefaultAuthSchemeRegistry(authSchemeRegistry)
                .setDefaultCredentialsProvider(credentialsProvider)
                .build();
        return httpClient;
    }

    public HttpResponse callRestUrl(final String url, final String userId) {
        logger.warn("Calling KerberosHttpClient, principal: {}, keytab: {}, url: {}", this.principal, this.keyTabLocation, url);
        // In-memory JAAS configuration that logs in from the keytab instead of the ticket cache.
        Configuration config = new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                HashMap<String, Object> options = new HashMap<String, Object>() {
                    {
                        put("useTicketCache", "false");
                        put("useKeyTab", "true");
                        put("keyTab", keyTabLocation);
                        // Krb5 in the GSS API needs to be refreshed so it does not throw
                        // "Specified version of key is not available"
                        put("refreshKrb5Config", "true");
                        put("principal", principal);
                        put("storeKey", "true");
                        put("doNotPrompt", "true");
                        put("isInitiator", "true");
                        put("debug", "true");
                    }
                };
                return new AppConfigurationEntry[] {
                    new AppConfigurationEntry(
                            "com.sun.security.auth.module.Krb5LoginModule",
                            AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                            options)
                };
            }
        };
        Set<Principal> principals = new HashSet<Principal>(1);
        principals.add(new KerberosPrincipal(userId));
        Subject sub = new Subject(false, principals, new HashSet<Object>(), new HashSet<Object>());
        try {
            // Log in through the Krb5LoginModule defined above, then run the HTTP call as that subject.
            LoginContext lc = new LoginContext("Krb5Login", sub, null, config);
            lc.login();
            Subject serviceSubject = lc.getSubject();
            return Subject.doAs(serviceSubject, new PrivilegedAction<HttpResponse>() {
                @Override
                public HttpResponse run() {
                    try {
                        HttpClient spnegoHttpClient = buildSpnegoHttpClient();
                        return spnegoHttpClient.execute(new HttpGet(url));
                    } catch (IOException ioe) {
                        logger.error("SPNEGO request to {} failed", url, ioe);
                        return null;
                    }
                }
            });
        } catch (Exception le) {
            logger.error("Kerberos login failed for principal {}", principal, le);
        }
        return null;
    }
}
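
For reference, a minimal sketch of how the new utility might be exercised on its own. The principal, keytab path and tracking URL below are placeholders, and the EntityUtils-based body handling is illustrative rather than part of this commit:

// Hypothetical standalone usage of RequestKerberosUrlUtils (placeholder principal, keytab and URL).
RequestKerberosUrlUtils client = new RequestKerberosUrlUtils(
        "flink/host.example.com@EXAMPLE.COM", "/etc/security/keytabs/flink.keytab");
HttpResponse response = client.callRestUrl(
        "http://rm.example.com:8088/proxy/application_1700000000000_0001/jobs/overview",
        "flink/host.example.com@EXAMPLE.COM");
if (response != null && response.getStatusLine().getStatusCode() == 200) {
    // EntityUtils comes from org.apache.http.util and drains the response body to a String.
    String body = org.apache.http.util.EntityUtils.toString(response.getEntity(), "UTF-8");
    System.out.println(body);
}
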
138 changes: 82 additions & 56 deletions dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java
@@ -19,36 +19,18 @@

package org.dinky.gateway.yarn;

import org.dinky.assertion.Asserts;
import org.dinky.constant.CustomerConfigureOptions;
import org.dinky.context.FlinkUdfPathContextHolder;
import org.dinky.data.constant.DirConstant;
import org.dinky.data.enums.JobStatus;
import org.dinky.data.model.CustomConfig;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.executor.ClusterDescriptorAdapterImpl;
import org.dinky.gateway.AbstractGateway;
import org.dinky.gateway.config.ClusterConfig;
import org.dinky.gateway.config.FlinkConfig;
import org.dinky.gateway.config.GatewayConfig;
import org.dinky.gateway.enums.ActionType;
import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.exception.GatewayException;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.gateway.result.TestResult;
import org.dinky.gateway.result.YarnResult;
import org.dinky.utils.FlinkJsonUtil;
import org.dinky.utils.ThreadUtil;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.*;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -63,40 +45,39 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.http.HttpResponse;
import org.apache.zookeeper.ZooKeeper;
import org.dinky.assertion.Asserts;
import org.dinky.constant.CustomerConfigureOptions;
import org.dinky.context.FlinkUdfPathContextHolder;
import org.dinky.data.constant.DirConstant;
import org.dinky.data.enums.JobStatus;
import org.dinky.data.model.CustomConfig;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.executor.ClusterDescriptorAdapterImpl;
import org.dinky.gateway.AbstractGateway;
import org.dinky.gateway.config.ClusterConfig;
import org.dinky.gateway.config.FlinkConfig;
import org.dinky.gateway.config.GatewayConfig;
import org.dinky.gateway.enums.ActionType;
import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.exception.GatewayException;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.gateway.result.TestResult;
import org.dinky.gateway.result.YarnResult;
import org.dinky.utils.FlinkJsonUtil;
import org.dinky.utils.ThreadUtil;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.*;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;

public abstract class YarnGateway extends AbstractGateway {
private static final String HTML_TAG_REGEX = "<pre>(.*)</pre>";
private final String TMP_SQL_EXEC_DIR =
@@ -106,6 +87,10 @@ public abstract class YarnGateway extends AbstractGateway {

protected YarnClient yarnClient;

protected SecurityConfiguration securityConfiguration = null;

private static boolean ENABLE_KERBEROS_AUTH = false;

public YarnGateway() {}

public YarnGateway(GatewayConfig config) {
@@ -149,11 +134,14 @@ private void initConfig() {

if (configuration.containsKey(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key())) {
try {
SecurityUtils.install(new SecurityConfiguration(configuration));
securityConfiguration = new SecurityConfiguration(configuration);
ENABLE_KERBEROS_AUTH = true;
SecurityUtils.install(securityConfiguration);
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
logger.info(
"Security authentication completed, user and authentication method:{}", currentUser.toString());
} catch (Exception e) {
ENABLE_KERBEROS_AUTH = false;
logger.error(e.getMessage(), e);
}
}
@@ -377,8 +365,7 @@ protected YarnClusterDescriptor createInitYarnClusterDescriptor() {
true);
}

protected String getWebUrl(ClusterClient<ApplicationId> clusterClient, YarnResult result)
throws YarnException, IOException, InterruptedException {
protected String getWebUrl(ClusterClient<ApplicationId> clusterClient, YarnResult result) throws Exception {
String webUrl;
int counts = SystemConfiguration.getInstances().GetJobIdWaitValue();
while (yarnClient.getApplicationReport(clusterClient.getClusterId()).getYarnApplicationState()
@@ -403,7 +390,46 @@ protected String getWebUrl(ClusterClient<ApplicationId> clusterClient, YarnResult
.getTrackingUrl()
+ JobsOverviewHeaders.URL.substring(1);

String json = HttpUtil.get(url);
// Authenticate against Kerberos when calling the Flink web UI HTTP API
// ---------------- begin ----------------
String json = null;
logger.info("ENABLE_KERBEROS_AUTH: {}", ENABLE_KERBEROS_AUTH);
HttpResponse httpResponse = null;
if (ENABLE_KERBEROS_AUTH) {
    logger.info("Kerberos authentication is enabled, please make sure kinit has been run; starting login now");
    // SecurityUtils.install(securityConfiguration);
    String principal = configuration.get(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
    String keytab = configuration.get(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
    logger.info("Kerberos credentials, principal: {} || keytab: {}", principal, keytab);
    try {
        RequestKerberosUrlUtils requestUtils = new RequestKerberosUrlUtils(principal, keytab, null, false);
        httpResponse = requestUtils.callRestUrl(url, principal);
        if (httpResponse == null || httpResponse.getStatusLine().getStatusCode() != 200) {
            logger.error(
                    "Kerberos authenticated request failed, response: {}",
                    httpResponse == null ? null : httpResponse.getStatusLine());
            throw new RuntimeException(String.format(
                    "Failed to get job details, please check yarn cluster status. Web URL is: %s the job tracking url is: %s",
                    webUrl, url));
        }
        StringBuilder body = new StringBuilder();
        try (BufferedReader in =
                new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"))) {
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
        }
        json = body.toString();
        logger.info("Flink web API response: {}", json);
        logger.info("Kerberos authenticated request succeeded, job details retrieved");
    } catch (Exception e) {
        logger.error("Kerberos authenticated request failed: {}", e.getMessage(), e);
    }
} else {
    json = HttpUtil.get(url);
}

// Authenticate against Kerberos when calling the Flink web UI HTTP API
// ---------------- end ----------------
try {
MultipleJobsDetails jobsDetails = FlinkJsonUtil.toBean(json, JobsOverviewHeaders.getInstance());
jobDetailsList.addAll(jobsDetails.getJobs());
