diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/utils/RequestKerberosUrlUtils.java b/dinky-gateway/src/main/java/org/dinky/gateway/utils/RequestKerberosUrlUtils.java new file mode 100644 index 0000000000..4157c6688b --- /dev/null +++ b/dinky-gateway/src/main/java/org/dinky/gateway/utils/RequestKerberosUrlUtils.java @@ -0,0 +1,160 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.gateway.utils; + +import org.apache.http.HttpResponse; +import org.apache.http.auth.AuthSchemeProvider; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.Credentials; +import org.apache.http.client.HttpClient; +import org.apache.http.client.config.AuthSchemes; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.config.Lookup; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.impl.auth.SPNegoSchemeFactory; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; + +import java.io.IOException; +import java.security.Principal; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; + +import javax.security.auth.Subject; +import javax.security.auth.kerberos.KerberosPrincipal; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import javax.security.auth.login.LoginContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RequestKerberosUrlUtils { + public static Logger logger = LoggerFactory.getLogger(RequestKerberosUrlUtils.class); + private String principal; + private String keyTabLocation; + + public RequestKerberosUrlUtils() {} + + public RequestKerberosUrlUtils(String principal, String keyTabLocation) { + this.principal = principal; + this.keyTabLocation = keyTabLocation; + } + + public RequestKerberosUrlUtils(String principal, String keyTabLocation, boolean isDebug) { + this(principal, keyTabLocation); + if (isDebug) { + System.setProperty("sun.security.spnego.debug", "true"); + System.setProperty("sun.security.krb5.debug", "true"); + } + } + + public RequestKerberosUrlUtils(String principal, String keyTabLocation, String krb5Location, boolean isDebug) { + this(principal, keyTabLocation, isDebug); + // System.setProperty("java.security.krb5.conf", krb5Location); + } + + private static HttpClient buildSpengoHttpClient() { + + Lookup authSchemeRegistry = RegistryBuilder.create() + .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(true)) + .build(); + + BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(new AuthScope(null, -1, null), new Credentials() { + @Override + public Principal getUserPrincipal() { + return null; + } + + @Override + public String getPassword() { + return null; + } + }); + + CloseableHttpClient httpClient = HttpClientBuilder.create() + .setDefaultAuthSchemeRegistry(authSchemeRegistry) + .setDefaultCredentialsProvider(credentialsProvider) + .build(); + return httpClient; + } + + public HttpResponse callRestUrl(final String url, final String userId) { + // logger.warn(String.format("Calling KerberosHttpClient %s %s %s", this.principal, this.keyTabLocation, + // url)); + Configuration config = new Configuration() { + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String name) { + HashMap options = new HashMap() { + { + put("useTicketCache", "false"); + put("useKeyTab", "true"); + put("keyTab", keyTabLocation); + // Krb5 in GSS API needs to be refreshed so it does not throw the error + // Specified version of key is not available + put("refreshKrb5Config", "true"); + put("principal", principal); + put("storeKey", "true"); + put("doNotPrompt", "true"); + put("isInitiator", "true"); + put("debug", "true"); + } + }; + return new AppConfigurationEntry[] { + new AppConfigurationEntry( + "com.sun.security.auth.module.Krb5LoginModule", + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, + options) + }; + } + }; + Set princ = new HashSet(1); + princ.add(new KerberosPrincipal(userId)); + Subject sub = new Subject(false, princ, new HashSet(), new HashSet()); + try { + // auth module:Krb5Login + LoginContext lc = new LoginContext("Krb5Login", sub, null, config); + lc.login(); + Subject serviceSubject = lc.getSubject(); + return Subject.doAs(serviceSubject, new PrivilegedAction() { + HttpResponse httpResponse = null; + + @Override + public HttpResponse run() { + try { + HttpClient spnegoHttpClient = buildSpengoHttpClient(); + httpResponse = spnegoHttpClient.execute(new HttpGet(url)); + return httpResponse; + } catch (IOException ioe) { + ioe.printStackTrace(); + } + return httpResponse; + } + }); + } catch (Exception le) { + le.printStackTrace(); + } + return null; + } +} diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java index d2d820d3ca..84bb022fda 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java @@ -37,6 +37,7 @@ import org.dinky.gateway.result.SavePointResult; import org.dinky.gateway.result.TestResult; import org.dinky.gateway.result.YarnResult; +import org.dinky.gateway.utils.RequestKerberosUrlUtils; import org.dinky.utils.FlinkJsonUtil; import org.dinky.utils.ThreadUtil; @@ -73,9 +74,12 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.zookeeper.ZooKeeper; +import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.io.ObjectInputStream; import java.net.URI; import java.util.ArrayList; @@ -404,6 +408,39 @@ protected String getWebUrl(ClusterClient clusterClient, YarnResul + JobsOverviewHeaders.URL.substring(1); String json = HttpUtil.get(url); + + // 增加判断访问Flink WebUI如果认证失败,尝试使用Kerberos认证 + if (HttpUtil.createGet(url).execute().getStatus() == 401) { + logger.info("yarn application api url:" + url); + logger.info( + "HTTP API return code 401, try to authenticate using the Kerberos get yarn application state."); + org.apache.http.HttpResponse httpResponse = null; + String principal = configuration.get(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); + String keytab = configuration.get(SecurityOptions.KERBEROS_LOGIN_KEYTAB); + logger.info("get principal:" + principal + "||keytab:" + keytab); + BufferedReader in = null; + try { + RequestKerberosUrlUtils restTest = new RequestKerberosUrlUtils(principal, keytab, null, false); + httpResponse = restTest.callRestUrl(url, principal); + InputStream inputStream = httpResponse.getEntity().getContent(); + in = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")); + String str = null; + while ((str = in.readLine()) != null) { + logger.info("yarn application state api content:" + str); + json = str; + } + if (httpResponse.getStatusLine().getStatusCode() != 200) { + throw new RuntimeException(String.format( + "Failed to get job details, please check yarn cluster status. Web URL is: %s the job tracking url is: %s", + webUrl, url)); + } + } catch (Exception e) { + logger.info("Failed to kerberos authentication:" + e.getMessage()); + e.printStackTrace(); + } + logger.info("kerberos authentication login successfully and start to get job details"); + } + try { MultipleJobsDetails jobsDetails = FlinkJsonUtil.toBean(json, JobsOverviewHeaders.getInstance()); jobDetailsList.addAll(jobsDetails.getJobs());