Skip to content

Commit

Permalink
更改为调用yarn webui时候如HTTP返回401时候,后面补充采用Kerberos方式认证HTTP,获取Flink任务状态
Browse files Browse the repository at this point in the history
增加调用Flink HTTP API时候进行Kerberos认证,相关issues: DataLinkDC#3470
  • Loading branch information
ze.miao committed Dec 9, 2024
1 parent 47eef1b commit 84a107f
Showing 1 changed file with 16 additions and 26 deletions.
42 changes: 16 additions & 26 deletions dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;


public abstract class YarnGateway extends AbstractGateway {
private static final String HTML_TAG_REGEX = "<pre>(.*)</pre>";
private final String TMP_SQL_EXEC_DIR =
Expand All @@ -111,8 +110,6 @@ public abstract class YarnGateway extends AbstractGateway {

protected YarnClient yarnClient;

private static boolean ENABLE_KERBEROS_AUTH = false;

public YarnGateway() {}

public YarnGateway(GatewayConfig config) {
Expand Down Expand Up @@ -156,13 +153,11 @@ private void initConfig() {

if (configuration.containsKey(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key())) {
try {
ENABLE_KERBEROS_AUTH = true;
SecurityUtils.install(new SecurityConfiguration(configuration));
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
logger.info(
"Security authentication completed, user and authentication method:{}", currentUser.toString());
} catch (Exception e) {
ENABLE_KERBEROS_AUTH = false;
logger.error(e.getMessage(), e);
}
}
Expand Down Expand Up @@ -391,7 +386,7 @@ protected String getWebUrl(ClusterClient<ApplicationId> clusterClient, YarnResul
String webUrl;
int counts = SystemConfiguration.getInstances().GetJobIdWaitValue();
while (yarnClient.getApplicationReport(clusterClient.getClusterId()).getYarnApplicationState()
== YarnApplicationState.ACCEPTED
== YarnApplicationState.ACCEPTED
&& counts-- > 0) {
Thread.sleep(1000);
}
Expand All @@ -408,21 +403,21 @@ protected String getWebUrl(ClusterClient<ApplicationId> clusterClient, YarnResul
// 睡眠1秒,防止flink因为依赖或其他问题导致任务秒挂
Thread.sleep(1000);
String url = yarnClient
.getApplicationReport(clusterClient.getClusterId())
.getTrackingUrl()
.getApplicationReport(clusterClient.getClusterId())
.getTrackingUrl()
+ JobsOverviewHeaders.URL.substring(1);

// 访问Flink WebUI 增加Kerberos认证调用HTTP API
// ----------------开始----------------------------
String json = null;
logger.info("ENABLE_KERBEROS_AUTH:" + ENABLE_KERBEROS_AUTH);
org.apache.http.HttpResponse httpResponse = null;
if (ENABLE_KERBEROS_AUTH) {
String json = HttpUtil.get(url);

// 增加判断访问Flink WebUI如果认证失败,尝试使用Kerberos认证
if (HttpUtil.createGet(url).execute().getStatus() == 401) {
logger.info("yarn application api url:" + url);
logger.info(
"you are using kerberos authentication, please make sure you have kinit, now start to login");
"HTTP API return code 401, try to authenticate using the Kerberos get yarn application state.");
org.apache.http.HttpResponse httpResponse = null;
String principal = configuration.get(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
String keytab = configuration.get(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
logger.info("认证凭证 principal:" + principal + "||keytab:" + keytab);
logger.info("get principal:" + principal + "||keytab:" + keytab);
BufferedReader in = null;
try {
RequestKerberosUrlUtils restTest = new RequestKerberosUrlUtils(principal, keytab, null, false);
Expand All @@ -431,26 +426,21 @@ protected String getWebUrl(ClusterClient<ApplicationId> clusterClient, YarnResul
in = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
String str = null;
while ((str = in.readLine()) != null) {
logger.info("返回Flink Web API结果:" + str);
logger.info("yarn application state api content:" + str);
json = str;
}
if (httpResponse.getStatusLine().getStatusCode() != 200) {
logger.info(
"认证失败:" + httpResponse.getEntity().getContent().toString());
throw new RuntimeException(String.format(
"Failed to get job details, please check yarn cluster status. Web URL is: %s the job tracking url is: %s",
webUrl, url));
}
} catch (Exception e) {
logger.info("认证失败:" + e.getMessage());
logger.info("Failed to kerberos authentication:" + e.getMessage());
e.printStackTrace();
}
logger.info("kerberos authentication login successfully and start to get job details");
} else {
json = HttpUtil.get(url);
}
// 访问Flink WebUI 增加Kerberos认证调用HTTP API
// ----------------结束----------------------------

try {
MultipleJobsDetails jobsDetails = FlinkJsonUtil.toBean(json, JobsOverviewHeaders.getInstance());
jobDetailsList.addAll(jobsDetails.getJobs());
Expand Down Expand Up @@ -480,8 +470,8 @@ protected String getYarnContainerLog(ApplicationReport applicationReport) throws
// Wait for up to 2.5 s. If the history log is not found yet, a prompt message will be returned.
int counts = 5;
while (yarnClient
.getContainers(applicationReport.getCurrentApplicationAttemptId())
.isEmpty()
.getContainers(applicationReport.getCurrentApplicationAttemptId())
.isEmpty()
&& counts-- > 0) {
ThreadUtil.sleep(500);
}
Expand Down

0 comments on commit 84a107f

Please sign in to comment.