From 84a107fcd21c1d9683e6852e251924020102126d Mon Sep 17 00:00:00 2001 From: "ze.miao" Date: Mon, 9 Dec 2024 15:52:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=94=B9=E4=B8=BA=E8=B0=83=E7=94=A8ya?= =?UTF-8?q?rn=20webui=E6=97=B6=E5=80=99=E5=A6=82HTTP=E8=BF=94=E5=9B=9E401?= =?UTF-8?q?=E6=97=B6=E5=80=99,=E5=90=8E=E9=9D=A2=E8=A1=A5=E5=85=85?= =?UTF-8?q?=E9=87=87=E7=94=A8Kerberos=E6=96=B9=E5=BC=8F=E8=AE=A4=E8=AF=81H?= =?UTF-8?q?TTP,=E8=8E=B7=E5=8F=96Flink=E4=BB=BB=E5=8A=A1=E7=8A=B6=E6=80=81?= =?UTF-8?q?=20=E5=A2=9E=E5=8A=A0=E8=B0=83=E7=94=A8Flink=20HTTP=20API?= =?UTF-8?q?=E6=97=B6=E5=80=99=E8=BF=9B=E8=A1=8CKerberos=E8=AE=A4=E8=AF=81?= =?UTF-8?q?=EF=BC=8C=E7=9B=B8=E5=85=B3issues:=20https://github.com/DataLin?= =?UTF-8?q?kDC/dinky/issues/3470?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/dinky/gateway/yarn/YarnGateway.java | 42 +++++++------------ 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java index 165b5aff0e..f901484fe9 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java @@ -101,7 +101,6 @@ import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpUtil; - public abstract class YarnGateway extends AbstractGateway { private static final String HTML_TAG_REGEX = "
(.*)
"; private final String TMP_SQL_EXEC_DIR = @@ -111,8 +110,6 @@ public abstract class YarnGateway extends AbstractGateway { protected YarnClient yarnClient; - private static boolean ENABLE_KERBEROS_AUTH = false; - public YarnGateway() {} public YarnGateway(GatewayConfig config) { @@ -156,13 +153,11 @@ private void initConfig() { if (configuration.containsKey(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key())) { try { - ENABLE_KERBEROS_AUTH = true; SecurityUtils.install(new SecurityConfiguration(configuration)); UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); logger.info( "Security authentication completed, user and authentication method:{}", currentUser.toString()); } catch (Exception e) { - ENABLE_KERBEROS_AUTH = false; logger.error(e.getMessage(), e); } } @@ -391,7 +386,7 @@ protected String getWebUrl(ClusterClient clusterClient, YarnResul String webUrl; int counts = SystemConfiguration.getInstances().GetJobIdWaitValue(); while (yarnClient.getApplicationReport(clusterClient.getClusterId()).getYarnApplicationState() - == YarnApplicationState.ACCEPTED + == YarnApplicationState.ACCEPTED && counts-- > 0) { Thread.sleep(1000); } @@ -408,21 +403,21 @@ protected String getWebUrl(ClusterClient clusterClient, YarnResul // 睡眠1秒,防止flink因为依赖或其他问题导致任务秒挂 Thread.sleep(1000); String url = yarnClient - .getApplicationReport(clusterClient.getClusterId()) - .getTrackingUrl() + .getApplicationReport(clusterClient.getClusterId()) + .getTrackingUrl() + JobsOverviewHeaders.URL.substring(1); - // 访问Flink WebUI 增加Kerberos认证调用HTTP API - // ----------------开始---------------------------- - String json = null; - logger.info("ENABLE_KERBEROS_AUTH:" + ENABLE_KERBEROS_AUTH); - org.apache.http.HttpResponse httpResponse = null; - if (ENABLE_KERBEROS_AUTH) { + String json = HttpUtil.get(url); + + // 增加判断访问Flink WebUI如果认证失败,尝试使用Kerberos认证 + if (HttpUtil.createGet(url).execute().getStatus() == 401) { + logger.info("yarn application api url:" + url); logger.info( - "you are using kerberos authentication, please make sure you have kinit, now start to login"); + "HTTP API return code 401, try to authenticate using the Kerberos get yarn application state."); + org.apache.http.HttpResponse httpResponse = null; String principal = configuration.get(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); String keytab = configuration.get(SecurityOptions.KERBEROS_LOGIN_KEYTAB); - logger.info("认证凭证 principal:" + principal + "||keytab:" + keytab); + logger.info("get principal:" + principal + "||keytab:" + keytab); BufferedReader in = null; try { RequestKerberosUrlUtils restTest = new RequestKerberosUrlUtils(principal, keytab, null, false); @@ -431,26 +426,21 @@ protected String getWebUrl(ClusterClient clusterClient, YarnResul in = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")); String str = null; while ((str = in.readLine()) != null) { - logger.info("返回Flink Web API结果:" + str); + logger.info("yarn application state api content:" + str); json = str; } if (httpResponse.getStatusLine().getStatusCode() != 200) { - logger.info( - "认证失败:" + httpResponse.getEntity().getContent().toString()); throw new RuntimeException(String.format( "Failed to get job details, please check yarn cluster status. Web URL is: %s the job tracking url is: %s", webUrl, url)); } } catch (Exception e) { - logger.info("认证失败:" + e.getMessage()); + logger.info("Failed to kerberos authentication:" + e.getMessage()); e.printStackTrace(); } logger.info("kerberos authentication login successfully and start to get job details"); - } else { - json = HttpUtil.get(url); } - // 访问Flink WebUI 增加Kerberos认证调用HTTP API - // ----------------结束---------------------------- + try { MultipleJobsDetails jobsDetails = FlinkJsonUtil.toBean(json, JobsOverviewHeaders.getInstance()); jobDetailsList.addAll(jobsDetails.getJobs()); @@ -480,8 +470,8 @@ protected String getYarnContainerLog(ApplicationReport applicationReport) throws // Wait for up to 2.5 s. If the history log is not found yet, a prompt message will be returned. int counts = 5; while (yarnClient - .getContainers(applicationReport.getCurrentApplicationAttemptId()) - .isEmpty() + .getContainers(applicationReport.getCurrentApplicationAttemptId()) + .isEmpty() && counts-- > 0) { ThreadUtil.sleep(500); }