diff --git a/bin/datasophon-api.sh b/bin/datasophon-api.sh
index 72812101..877e8992 100644
--- a/bin/datasophon-api.sh
+++ b/bin/datasophon-api.sh
@@ -158,7 +158,7 @@ case $startStop in
     fi
     echo starting $command, logging to $log
-    exec_command="$DDH_OPTS -classpath $DDH_CONF_DIR:$DDH_LIB_JARS $CLASS"
+    exec_command="$DDH_OPTS -Dspring.profiles.active=config -classpath $DDH_CONF_DIR:$DDH_LIB_JARS $CLASS"
     echo "nohup $JAVA $exec_command > $log 2>&1 &"
     nohup $JAVA $exec_command > $log 2>&1 &
diff --git a/datasophon-api/pom.xml b/datasophon-api/pom.xml
index e38e9164..59cfec2e 100644
--- a/datasophon-api/pom.xml
+++ b/datasophon-api/pom.xml
@@ -46,6 +46,11 @@
             <artifactId>datasophon-ui</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-proxy</artifactId>
+        </dependency>
diff --git a/datasophon-api/src/main/java/com/datasophon/api/configuration/AppConfiguration.java b/datasophon-api/src/main/java/com/datasophon/api/configuration/AppConfiguration.java
index a603bccf..13b236ff 100644
--- a/datasophon-api/src/main/java/com/datasophon/api/configuration/AppConfiguration.java
+++ b/datasophon-api/src/main/java/com/datasophon/api/configuration/AppConfiguration.java
@@ -96,6 +96,7 @@ public void addInterceptors(InterceptorRegistry registry) {
         // login
         registry.addInterceptor(loginInterceptor())
                 .addPathPatterns("/**").excludePathPatterns("/login", "/error",
+                        "/grafana/**",
                         "/service/install/downloadPackage",
                         "/service/install/downloadResource",
                         "/cluster/alert/history/save",
diff --git a/datasophon-api/src/main/java/com/datasophon/api/configuration/ConfigPropertiesExtend.java b/datasophon-api/src/main/java/com/datasophon/api/configuration/ConfigPropertiesExtend.java
index 6b4c48bd..01d09b3e 100644
--- a/datasophon-api/src/main/java/com/datasophon/api/configuration/ConfigPropertiesExtend.java
+++ b/datasophon-api/src/main/java/com/datasophon/api/configuration/ConfigPropertiesExtend.java
@@ -36,8 +36,27 @@ public class ConfigPropertiesExtend implements EnvironmentPostProcessor {
     @Override
     public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
         MutablePropertySources propertySources = environment.getPropertySources();
+
+        // load the datasophon configuration (config/profiles/application-config.yml)
+        List<String> activeProfiles = Arrays.asList(environment.getActiveProfiles());
+        if (!activeProfiles.isEmpty() && !Collections.singletonList("config").containsAll(activeProfiles)) {
+            // running other profiles
+            return;
+        }
+        try {
+            List<PropertySource<?>> configPropertySources = new YamlPropertySourceLoader().load(DEFAULT_APPLICATION_CONFIG, new FileSystemResource(DEFAULT_APPLICATION_CONFIG));
+            if (!CollectionUtils.isEmpty(configPropertySources)) {
+                for (PropertySource<?> propertySource : configPropertySources) {
+                    propertySources.addFirst(propertySource);
+                }
+            }
+        } catch (Exception e) {
+            System.err.println("Default config application-config not found ");
+            log.error("Default config application-config not found", e);
+        }
+
+        // load the datasophon configuration (config/datasophon.conf)
         Properties properties = loadCustomProperties();
-        checkProfile(environment);
         propertySources.addFirst(new PropertiesPropertySource("datasophonConfig", properties));
     }
@@ -47,8 +66,7 @@ private Properties loadCustomProperties() {
         try (InputStream inputStream = Files.newInputStream(file.toPath())) {
             properties.load(inputStream);
         } catch (Exception e) {
-            System.err.println(
-                    "Failed to load the datart configuration (config/datart.conf), use application-config.yml");
+            System.err.println("Failed to load the datasophon configuration (config/datasophon.conf), use application-config.yml");
             return new Properties();
         }
         List<String> removeKeys = new ArrayList<>();
@@ -64,22 +82,4 @@ private Properties loadCustomProperties() {
         }
         return properties;
     }
-
-    private void checkProfile(ConfigurableEnvironment environment) {
-        List<String> activeProfiles = Arrays.asList(environment.getActiveProfiles());
-        if (!activeProfiles.isEmpty() && !Collections.singletonList("config").containsAll(activeProfiles)) {
-            // running other profiles
-            return;
-        }
-        try {
-            List<PropertySource<?>> propertySources = new YamlPropertySourceLoader().load(DEFAULT_APPLICATION_CONFIG,
-                    new FileSystemResource(DEFAULT_APPLICATION_CONFIG));
-            if (CollectionUtils.isEmpty(propertySources)) {
-                System.err.println("Default config application-config not found ");
-            }
-        } catch (Exception e) {
-            System.err.println("Default config application-config not found ");
-            log.error("Default config application-config not found", e);
-        }
-    }
 }
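Review note on ConfigPropertiesExtend: both sources are registered with addFirst(), and loadCustomProperties() runs after the YAML load, so config/datasophon.conf ends up ahead of config/profiles/application-config.yml in the property-source chain (the old checkProfile() only validated the YAML; it is now actually registered). A minimal, self-contained sketch of that precedence — property names hypothetical:

```java
import java.util.Properties;

import org.springframework.core.env.MutablePropertySources;
import org.springframework.core.env.PropertiesPropertySource;
import org.springframework.core.env.StandardEnvironment;

public class PrecedenceSketch {
    public static void main(String[] args) {
        StandardEnvironment env = new StandardEnvironment();
        MutablePropertySources sources = env.getPropertySources();

        Properties yaml = new Properties();   // stands in for config/profiles/application-config.yml
        yaml.setProperty("server.port", "8081");
        Properties conf = new Properties();   // stands in for config/datasophon.conf
        conf.setProperty("server.port", "9090");

        // mirrors postProcessEnvironment(): the YAML is addFirst()-ed, then the
        // .conf properties are addFirst()-ed after it, so the .conf source wins
        sources.addFirst(new PropertiesPropertySource("applicationConfig", yaml));
        sources.addFirst(new PropertiesPropertySource("datasophonConfig", conf));

        System.out.println(env.getProperty("server.port")); // 9090
    }
}
```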
diff --git a/datasophon-api/src/main/java/com/datasophon/api/configuration/GrafanaProxyConfiguration.java b/datasophon-api/src/main/java/com/datasophon/api/configuration/GrafanaProxyConfiguration.java
new file mode 100644
index 00000000..737631a2
--- /dev/null
+++ b/datasophon-api/src/main/java/com/datasophon/api/configuration/GrafanaProxyConfiguration.java
@@ -0,0 +1,400 @@
+package com.datasophon.api.configuration;
+
+import static com.datasophon.common.Constants.GRAFANA_PATH;
+
+import com.datasophon.api.service.ClusterServiceDashboardService;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.servlet.Servlet;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import lombok.SneakyThrows;
+
+import org.eclipse.jetty.proxy.ProxyServlet;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.UpgradeRequest;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.WebSocketBehavior;
+import org.eclipse.jetty.websocket.api.WebSocketPolicy;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.web.servlet.ServletRegistrationBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.util.UriComponentsBuilder;
+
+import cn.hutool.core.map.MapUtil;
+import cn.hutool.core.util.StrUtil;
+
+@Configuration
+@ConditionalOnProperty(name = "datasophon.proxy-grafana.enable", havingValue = "true")
+public class GrafanaProxyConfiguration {
+
+    @Value("${datasophon.proxy-grafana.max-threads:32}")
+    String maxThreads;
+
+    @Autowired
+    private ClusterServiceDashboardService clusterServiceDashboardService;
+
+    @Bean
+    public ServletRegistrationBean<Servlet> grafanaHttpProxy() {
+        ServletRegistrationBean<Servlet> servlet = new ServletRegistrationBean<>(new GrafanaProxyServlet(),
+                GRAFANA_PATH + "/*");
+        servlet.setInitParameters(MapUtil.builder("maxThreads", maxThreads)
+                .put("preserveHost", "true").build());
+        return servlet;
+    }
+
+    public class GrafanaProxyServlet extends ProxyServlet {
+
+        private static final long WEBSOCKET_DEFAULT_MAX_IDLE = 60000; // 1 minute
+        private static final int WEBSOCKET_DEFAULT_MAX_BUFF = 1024 * 1024; // 1 mb
+
+        private WebSocketServletFactory factory;
+
+        @Override
+        public void init() throws ServletException {
+            super.init();
+
+            try {
+                WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
+                ServletContext ctx = getServletContext();
+                factory = WebSocketServletFactory.Loader.load(ctx, policy);
+                factory.getPolicy().setIdleTimeout(WEBSOCKET_DEFAULT_MAX_IDLE);
+                factory.getPolicy().setMaxBinaryMessageSize(WEBSOCKET_DEFAULT_MAX_BUFF);
+                factory.getPolicy().setMaxTextMessageBufferSize(WEBSOCKET_DEFAULT_MAX_BUFF);
+                factory.setCreator(new WebSocketServerCreator());
+                factory.start();
+                ctx.setAttribute(WebSocketServletFactory.class.getName(), factory);
+            } catch (Exception x) {
+                throw new ServletException(x);
+            }
+        }
+
+        @SneakyThrows
+        @Override
+        protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+            String wsKey = request.getHeader("Sec-WebSocket-Key");
+            if (wsKey != null) {
+                if (factory.isUpgradeRequest(request, response)) {
+                    if (factory.acceptWebSocket(request, response)) {
+                        return;
+                    }
+                    if (response.isCommitted()) {
+                        return;
+                    }
+                }
+
+            }
+            super.service(request, response);
+        }
+
+        @Override
+        protected String rewriteTarget(HttpServletRequest request) {
+            // resolve the cluster's Grafana address
+            Integer clusterId = getCluster(request.getRequestURI());
+            if (clusterId != null) {
+                String host = "http://" + clusterServiceDashboardService.getGrafanaHost(clusterId);
+                return UriComponentsBuilder.fromUriString(host)
+                        .path(request.getRequestURI())
+                        .query(request.getQueryString())
+                        .build(true).toUriString();
+            } else {
+                return super.rewriteTarget(request);
+            }
+        }
+
+        @Override
+        public void destroy() {
+            try {
+                ServletContext ctx = getServletContext();
+                ctx.removeAttribute(WebSocketServletFactory.class.getName());
+                factory.stop();
+                factory = null;
+            } catch (Exception ignore) {
+                // ignore;
+            }
+        }
+    }
+
+    private Integer getCluster(String requestUrl) {
+        try {
+            List<String> paths = StrUtil.splitTrim(requestUrl, "/");
+            if (paths.size() > 3) {
+                return Integer.valueOf(paths.get(2));
+            } else {
+                return null;
+            }
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    public class WebSocketServerServlet extends WebSocketServlet {
+
+        private final Logger log = LoggerFactory.getLogger(WebSocketServerServlet.class);
+        private static final long WEBSOCKET_DEFAULT_MAX_IDLE = 60000; // 1 minute
+        private static final int WEBSOCKET_DEFAULT_MAX_BUFF = 1024 * 1024; // 1 mb
+
+        @Override
+        public void configure(WebSocketServletFactory factory) {
+            log.info("Configuring the web socket adapter");
+            factory.getPolicy().setIdleTimeout(WEBSOCKET_DEFAULT_MAX_IDLE);
+            factory.getPolicy().setMaxBinaryMessageSize(WEBSOCKET_DEFAULT_MAX_BUFF);
+            factory.getPolicy().setMaxTextMessageBufferSize(WEBSOCKET_DEFAULT_MAX_BUFF);
+
+            factory.setCreator(new WebSocketServerCreator());
+        }
+    }
+
+    public class WebSocketServerCreator implements WebSocketCreator {
+
+        private final WebSocketServerAdapter websocket = new WebSocketServerAdapter();
+
+        @Override
+        public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) {
+            for (String subprotocol : servletUpgradeRequest.getSubProtocols()) {
+                if ("binary".equals(subprotocol)) {
+                    servletUpgradeResponse.setAcceptedSubProtocol(subprotocol);
+                }
+            }
+            return websocket;
+        }
+    }
+
+    public class WebSocketServerAdapter extends WebSocketAdapter {
+
+        private final Logger log = LoggerFactory.getLogger(WebSocketServerAdapter.class);
+
+        private SslContextFactory sslContextFactory;
+        private final WebSocketClient webSocketClient;
+        private Session proxyingSession;
+
+        private static final String WEBSOCKET_URL_FORMAT = "ws://%s%s";
+
+        public WebSocketServerAdapter() {
+            initSslContextFactory();
+            this.webSocketClient = new WebSocketClient(this.sslContextFactory);
+        }
+
+        private void initSslContextFactory() {
+            // initialize SslContextFactory
+            this.sslContextFactory = new SslContextFactory();
+            // SSLContext sslContext = ....
+            // this.sslContextFactory.setSslContext(sslContext);
+
+            // ArrayList<String> ciphersToEnable = new ArrayList<>(sslContext.getDefaultSSLParameters().getCipherSuites().length + 1);
+            // ciphersToEnable.add(".*_GCM_.*"); // GCM first in case ordering is honored
+            // ciphersToEnable.addAll(Arrays.asList(sslContext.getDefaultSSLParameters().getCipherSuites()));
+            // this.sslContextFactory.setIncludeCipherSuites(ciphersToEnable.toArray(new String[ciphersToEnable.size()]));
+            // this.sslContextFactory.setExcludeCipherSuites(".*[Aa][Nn][Oo][Nn].*", ".*[Nn][Uu][Ll][Ll].*");
+        }
+
+        @Override
+        public void onWebSocketConnect(Session sess) {
+            super.onWebSocketConnect(sess);
+
+            String host = clusterServiceDashboardService.getGrafanaHost(1); // target ip
+            String path = sess.getUpgradeRequest().getRequestURI().getPath(); // url path
+            String dest = String.format(WEBSOCKET_URL_FORMAT, host, path);
+            try {
+                URI destUri = new URI(dest);
+
+                webSocketClient.start();
+                Future<Session> future = webSocketClient.connect(new WebSocketClientAdapter(sess), destUri, getClientUpgradeRequest(sess));
+
+                proxyingSession = future.get();
+                if (proxyingSession != null && proxyingSession.isOpen()) {
+                    log.debug("websocket connected to {}", dest);
+                }
+            } catch (URISyntaxException e) {
+                log.error("invalid url: {}", dest);
+            } catch (IOException | ExecutionException | InterruptedException e) {
+                log.error("exception while connecting to {}", dest, e);
+            } catch (Exception e) {
+                log.error("exception while starting websocket client", e);
+            }
+        }
+
+        @Override
+        public void onWebSocketText(String message) {
+            super.onWebSocketText(message);
+            log.debug("websocket message received {}", message);
+            // forwarding ...
+            if (proxyingSession != null && proxyingSession.isOpen()) {
+                try {
+                    proxyingSession.getRemote().sendString(message);
+                } catch (IOException e) {
+                    log.error("exception while forwarding text message to client", e);
+                }
+            } else {
+                log.error("proxying session is null or closed.");
+            }
+        }
+
+        @Override
+        public void onWebSocketBinary(byte[] payload, int offset, int len) {
+            super.onWebSocketBinary(payload, offset, len);
+            log.debug("websocket binary received, offset:{}, len: {}", offset, len);
+            // forwarding ...
+            if (proxyingSession != null && proxyingSession.isOpen()) {
+                ByteBuffer byteBuffer = ByteBuffer.wrap(payload, offset, len);
+                try {
+                    this.proxyingSession.getRemote().sendBytes(byteBuffer);
+                } catch (IOException e) {
+                    log.error("exception while forwarding text message to client", e);
+                }
+            } else {
+                log.error("proxying session is null or closed.");
+            }
+        }
+
+        @Override
+        public void onWebSocketClose(int statusCode, String reason) {
+            super.onWebSocketClose(statusCode, reason);
+            log.debug("Socket Closed: status code: [ {} ]", statusCode);
+            disconnect();
+        }
+
+        private ClientUpgradeRequest getClientUpgradeRequest(Session sess) {
+            ClientUpgradeRequest request = new ClientUpgradeRequest();
+
+            UpgradeRequest upgradeRequest = sess.getUpgradeRequest();
+
+            request.setCookies(upgradeRequest.getCookies());
+            request.setSubProtocols(upgradeRequest.getSubProtocols());
+
+            Map<String, List<String>> headers = upgradeRequest.getHeaders();
+            headers.forEach(request::setHeader);
+            return request;
+        }
+
+        /**
+         * This method unregisters the subscriber and closes the connection
+         */
+        private void disconnect() {
+
+            if (this.getSession() == null || !this.getSession().isOpen()) {
+                return;
+            }
+
+            try {
+                this.getSession().disconnect();
+
+                if (isConnected()) {
+                    log.debug("Could not disconnect the websocket client");
+                }
+            } catch (Exception e) {
+                log.error("Exception on disconnecting the websocket client");
+            } finally {
+                if (this.webSocketClient != null) {
+                    try {
+                        this.webSocketClient.stop();
+                    } catch (Exception e) {
+                        log.error("Exception while stopping websocket client", e);
+                    }
+                }
+            }
+
+        }
+    }
+
+    public class WebSocketClientAdapter extends WebSocketAdapter {
+
+        private final Logger log = LoggerFactory.getLogger(WebSocketClientAdapter.class);
+
+        private final Session proxyingSess;
+
+        WebSocketClientAdapter(Session sess) {
+            this.proxyingSess = sess;
+        }
+
+        @Override
+        public void onWebSocketConnect(Session sess) {
+            super.onWebSocketConnect(sess);
+
+            log.debug("websocket client connected ...");
+        }
+
+        @Override
+        public void onWebSocketText(String message) {
+            super.onWebSocketText(message);
+            log.debug("websocket message received {}", message);
+            // forwarding
+            if (this.proxyingSess != null && this.proxyingSess.isOpen()) {
+                try {
+                    this.proxyingSess.getRemote().sendString(message);
+                } catch (IOException e) {
+                    log.error("exception while forwarding text message to client", e);
+                }
+            } else {
+                log.error("proxying session is null or closed.");
+            }
+        }
+
+        @Override
+        public void onWebSocketBinary(byte[] payload, int offset, int len) {
+            super.onWebSocketBinary(payload, offset, len);
+            log.debug("websocket binary received, offset:{}, len: {}", offset, len);
+            // forwarding ...
+            ByteBuffer byteBuffer = ByteBuffer.wrap(payload, offset, len);
+            if (this.proxyingSess != null && this.proxyingSess.isOpen()) {
+                try {
+                    this.proxyingSess.getRemote().sendBytes(byteBuffer);
+                } catch (IOException e) {
+                    log.error("exception while forwarding binary to client", e);
+                }
+            } else {
+                log.error("proxying session is null or closed.");
+            }
+        }
+
+        @Override
+        public void onWebSocketClose(int statusCode, String reason) {
+            super.onWebSocketClose(statusCode, reason);
+            log.debug("Socket Closed: status code: [ {} ]", statusCode);
+            disconnect();
+        }
+
+        /**
+         * This method unregisters the subscriber and closes the connection
+         */
+        private void disconnect() {
+
+            if (this.getSession() == null || !this.getSession().isOpen()) {
+                return;
+            }
+
+            try {
+                this.getSession().disconnect();
+
+                if (isConnected()) {
+                    log.debug("Could not disconnect the websocket client");
+                }
+            } catch (Exception e) {
+                log.error("Exception on disconnecting the websocket client");
+            }
+
+        }
+    }
+
+}
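Review note on rewriteTarget() above: the cluster id is taken from the third path segment and the full inbound URI (context path included) is replayed against that cluster's Grafana, so Grafana itself has to be serving under the same sub-path — presumably what the conditional custom.ini generator further down arranges. A sketch of the mapping, with a hypothetical host value:

```java
import org.springframework.web.util.UriComponentsBuilder;

public class RewriteTargetSketch {
    public static void main(String[] args) {
        String host = "http://grafana-node:3000";   // hypothetical getGrafanaHost() result
        String requestUri = "/ddh/grafana/1/d/abc"; // inbound URI; segment index 2 is the clusterId
        String query = "orgId=1";

        String target = UriComponentsBuilder.fromUriString(host)
                .path(requestUri)
                .query(query)
                .build(true)
                .toUriString();

        // the full inbound path (context path included) is kept on the upstream URL
        System.out.println(target); // http://grafana-node:3000/ddh/grafana/1/d/abc?orgId=1
    }
}
```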
diff --git a/datasophon-api/src/main/resources/application.yml b/datasophon-api/src/main/resources/application.yml
index 420fad8c..e6daa45a 100644
--- a/datasophon-api/src/main/resources/application.yml
+++ b/datasophon-api/src/main/resources/application.yml
@@ -2,7 +2,7 @@ server:
   port: 8081
   servlet:
-    context-path: /ddh/
+    context-path: /ddh
 security:
   authentication:
     type: PASSWORD
@@ -63,4 +63,7 @@ logging:
 datasophon:
   migration:
     enable: true
+  proxy-grafana:
+    enable: false
+#    max-threads: 32
diff --git a/datasophon-api/src/main/resources/meta/DDP-1.2.2/GRAFANA/service_ddl.json b/datasophon-api/src/main/resources/meta/DDP-1.2.2/GRAFANA/service_ddl.json
index 2bce535b..407029f4 100644
--- a/datasophon-api/src/main/resources/meta/DDP-1.2.2/GRAFANA/service_ddl.json
+++ b/datasophon-api/src/main/resources/meta/DDP-1.2.2/GRAFANA/service_ddl.json
@@ -59,8 +59,16 @@
     }
   ],
   "configWriter": {
-    "generators": [
-    ]
+    "generators": [{
+      "filename": "custom.ini",
+      "configFormat": "custom",
+      "outputDirectory": "conf",
+      "templateName": "grafana.ftl",
+      "conditionalOnProperty": "datasophon.proxy-grafana.enable",
+      "includeParams": [
+
+      ]
+    }]
   },
   "parameters": [
   ]
diff --git a/datasophon-api/src/main/resources/meta/DDP-1.2.2/HDFS/service_ddl.json b/datasophon-api/src/main/resources/meta/DDP-1.2.2/HDFS/service_ddl.json
index 0aa29cfe..3cc08f15 100644
--- a/datasophon-api/src/main/resources/meta/DDP-1.2.2/HDFS/service_ddl.json
+++ b/datasophon-api/src/main/resources/meta/DDP-1.2.2/HDFS/service_ddl.json
@@ -390,6 +390,24 @@
           "hadoopHome",
           "rangerAdminUrl"
         ]
+      },
+      {
+        "filename": "whitelist",
+        "configFormat": "custom",
+        "outputDirectory": "etc/hadoop/",
+        "templateName": "properties3.ftl",
+        "includeParams": [
+
+        ]
+      },
+      {
+        "filename": "blacklist",
+        "configFormat": "custom",
+        "outputDirectory": "etc/hadoop/",
+        "templateName": "properties3.ftl",
+        "includeParams": [
+
+        ]
       }
     ]
   },
diff --git a/datasophon-api/src/main/resources/meta/DDP-1.2.2/KYUUBI/service_ddl.json b/datasophon-api/src/main/resources/meta/DDP-1.2.2/KYUUBI/service_ddl.json
index 825e325f..b351fe08 100644
--- a/datasophon-api/src/main/resources/meta/DDP-1.2.2/KYUUBI/service_ddl.json
+++ b/datasophon-api/src/main/resources/meta/DDP-1.2.2/KYUUBI/service_ddl.json
@@ -2,11 +2,11 @@
   "name": "KYUUBI",
   "label": "Kyuubi",
   "description": "统一多租户JDBC网关",
-  "version": "1.7.3",
+  "version": "1.7.4",
   "sortNum": 30,
   "dependencies":[],
-  "packageName": "kyuubi-1.7.3.tar.gz",
-  "decompressPackageName": "kyuubi-1.7.3",
+  "packageName": "apache-kyuubi-1.7.4-bin.tgz",
+  "decompressPackageName": "apache-kyuubi-1.7.4-bin",
   "roles": [
     {
       "name": "KyuubiServer",
@@ -19,6 +19,17 @@
       "cardinality": "1+",
       "jmxPort": "10019",
       "logFile": "logs/kyuubi-server-${host}.out",
+      "resourceStrategies":[{
+        "type": "append_line",
+        "source": "bin/kyuubi",
+        "line": 206,
+        "text": " exit 1"
+      },{
+        "type": "append_line",
+        "source": "bin/kyuubi",
+        "line": 210,
+        "text": " exit 1"
+      }],
       "startRunner": {
         "timeout": "60",
         "program": "bin/kyuubi",
@@ -67,12 +78,32 @@
         "configFormat": "properties2",
         "outputDirectory": "conf",
         "includeParams": [
-          "kyuubi.ha.zookeeper.namespace",
-          "kyuubi.ha.zookeeper.quorum",
+          "kyuubi.ha.addresses",
+          "kyuubi.ha.namespace",
           "kyuubi.session.idle.timeout",
           "kyuubi.session.engine.idle.timeout",
           "kyuubi.session.engine.initialize.timeout",
           "spark.master",
+          "spark.submit.deployMode",
+          "spark.driver.memory",
+          "spark.executor.memory",
+          "spark.executor.cores",
+          "spark.dynamicAllocation.enabled",
+          "spark.shuffle.service.enabled",
+          "spark.shuffle.service.port",
+          "spark.dynamicAllocation.initialExecutors",
+          "spark.dynamicAllocation.minExecutors",
+          "spark.dynamicAllocation.maxExecutors",
+          "spark.dynamicAllocation.executorAllocationRatio",
+          "spark.dynamicAllocation.executorIdleTimeout",
+          "spark.dynamicAllocation.cachedExecutorIdleTimeout",
+          "spark.dynamicAllocation.shuffleTracking.enabled",
+          "spark.dynamicAllocation.shuffleTracking.timeout",
+          "spark.dynamicAllocation.schedulerBacklogTimeout",
+          "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
+          "spark.cleaner.periodicGC.interval",
+          "flink.execution.target",
+          "kyuubi.session.engine.flink.max.rows",
           "kyuubi.metrics.reporters",
           "kyuubi.metrics.prometheus.port",
           "kyuubi.session.engine.spark.showProgress",
@@ -102,26 +133,26 @@
       },
       "parameters": [
         {
-          "name": "kyuubi.ha.zookeeper.quorum",
+          "name": "kyuubi.ha.addresses",
           "label": "zookeeper服务信息",
           "description": "zookeeper服务信息",
           "required": true,
           "type": "input",
-          "value": "",
+          "value": "${zkUrls}",
           "configurableInWizard": true,
           "hidden": false,
-          "defaultValue": ""
+          "defaultValue": "${zkUrls}"
         },
         {
-          "name": "kyuubi.ha.zookeeper.namespace",
+          "name": "kyuubi.ha.namespace",
           "label": "zookeeper目录",
           "description": "zookeeper目录",
           "required": true,
           "type": "input",
-          "value": "",
+          "value": "kyuubi",
           "configurableInWizard": true,
           "hidden": false,
-          "defaultValue": ""
+          "defaultValue": "kyuubi"
         },
         {
           "name": "kyuubi.session.idle.timeout",
@@ -156,6 +187,204 @@
           "hidden": false,
           "defaultValue": "yarn"
         },
+        {
+          "name": "spark.submit.deployMode",
+          "label": "配置spark部署模式",
+          "description": "配置spark部署模式",
+          "required": true,
+          "type": "input",
+          "value": "cluster",
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": "cluster"
+        },
+        {
+          "name": "spark.driver.memory",
+          "label": "配置spark-driver运行内存",
+          "description": "配置spark-driver运行内存",
+          "required": true,
+          "type": "input",
+          "value": "2g",
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": "2g"
+        },
+        {
+          "name": "spark.executor.memory",
+          "label": "配置spark-executor运行内存",
+          "description": "配置spark-executor运行内存",
+          "required": true,
+          "type": "input",
+          "value": "3g",
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": "3g"
+        },
+        {
+          "name": "spark.executor.cores",
+          "label": "配置spark-executor运行核数",
+          "description": "配置spark-executor运行核数",
+          "required": true,
+          "type": "input",
+          "value": "2",
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": "2"
+        },
+        {
+          "name": "spark.shuffle.service.enabled",
+          "label": "启用spark辅助shuffle服务",
+          "description": "启用spark辅助shuffle服务",
+          "required": true,
+          "type": "switch",
+          "value": false,
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": true
+        },
+        {
+          "name": "spark.shuffle.service.port",
+          "label": "spark辅助shuffle服务端口",
+          "description": "spark辅助shuffle服务端口",
+          "required": false,
+          "type": "input",
+          "value": "7337",
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": "7337"
+        },
+        {
+          "name": "spark.dynamicAllocation.enabled",
+          "label": "启用spark动态资源分配",
+          "description": "启用spark动态资源分配",
+          "required": true,
+          "type": "switch",
+          "value": false,
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": true
+        },
+        {
+          "name": "spark.dynamicAllocation.initialExecutors",
+          "label": "初始executor数量",
+          "description": "初始executor数量",
+          "required": false,
+          "type": "input",
+          "value": "5",
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": "5"
+        },
+        {
+          "name": "spark.dynamicAllocation.minExecutors",
+          "label": "executor数量下限",
+          "description": "executor数量下限",
+          "required": false,
+          "type": "input",
+          "value": "5",
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": "5"
+        },
+        {
+          "name": "spark.dynamicAllocation.maxExecutors",
+          "label": "executor数量上限",
+          "description": "executor数量上限",
+          "required": false,
+          "type": "input",
+          "value": "200",
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": "200"
+        },
+        {
+          "name": "spark.dynamicAllocation.executorIdleTimeout",
+          "label": "executor空闲时间",
+          "description": "executor空闲时间",
+          "required": false,
+          "type": "input",
+          "value": "60s",
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": "60s"
+        },
+        {
+          "name": "spark.dynamicAllocation.cachedExecutorIdleTimeout",
+          "label": "executor缓存空闲时间",
+          "description": "executor缓存空闲时间",
+          "required": false,
+          "type": "input",
+          "value": "30min",
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": "30min"
+        },
+        {
+          "name": "spark.dynamicAllocation.schedulerBacklogTimeout",
+          "label": "当task到来时，开始分配executor的时间间隔",
+          "description": "当task到来时，开始分配executor的时间间隔",
+          "required": false,
+          "type": "input",
+          "value": "1s",
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": "1s"
+        },
+        {
+          "name": "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
+          "label": "分配executor后，再次申请executor的时间间隔",
+          "description": "分配executor后，再次申请executor的时间间隔",
+          "required": false,
+          "type": "input",
+          "value": "1s",
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": "1s"
+        },
+        {
+          "name": "spark.dynamicAllocation.shuffleTracking.enabled",
+          "label": "启用作业的 Shuffle 动态分配跟踪",
+          "description": "启用作业的 Shuffle 动态分配跟踪",
+          "required": true,
+          "type": "switch",
+          "value": false,
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": false
+        },
+        {
+          "name": "spark.dynamicAllocation.shuffleTracking.timeout",
+          "label": "Shuffle 动态分配跟踪的超时时间",
+          "description": "Shuffle 动态分配跟踪的超时时间",
+          "required": false,
+          "type": "input",
+          "value": "30min",
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": "30min"
+        },
+        {
+          "name": "flink.execution.target",
+          "label": "Flink部署模式",
+          "description": "Flink部署模式",
+          "required": true,
+          "type": "input",
+          "value": "yarn-session",
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": "yarn-session"
+        },
+        {
+          "name": "kyuubi.session.engine.flink.max.rows",
+          "label": "Flink查询结果最大行数",
+          "description": "Flink查询结果最大行数",
+          "required": true,
+          "type": "input",
+          "value": "5",
+          "configurableInWizard": true,
+          "hidden": false,
+          "defaultValue": "5"
+        },
         {
           "name": "kyuubi.metrics.reporters",
           "label": "监控输出格式",
@@ -223,30 +452,6 @@
           "hidden": false,
           "defaultValue": "/usr/local/jdk1.8.0_333"
         },
-        {
-          "name": "sparkHome",
-          "label": "spark安装目录",
-          "description": "spark安装目录",
-          "configType": "map",
-          "required": true,
-          "type": "input",
-          "value": "/opt/datasophon/spark-3.1.3/",
-          "configurableInWizard": true,
-          "hidden": false,
-          "defaultValue": "/opt/datasophon/spark-3.1.3/"
-        },
-        {
-          "name": "hadoopConfDir",
-          "label": "hadoop配置目录",
-          "description": "hadoop配置目录",
-          "configType": "map",
-          "required": true,
-          "type": "input",
-          "value": "/opt/datasophon/hadoop/etc/hadoop",
-          "configurableInWizard": true,
-          "hidden": false,
-          "defaultValue": "/opt/datasophon/hadoop/etc/hadoop"
-        },
         {
           "name": "kyuubiServerHeapSize",
           "label": "KyuubiServerjvm内存",
"5", + "configurableInWizard": true, + "hidden": false, + "defaultValue": "5" + }, { "name": "kyuubi.metrics.reporters", "label": "监控输出格式", @@ -223,30 +452,6 @@ "hidden": false, "defaultValue": "/usr/local/jdk1.8.0_333" }, - { - "name": "sparkHome", - "label": "spark安装目录", - "description": "spark安装目录", - "configType": "map", - "required": true, - "type": "input", - "value": "/opt/datasophon/spark-3.1.3/", - "configurableInWizard": true, - "hidden": false, - "defaultValue": "/opt/datasophon/spark-3.1.3/" - }, - { - "name": "hadoopConfDir", - "label": "hadoop配置目录", - "description": "hadoop配置目录", - "configType": "map", - "required": true, - "type": "input", - "value": "/opt/datasophon/hadoop/etc/hadoop", - "configurableInWizard": true, - "hidden": false, - "defaultValue": "/opt/datasophon/hadoop/etc/hadoop" - }, { "name": "kyuubiServerHeapSize", "label": "KyuubiServerjvm内存", diff --git a/datasophon-api/src/main/resources/meta/DDP-1.2.2/SEATUNNEL/service_ddl.json b/datasophon-api/src/main/resources/meta/DDP-1.2.2/SEATUNNEL/service_ddl.json index 761ae59e..62bc3237 100644 --- a/datasophon-api/src/main/resources/meta/DDP-1.2.2/SEATUNNEL/service_ddl.json +++ b/datasophon-api/src/main/resources/meta/DDP-1.2.2/SEATUNNEL/service_ddl.json @@ -18,7 +18,7 @@ "type": "replace", "source": "config/seatunnel-env.sh", "regex":"\/opt\/spark", - "replacement": "/opt/datasophon/spark" + "replacement": "/opt/datasophon/spark3" },{ "type": "replace", "source": "config/seatunnel-env.sh", diff --git a/datasophon-common/src/main/java/com/datasophon/common/Constants.java b/datasophon-common/src/main/java/com/datasophon/common/Constants.java index baa13418..49ce731f 100644 --- a/datasophon-common/src/main/java/com/datasophon/common/Constants.java +++ b/datasophon-common/src/main/java/com/datasophon/common/Constants.java @@ -261,4 +261,6 @@ private Constants() { public static final String ROOT = "root"; public static final String DISPATCHER_WORK = "dispatcher-worker.sh"; + + public static final String GRAFANA_PATH = "/grafana"; } diff --git a/datasophon-common/src/main/java/com/datasophon/common/command/GenerateServiceConfigCommand.java b/datasophon-common/src/main/java/com/datasophon/common/command/GenerateServiceConfigCommand.java index 9407ba4b..552f860d 100644 --- a/datasophon-common/src/main/java/com/datasophon/common/command/GenerateServiceConfigCommand.java +++ b/datasophon-common/src/main/java/com/datasophon/common/command/GenerateServiceConfigCommand.java @@ -32,6 +32,8 @@ public class GenerateServiceConfigCommand implements Serializable { private static final long serialVersionUID = -4211566568993105684L; + private Integer clusterId; + private String serviceName; private String decompressPackageName; diff --git a/datasophon-common/src/main/java/com/datasophon/common/model/Generators.java b/datasophon-common/src/main/java/com/datasophon/common/model/Generators.java index 5bc57fd0..c70e53e7 100644 --- a/datasophon-common/src/main/java/com/datasophon/common/model/Generators.java +++ b/datasophon-common/src/main/java/com/datasophon/common/model/Generators.java @@ -34,6 +34,9 @@ public class Generators implements Serializable { private List includeParams; private String templateName; + + private String conditionalOnProperty; + @Override public boolean equals(Object o) { if (this == o) { diff --git a/datasophon-domain/pom.xml b/datasophon-domain/pom.xml index f5d6864b..ebfdf9c4 100644 --- a/datasophon-domain/pom.xml +++ b/datasophon-domain/pom.xml @@ -40,16 +40,6 @@ org.springframework.boot 
spring-boot-starter-jetty - - - org.eclipse.jetty.websocket - javax-websocket-server-impl - - - org.eclipse.jetty.websocket - websocket-server - - diff --git a/datasophon-service/src/main/java/com/datasophon/api/load/LoadServiceMeta.java b/datasophon-service/src/main/java/com/datasophon/api/load/LoadServiceMeta.java index 50d6748f..cef335b9 100644 --- a/datasophon-service/src/main/java/com/datasophon/api/load/LoadServiceMeta.java +++ b/datasophon-service/src/main/java/com/datasophon/api/load/LoadServiceMeta.java @@ -64,6 +64,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; +import org.springframework.core.env.PropertyResolver; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -82,6 +83,9 @@ public class LoadServiceMeta implements ApplicationRunner { private static final Logger logger = LoggerFactory.getLogger(LoadServiceMeta.class); + @Autowired + private PropertyResolver propertyResolver; + @Autowired private FrameServiceService frameServiceService; @@ -328,7 +332,12 @@ private void buildConfigFileMap( Map map, Map> configFileMap) { ConfigWriter configWriter = serviceInfo.getConfigWriter(); - List generators = configWriter.getGenerators(); + List generators = configWriter.getGenerators().stream().filter(g -> { + if (StringUtils.isNotEmpty(g.getConditionalOnProperty())) { + return propertyResolver.getProperty(g.getConditionalOnProperty(), boolean.class, false); + } + return true; + }).collect(Collectors.toList()); for (Generators generator : generators) { List list = new ArrayList<>(); List includeParams = generator.getIncludeParams(); diff --git a/datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java b/datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java index d2b85ce2..34751c12 100644 --- a/datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java +++ b/datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java @@ -48,6 +48,7 @@ public ExecResult handlerRequest(ServiceRoleInfo serviceRoleInfo) { execResult.setExecResult(true); // config GenerateServiceConfigCommand generateServiceConfigCommand = new GenerateServiceConfigCommand(); + generateServiceConfigCommand.setClusterId(serviceRoleInfo.getClusterId()); generateServiceConfigCommand.setServiceName(serviceRoleInfo.getParentName()); generateServiceConfigCommand.setCofigFileMap(serviceRoleInfo.getConfigFileMap()); generateServiceConfigCommand.setDecompressPackageName(serviceRoleInfo.getDecompressPackageName()); @@ -62,7 +63,6 @@ public ExecResult handlerRequest(ServiceRoleInfo serviceRoleInfo) { Timeout timeout = new Timeout(Duration.create(180, TimeUnit.SECONDS)); final Future configureFuture = Patterns.ask(configActor, generateServiceConfigCommand, timeout); configureFuture.onComplete(new OnComplete() { - @Override public void onComplete(Throwable failure, Object success) throws Throwable { if (failure != null) { diff --git a/datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureHandler.java b/datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureHandler.java index 39154203..dcfc26f4 100644 --- 
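Review note on the generator filter just above: a generator that declares conditionalOnProperty is only emitted when that property resolves to true, and a missing key defaults to false — so, for example, the GRAFANA custom.ini generator stays off until datasophon.proxy-grafana.enable=true. A small sketch of that resolution using Spring's test-scope MockEnvironment (assumed available on the test classpath):

```java
import org.springframework.core.env.PropertyResolver;
import org.springframework.mock.env.MockEnvironment;

public class GeneratorFilterSketch {
    public static void main(String[] args) {
        PropertyResolver resolver = new MockEnvironment()
                .withProperty("datasophon.proxy-grafana.enable", "true");

        // mirrors the filter in buildConfigFileMap(): missing keys default to false,
        // so a conditional generator is silently skipped unless explicitly enabled
        System.out.println(resolver.getProperty("datasophon.proxy-grafana.enable", boolean.class, false)); // true
        System.out.println(resolver.getProperty("datasophon.some-other.flag", boolean.class, false));      // false
    }
}
```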
diff --git a/datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java b/datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java
index d2b85ce2..34751c12 100644
--- a/datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java
+++ b/datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java
@@ -48,6 +48,7 @@ public ExecResult handlerRequest(ServiceRoleInfo serviceRoleInfo) {
         execResult.setExecResult(true);
         // config
         GenerateServiceConfigCommand generateServiceConfigCommand = new GenerateServiceConfigCommand();
+        generateServiceConfigCommand.setClusterId(serviceRoleInfo.getClusterId());
         generateServiceConfigCommand.setServiceName(serviceRoleInfo.getParentName());
         generateServiceConfigCommand.setCofigFileMap(serviceRoleInfo.getConfigFileMap());
         generateServiceConfigCommand.setDecompressPackageName(serviceRoleInfo.getDecompressPackageName());
@@ -62,7 +63,6 @@ public ExecResult handlerRequest(ServiceRoleInfo serviceRoleInfo) {
         Timeout timeout = new Timeout(Duration.create(180, TimeUnit.SECONDS));
         final Future<Object> configureFuture = Patterns.ask(configActor, generateServiceConfigCommand, timeout);
         configureFuture.onComplete(new OnComplete<Object>() {
-
             @Override
             public void onComplete(Throwable failure, Object success) throws Throwable {
                 if (failure != null) {
diff --git a/datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureHandler.java b/datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureHandler.java
index 39154203..dcfc26f4 100644
--- a/datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureHandler.java
+++ b/datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureHandler.java
@@ -40,6 +40,7 @@ public class ServiceConfigureHandler extends ServiceHandler {
     public ExecResult handlerRequest(ServiceRoleInfo serviceRoleInfo) throws Exception {
         // config
         GenerateServiceConfigCommand generateServiceConfigCommand = new GenerateServiceConfigCommand();
+        generateServiceConfigCommand.setClusterId(serviceRoleInfo.getClusterId());
         generateServiceConfigCommand.setServiceName(serviceRoleInfo.getParentName());
         generateServiceConfigCommand.setCofigFileMap(serviceRoleInfo.getConfigFileMap());
         generateServiceConfigCommand.setDecompressPackageName(serviceRoleInfo.getDecompressPackageName());
diff --git a/datasophon-service/src/main/java/com/datasophon/api/service/ClusterServiceDashboardService.java b/datasophon-service/src/main/java/com/datasophon/api/service/ClusterServiceDashboardService.java
index a9b3eb29..1a032431 100644
--- a/datasophon-service/src/main/java/com/datasophon/api/service/ClusterServiceDashboardService.java
+++ b/datasophon-service/src/main/java/com/datasophon/api/service/ClusterServiceDashboardService.java
@@ -32,4 +32,8 @@ public interface ClusterServiceDashboardService extends IService<ClusterServiceDashboard> {
 
     Result getDashboardUrl(Integer clusterId);
+
+    String getGrafanaHost(Integer clusterId);
+
+    String getDashboardUrl(Integer clusterId, ClusterServiceDashboard dashboard);
 }
diff --git a/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterAlertHistoryServiceImpl.java b/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterAlertHistoryServiceImpl.java
index 86784be3..30bb088b 100644
--- a/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterAlertHistoryServiceImpl.java
+++ b/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterAlertHistoryServiceImpl.java
@@ -76,13 +76,14 @@ public void saveAlertHistory(String alertMessage) {
     public Result getAlertList(Integer serviceInstanceId) {
         List<ClusterAlertHistory> list = this.list(new QueryWrapper<ClusterAlertHistory>()
                 .eq(serviceInstanceId != null, Constants.SERVICE_INSTANCE_ID, serviceInstanceId)
-                .eq(Constants.IS_ENABLED, 1));
+                .eq(Constants.IS_ENABLED, 1)
+                .orderByDesc(Constants.CREATE_TIME));
         return Result.success(list);
     }
 
     @Override
     public Result getAllAlertList(Integer clusterId, Integer page, Integer pageSize) {
-        Integer offset = (page - 1) * pageSize;
+        int offset = (page - 1) * pageSize;
         List<ClusterAlertHistory> list = this.list(new QueryWrapper<ClusterAlertHistory>()
                 .eq(Constants.CLUSTER_ID, clusterId)
                 .eq(Constants.IS_ENABLED, 1)
diff --git a/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterServiceDashboardServiceImpl.java b/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterServiceDashboardServiceImpl.java
index bb03db5d..ba764532 100644
--- a/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterServiceDashboardServiceImpl.java
+++ b/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterServiceDashboardServiceImpl.java
@@ -17,6 +17,8 @@
 
 package com.datasophon.api.service.impl;
 
+import static com.datasophon.common.Constants.GRAFANA_PATH;
+
 import com.datasophon.api.load.GlobalVariables;
 import com.datasophon.api.service.ClusterServiceDashboardService;
 import com.datasophon.common.Constants;
@@ -26,9 +28,12 @@
 import com.datasophon.dao.mapper.ClusterServiceDashboardMapper;
 
 import java.util.Map;
+import java.util.Objects;
 
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
 
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
@@ -40,16 +45,46 @@ public class ClusterServiceDashboardServiceImpl implements ClusterServiceDashboardService {
 
+    @Value("${datasophon.proxy-grafana.enable:false}")
+    private boolean proxy;
+
+    @Value("${server.servlet.context-path}")
+    private String contextPath;
+
     @Autowired
     ClusterServiceDashboardService dashboardService;
 
     @Override
     public Result getDashboardUrl(Integer clusterId) {
-        Map<String, String> globalVariables = GlobalVariables.get(clusterId);
         ClusterServiceDashboard dashboard = dashboardService
                 .getOne(new QueryWrapper<ClusterServiceDashboard>().eq(Constants.SERVICE_NAME, "TOTAL"));
-        String dashboardUrl = PlaceholderUtils.replacePlaceholders(dashboard.getDashboardUrl(), globalVariables,
-                Constants.REGEX_VARIABLE);
-        return Result.success(dashboardUrl);
+        if (Objects.nonNull(dashboard) && StringUtils.hasText(dashboard.getDashboardUrl())) {
+            return Result.success(getDashboardUrl(clusterId, dashboard));
+        } else {
+            return Result.error("缺少集群总览");
+        }
+    }
+
+    @Override
+    public String getGrafanaHost(Integer clusterId) {
+        Map<String, String> globalVariables = GlobalVariables.get(clusterId);
+        return globalVariables.get("${grafanaHost}") + ":3000";
+    }
+
+    @Override
+    public String getDashboardUrl(Integer clusterId, ClusterServiceDashboard dashboard) {
+        String url = dashboard.getDashboardUrl();
+        if (proxy) {
+            // compatibility with legacy records
+            if (url.startsWith("http://${grafanaHost}:3000")) {
+                url = url.substring("http://${grafanaHost}:3000".length());
+            }
+            return contextPath + GRAFANA_PATH + "/" + clusterId + url;
+        } else {
+            Map<String, String> globalVariables = GlobalVariables.get(clusterId);
+            String dashboardUrl = PlaceholderUtils.replacePlaceholders(dashboard.getDashboardUrl(), globalVariables,
+                    Constants.REGEX_VARIABLE);
+            return dashboardUrl;
+        }
     }
 }
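Review note on the legacy-URL compatibility in getDashboardUrl() above: when the proxy is enabled, a stored absolute URL is stripped of its http://${grafanaHost}:3000 prefix and re-rooted under <context-path>/grafana/<clusterId>, so the browser reaches Grafana through the new Jetty proxy instead of hitting it directly. Worked through with a hypothetical stored value:

```java
public class LegacyDashboardUrlSketch {
    public static void main(String[] args) {
        String prefix = "http://${grafanaHost}:3000";
        String url = prefix + "/d/abc?orgId=1"; // hypothetical stored dashboard URL
        if (url.startsWith(prefix)) {
            url = url.substring(prefix.length()); // same effect as the patch's prefix strip
        }
        String contextPath = "/ddh";
        int clusterId = 1;
        System.out.println(contextPath + "/grafana/" + clusterId + url); // /ddh/grafana/1/d/abc?orgId=1
    }
}
```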
serviceInstanceMapper; @@ -123,10 +127,8 @@ public List listAll(Integer clusterId) { // 查询dashboard ClusterServiceDashboard dashboard = dashboardService.getOne(new QueryWrapper() .eq(Constants.SERVICE_NAME, serviceInstance.getServiceName())); - if (Objects.nonNull(dashboard)) { - String dashboardUrl = PlaceholderUtils.replacePlaceholders(dashboard.getDashboardUrl(), globalVariables, - Constants.REGEX_VARIABLE); - serviceInstance.setDashboardUrl(dashboardUrl); + if (Objects.nonNull(dashboard) && StringUtils.hasText(dashboard.getDashboardUrl())) { + serviceInstance.setDashboardUrl(dashboardService.getDashboardUrl(clusterId, dashboard)); } // 查询告警数量 int alertNum = alertHistoryService.count(new QueryWrapper() diff --git a/datasophon-service/src/main/java/com/datasophon/api/strategy/KyuubiServerHandlerStrategy.java b/datasophon-service/src/main/java/com/datasophon/api/strategy/KyuubiServerHandlerStrategy.java index e1674b8a..5f6bcc42 100644 --- a/datasophon-service/src/main/java/com/datasophon/api/strategy/KyuubiServerHandlerStrategy.java +++ b/datasophon-service/src/main/java/com/datasophon/api/strategy/KyuubiServerHandlerStrategy.java @@ -1,89 +1,89 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package com.datasophon.api.strategy; - -import com.datasophon.api.load.GlobalVariables; -import com.datasophon.api.load.ServiceConfigMap; -import com.datasophon.api.utils.ProcessUtils; -import com.datasophon.common.Constants; -import com.datasophon.common.model.ServiceConfig; -import com.datasophon.common.model.ServiceRoleInfo; -import com.datasophon.dao.entity.ClusterInfoEntity; -import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KyuubiServerHandlerStrategy extends ServiceHandlerAbstract implements ServiceRoleStrategy { - - private static final Logger logger = LoggerFactory.getLogger(KyuubiServerHandlerStrategy.class); - private static final String ENABLE_KERBEROS = "enableKerberos"; - - @Override - public void handler(Integer clusterId, List hosts, String serviceName) { - - } - - @Override - public void handlerConfig(Integer clusterId, List list, String serviceName) { - - Map globalVariables = GlobalVariables.get(clusterId); - boolean enableKerberos = false; - Map map = ProcessUtils.translateToMap(list); - ClusterInfoEntity clusterInfo = ProcessUtils.getClusterInfo(clusterId); - // todo: 判断kerberos的逻辑应该抽取到公共方法中 - for (ServiceConfig config : list) { - if (ENABLE_KERBEROS.equals(config.getName())) { - enableKerberos = - isEnableKerberos( - clusterId, globalVariables, enableKerberos, config, "KYUUBI"); - } - } - String key = clusterInfo.getClusterFrame() + Constants.UNDERLINE + "KYUUBI" + Constants.CONFIG; - List configs = ServiceConfigMap.get(key); - ArrayList kbConfigs = new ArrayList<>(); - if (enableKerberos) { - addConfigWithKerberos(globalVariables, map, configs, kbConfigs); - } else { - removeConfigWithKerberos(list, map, configs); - } - list.addAll(kbConfigs); - } - - @Override - public void getConfig(Integer clusterId, List list) { - - } - - @Override - public void handlerServiceRoleInfo(ServiceRoleInfo serviceRoleInfo, String hostname) { - - } - - @Override - public void handlerServiceRoleCheck(ClusterServiceRoleInstanceEntity roleInstanceEntity, - Map map) { - } - -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package com.datasophon.api.strategy; + +import com.datasophon.api.load.GlobalVariables; +import com.datasophon.api.load.ServiceConfigMap; +import com.datasophon.api.utils.ProcessUtils; +import com.datasophon.common.Constants; +import com.datasophon.common.model.ServiceConfig; +import com.datasophon.common.model.ServiceRoleInfo; +import com.datasophon.dao.entity.ClusterInfoEntity; +import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KyuubiServerHandlerStrategy extends ServiceHandlerAbstract implements ServiceRoleStrategy { + + private static final Logger logger = LoggerFactory.getLogger(KyuubiServerHandlerStrategy.class); + private static final String ENABLE_KERBEROS = "enableKerberos"; + + @Override + public void handler(Integer clusterId, List hosts, String serviceName) { + + } + + @Override + public void handlerConfig(Integer clusterId, List list, String serviceName) { + + Map globalVariables = GlobalVariables.get(clusterId); + boolean enableKerberos = false; + Map map = ProcessUtils.translateToMap(list); + ClusterInfoEntity clusterInfo = ProcessUtils.getClusterInfo(clusterId); + // todo: 判断kerberos的逻辑应该抽取到公共方法中 + for (ServiceConfig config : list) { + if (ENABLE_KERBEROS.equals(config.getName())) { + enableKerberos = + isEnableKerberos( + clusterId, globalVariables, enableKerberos, config, "KYUUBI"); + } + } + String key = clusterInfo.getClusterFrame() + Constants.UNDERLINE + "KYUUBI" + Constants.CONFIG; + List configs = ServiceConfigMap.get(key); + ArrayList kbConfigs = new ArrayList<>(); + if (enableKerberos) { + addConfigWithKerberos(globalVariables, map, configs, kbConfigs); + } else { + removeConfigWithKerberos(list, map, configs); + } + list.addAll(kbConfigs); + } + + @Override + public void getConfig(Integer clusterId, List list) { + + } + + @Override + public void handlerServiceRoleInfo(ServiceRoleInfo serviceRoleInfo, String hostname) { + + } + + @Override + public void handlerServiceRoleCheck(ClusterServiceRoleInstanceEntity roleInstanceEntity, + Map map) { + } + +} diff --git a/datasophon-service/src/main/java/com/datasophon/api/strategy/ServiceRoleStrategyContext.java b/datasophon-service/src/main/java/com/datasophon/api/strategy/ServiceRoleStrategyContext.java index 85c66331..c3404129 100644 --- a/datasophon-service/src/main/java/com/datasophon/api/strategy/ServiceRoleStrategyContext.java +++ b/datasophon-service/src/main/java/com/datasophon/api/strategy/ServiceRoleStrategyContext.java @@ -1,115 +1,115 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package com.datasophon.api.strategy;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class ServiceRoleStrategyContext {
-
-    private static final Map<String, ServiceRoleStrategy> strategyMap = new ConcurrentHashMap<>();
-
-    private static final Map<String, String> serviceNameMap = new ConcurrentHashMap<>();
-
-    static {
-        strategyMap.put("NameNode", new NameNodeHandlerStrategy());
-        strategyMap.put("ResourceManager", new RMHandlerStrategy());
-        strategyMap.put("HiveMetaStore", new HiveMetaStroreHandlerStrategy());
-        strategyMap.put("HiveServer2", new HiveServer2HandlerStrategy());
-        strategyMap.put("Grafana", new GrafanaHandlerStrategy());
-        strategyMap.put("ZkServer", new ZkServerHandlerStrategy());
-        strategyMap.put("HistoryServer", new HistoryServerHandlerStrategy());
-        strategyMap.put("TimelineServer", new TSHandlerStrategy());
-        strategyMap.put("TrinoCoordinator", new TrinoHandlerStrategy());
-        strategyMap.put("JournalNode", new JournalNodeHandlerStrategy());
-        strategyMap.put("ZKFC", new ZKFCHandlerStrategy());
-        strategyMap.put("HttpFs", new HttpFsHandlerStrategy());
-        strategyMap.put("SRFE", new FEHandlerStartegy());
-        strategyMap.put("DorisFE", new FEHandlerStartegy());
-        strategyMap.put("DorisFEObserver", new FEObserverHandlerStartegy());
-        strategyMap.put("SRBE", new BEHandlerStartegy());
-        strategyMap.put("DorisBE", new BEHandlerStartegy());
-        strategyMap.put("Krb5Kdc", new Krb5KdcHandlerStrategy());
-        strategyMap.put("KAdmin", new KAdminHandlerStrategy());
-        strategyMap.put("RangerAdmin", new RangerAdminHandlerStrategy());
-        strategyMap.put("ElasticSearch", new ElasticSearchHandlerStrategy());
-        strategyMap.put("Prometheus", new PrometheusHandlerStrategy());
-        strategyMap.put("AlertManager", new AlertManagerHandlerStrategy());
-
-        strategyMap.put("RANGER", new RangerAdminHandlerStrategy());
-        strategyMap.put("ZOOKEEPER", new ZkServerHandlerStrategy());
-        strategyMap.put("YARN", new RMHandlerStrategy());
-        strategyMap.put("HDFS", new NameNodeHandlerStrategy());
-        strategyMap.put("HIVE", new HiveServer2HandlerStrategy());
-        strategyMap.put("KAFKA", new KafkaHandlerStrategy());
-        strategyMap.put("HBASE", new HBaseHandlerStrategy());
-        strategyMap.put("FLINK", new FlinkHandlerStrategy());
-        strategyMap.put("KYUUBI", new KyuubiServerHandlerStrategy());
-
-        // serviceNameMap
-        serviceNameMap.put("NameNode", "HDFS");
-        serviceNameMap.put("ResourceManager", "YARN");
-        serviceNameMap.put("HiveMetaStore", "HIVE");
-        serviceNameMap.put("HiveServer2", "HIVE");
-        serviceNameMap.put("Grafana", "GRAFANA");
-        serviceNameMap.put("ZkServer", "ZOOKEEPER");
-        serviceNameMap.put("HistoryServer", "YARN");
-        serviceNameMap.put("TimelineServer", "YARN");
-        serviceNameMap.put("TrinoCoordinator", "TRINO");
-        serviceNameMap.put("JournalNode", "HDFS");
-        serviceNameMap.put("ZKFC", "HDFS");
-        serviceNameMap.put("HttpFs", "HDFS");
-        serviceNameMap.put("SRFE", "STARROCKS");
-        serviceNameMap.put("DorisFE", "DORIS");
-        serviceNameMap.put("DorisFEObserver", "DORIS");
-        serviceNameMap.put("SRBE", "STARROCKS");
-        serviceNameMap.put("DorisBE", "DORIS");
-        serviceNameMap.put("Krb5Kdc", "KERBEROS");
-        serviceNameMap.put("KAdmin", "KERBEROS");
-        serviceNameMap.put("RangerAdmin", "RANGER");
-        serviceNameMap.put("ElasticSearch", "ELASTICSEARCH");
-        serviceNameMap.put("Prometheus", "PROMETHEUS");
-        serviceNameMap.put("AlertManager", "ALERTMANAGER");
-
-        serviceNameMap.put("FLINK", "FLINK");
-        serviceNameMap.put("RANGER", "RANGER");
-        serviceNameMap.put("YARN", "YARN");
-        serviceNameMap.put("HDFS", "HDFS");
-        serviceNameMap.put("HIVE", "HIVE");
-        serviceNameMap.put("KAFKA", "KAFKA");
-        serviceNameMap.put("HBASE", "HBASE");
-        serviceNameMap.put("KYUUBI", "KYUUBI");
-
-    }
-
-    public static ServiceRoleStrategy getServiceRoleHandler(String type) {
-        if (StringUtils.isBlank(type)) {
-            return null;
-        }
-        return strategyMap.get(type);
-    }
-
-    public static String getServiceName(String type) {
-        if (StringUtils.isBlank(type)) {
-            return null;
-        }
-        return serviceNameMap.get(type);
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datasophon.api.strategy;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class ServiceRoleStrategyContext {
+
+    private static final Map<String, ServiceRoleStrategy> strategyMap = new ConcurrentHashMap<>();
+
+    private static final Map<String, String> serviceNameMap = new ConcurrentHashMap<>();
+
+    static {
+        strategyMap.put("NameNode", new NameNodeHandlerStrategy());
+        strategyMap.put("ResourceManager", new RMHandlerStrategy());
+        strategyMap.put("HiveMetaStore", new HiveMetaStroreHandlerStrategy());
+        strategyMap.put("HiveServer2", new HiveServer2HandlerStrategy());
+        strategyMap.put("Grafana", new GrafanaHandlerStrategy());
+        strategyMap.put("ZkServer", new ZkServerHandlerStrategy());
+        strategyMap.put("HistoryServer", new HistoryServerHandlerStrategy());
+        strategyMap.put("TimelineServer", new TSHandlerStrategy());
+        strategyMap.put("TrinoCoordinator", new TrinoHandlerStrategy());
+        strategyMap.put("JournalNode", new JournalNodeHandlerStrategy());
+        strategyMap.put("ZKFC", new ZKFCHandlerStrategy());
+        strategyMap.put("HttpFs", new HttpFsHandlerStrategy());
+        strategyMap.put("SRFE", new FEHandlerStartegy());
+        strategyMap.put("DorisFE", new FEHandlerStartegy());
+        strategyMap.put("DorisFEObserver", new FEObserverHandlerStartegy());
+        strategyMap.put("SRBE", new BEHandlerStartegy());
+        strategyMap.put("DorisBE", new BEHandlerStartegy());
+        strategyMap.put("Krb5Kdc", new Krb5KdcHandlerStrategy());
+        strategyMap.put("KAdmin", new KAdminHandlerStrategy());
+        strategyMap.put("RangerAdmin", new RangerAdminHandlerStrategy());
+        strategyMap.put("ElasticSearch", new ElasticSearchHandlerStrategy());
+        strategyMap.put("Prometheus", new PrometheusHandlerStrategy());
+        strategyMap.put("AlertManager", new AlertManagerHandlerStrategy());
+
+        strategyMap.put("RANGER", new RangerAdminHandlerStrategy());
+        strategyMap.put("ZOOKEEPER", new ZkServerHandlerStrategy());
+        strategyMap.put("YARN", new RMHandlerStrategy());
+        strategyMap.put("HDFS", new NameNodeHandlerStrategy());
+        strategyMap.put("HIVE", new HiveServer2HandlerStrategy());
+        strategyMap.put("KAFKA", new KafkaHandlerStrategy());
+        strategyMap.put("HBASE", new HBaseHandlerStrategy());
+        strategyMap.put("FLINK", new FlinkHandlerStrategy());
+        strategyMap.put("KYUUBI", new KyuubiServerHandlerStrategy());
+
+        // serviceNameMap
+        serviceNameMap.put("NameNode", "HDFS");
+        serviceNameMap.put("ResourceManager", "YARN");
+        serviceNameMap.put("HiveMetaStore", "HIVE");
+        serviceNameMap.put("HiveServer2", "HIVE");
+        serviceNameMap.put("Grafana", "GRAFANA");
+        serviceNameMap.put("ZkServer", "ZOOKEEPER");
+        serviceNameMap.put("HistoryServer", "YARN");
+        serviceNameMap.put("TimelineServer", "YARN");
+        serviceNameMap.put("TrinoCoordinator", "TRINO");
+        serviceNameMap.put("JournalNode", "HDFS");
+        serviceNameMap.put("ZKFC", "HDFS");
+        serviceNameMap.put("HttpFs", "HDFS");
+        serviceNameMap.put("SRFE", "STARROCKS");
+        serviceNameMap.put("DorisFE", "DORIS");
+        serviceNameMap.put("DorisFEObserver", "DORIS");
+        serviceNameMap.put("SRBE", "STARROCKS");
+        serviceNameMap.put("DorisBE", "DORIS");
+        serviceNameMap.put("Krb5Kdc", "KERBEROS");
+        serviceNameMap.put("KAdmin", "KERBEROS");
+        serviceNameMap.put("RangerAdmin", "RANGER");
+        serviceNameMap.put("ElasticSearch", "ELASTICSEARCH");
+        serviceNameMap.put("Prometheus", "PROMETHEUS");
+        serviceNameMap.put("AlertManager", "ALERTMANAGER");
+
+        serviceNameMap.put("FLINK", "FLINK");
+        serviceNameMap.put("RANGER", "RANGER");
+        serviceNameMap.put("YARN", "YARN");
+        serviceNameMap.put("HDFS", "HDFS");
+        serviceNameMap.put("HIVE", "HIVE");
+        serviceNameMap.put("KAFKA", "KAFKA");
+        serviceNameMap.put("HBASE", "HBASE");
+        serviceNameMap.put("KYUUBI", "KYUUBI");
+
+    }
+
+    public static ServiceRoleStrategy getServiceRoleHandler(String type) {
+        if (StringUtils.isBlank(type)) {
+            return null;
+        }
+        return strategyMap.get(type);
+    }
+
+    public static String getServiceName(String type) {
+        if (StringUtils.isBlank(type)) {
+            return null;
+        }
+        return serviceNameMap.get(type);
+    }
+}
diff --git a/datasophon-worker/src/main/java/com/datasophon/worker/actor/ConfigureServiceActor.java b/datasophon-worker/src/main/java/com/datasophon/worker/actor/ConfigureServiceActor.java
index 041b20a3..14713f96 100644
--- a/datasophon-worker/src/main/java/com/datasophon/worker/actor/ConfigureServiceActor.java
+++ b/datasophon-worker/src/main/java/com/datasophon/worker/actor/ConfigureServiceActor.java
@@ -40,6 +40,7 @@ public void onReceive(Object msg) throws Throwable {
                     new ConfigureServiceHandler(command.getServiceName(), command.getServiceRoleName());
             ExecResult startResult = serviceHandler.configure(command.getCofigFileMap(),
                     command.getDecompressPackageName(),
+                    command.getClusterId(),
                     command.getMyid(),
                     command.getServiceRoleName(),
                     command.getRunAs());
diff --git a/datasophon-worker/src/main/java/com/datasophon/worker/handler/ConfigureServiceHandler.java b/datasophon-worker/src/main/java/com/datasophon/worker/handler/ConfigureServiceHandler.java
index 2f441b23..9d0d295c 100644
--- a/datasophon-worker/src/main/java/com/datasophon/worker/handler/ConfigureServiceHandler.java
+++ b/datasophon-worker/src/main/java/com/datasophon/worker/handler/ConfigureServiceHandler.java
@@ -72,6 +72,7 @@ public ConfigureServiceHandler(String serviceName, String serviceRoleName) {
 
     public ExecResult configure(Map<Generators, List<ServiceConfig>> cofigFileMap,
                                 String decompressPackageName,
+                                Integer clusterId,
                                 Integer myid,
                                 String serviceRoleName,
                                 RunAs runAs) {
@@ -79,10 +80,13 @@ public ExecResult configure(Map<Generators, List<ServiceConfig>> cofigFileMap,
         try {
             String hostName = InetAddress.getLocalHost().getHostName();
+            String ip = InetAddress.getLocalHost().getHostAddress();
             HashMap<String, String> paramMap = new HashMap<>();
+            paramMap.put("${clusterId}", String.valueOf(clusterId));
             paramMap.put("${host}", hostName);
+            paramMap.put("${ip}", ip);
             paramMap.put("${user}", "root");
-            paramMap.put("${myid}", myid + "");
+            paramMap.put("${myid}", String.valueOf(myid));
             logger.info("Start to configure service role {}", serviceRoleName);
             for (Generators generators : cofigFileMap.keySet()) {
                 List<ServiceConfig> configs = cofigFileMap.get(generators);
@@ -164,6 +168,13 @@ public ExecResult configure(Map<Generators, List<ServiceConfig>> cofigFileMap,
                 serviceConfig.setValue(IdUtil.simpleUUID());
                 customConfList.add(serviceConfig);
             }
+            if ("Grafana".equals(serviceRoleName)) {
+                ServiceConfig clusterIdConfig = new ServiceConfig();
+                clusterIdConfig.setName("clusterId");
+                clusterIdConfig.setValue(String.valueOf(clusterId));
+                clusterIdConfig.setConfigType("map");
+                customConfList.add(clusterIdConfig);
+            }
             configs.addAll(customConfList);
             if (!configs.isEmpty()) {
                 // extra app, package: META, templates
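The `${clusterId}` and `${ip}` entries above extend the placeholder map that already carried `${host}`, `${user}` and `${myid}`. A self-contained sketch of the substitution this enables in generated config files (illustrative values only; the real replacement loop lives elsewhere in `ConfigureServiceHandler`):

```java
import java.util.HashMap;
import java.util.Map;

public class PlaceholderDemo {
    public static void main(String[] args) {
        // Mirrors the paramMap built in ConfigureServiceHandler.configure().
        Map<String, String> paramMap = new HashMap<>();
        paramMap.put("${clusterId}", "1");       // newly available to templates
        paramMap.put("${ip}", "192.168.0.10");   // newly available to templates
        paramMap.put("${host}", "node01");

        // Replace every known placeholder in a raw config value.
        String raw = "http://0.0.0.0:3000/ddh/grafana/${clusterId}";
        for (Map.Entry<String, String> e : paramMap.entrySet()) {
            raw = raw.replace(e.getKey(), e.getValue());
        }
        System.out.println(raw); // http://0.0.0.0:3000/ddh/grafana/1
    }
}
```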
diff --git a/datasophon-worker/src/main/java/com/datasophon/worker/strategy/KyuubiServerHandlerStrategy.java b/datasophon-worker/src/main/java/com/datasophon/worker/strategy/KyuubiServerHandlerStrategy.java
index e307788f..f5f0171d 100644
--- a/datasophon-worker/src/main/java/com/datasophon/worker/strategy/KyuubiServerHandlerStrategy.java
+++ b/datasophon-worker/src/main/java/com/datasophon/worker/strategy/KyuubiServerHandlerStrategy.java
@@ -1,61 +1,61 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.datasophon.worker.strategy;
-
-import com.datasophon.common.Constants;
-import com.datasophon.common.cache.CacheUtils;
-import com.datasophon.common.command.ServiceRoleOperateCommand;
-import com.datasophon.common.utils.ExecResult;
-import com.datasophon.worker.handler.ServiceHandler;
-import com.datasophon.worker.utils.KerberosUtils;
-
-import java.sql.SQLException;
-
-import cn.hutool.core.io.FileUtil;
-
-public class KyuubiServerHandlerStrategy extends AbstractHandlerStrategy implements ServiceRoleStrategy {
-
-    private static final String KEYTAB_NAME = "kyuubi.service.keytab";
-    private static final String KEYTAB_PATH = "/etc/security/keytab/" + KEYTAB_NAME;
-
-    public KyuubiServerHandlerStrategy(String serviceName, String serviceRoleName) {
-        super(serviceName, serviceRoleName);
-    }
-
-    @Override
-    public ExecResult handler(ServiceRoleOperateCommand command) throws SQLException, ClassNotFoundException {
-        ExecResult startResult;
-        if (command.getEnableKerberos()) {
-            logger.info("start to get kyuubi keytab file");
-            String hostname = CacheUtils.getString(Constants.HOSTNAME);
-            KerberosUtils.createKeytabDir();
-            if (!FileUtil.exist(KEYTAB_PATH)) {
-                KerberosUtils.downloadKeytabFromMaster("kyuubi/" + hostname, KEYTAB_NAME);
-            }
-        }
-        ServiceHandler serviceHandler = new ServiceHandler(
-                command.getServiceName(),
-                command.getServiceRoleName());
-        startResult = serviceHandler.start(
-                command.getStartRunner(),
-                command.getStatusRunner(),
-                command.getDecompressPackageName(),
-                command.getRunAs());
-        return startResult;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datasophon.worker.strategy;
+
+import com.datasophon.common.Constants;
+import com.datasophon.common.cache.CacheUtils;
+import com.datasophon.common.command.ServiceRoleOperateCommand;
+import com.datasophon.common.utils.ExecResult;
+import com.datasophon.worker.handler.ServiceHandler;
+import com.datasophon.worker.utils.KerberosUtils;
+
+import java.sql.SQLException;
+
+import cn.hutool.core.io.FileUtil;
+
+public class KyuubiServerHandlerStrategy extends AbstractHandlerStrategy implements ServiceRoleStrategy {
+
+    private static final String KEYTAB_NAME = "kyuubi.service.keytab";
+    private static final String KEYTAB_PATH = "/etc/security/keytab/" + KEYTAB_NAME;
+
+    public KyuubiServerHandlerStrategy(String serviceName, String serviceRoleName) {
+        super(serviceName, serviceRoleName);
+    }
+
+    @Override
+    public ExecResult handler(ServiceRoleOperateCommand command) throws SQLException, ClassNotFoundException {
+        ExecResult startResult;
+        if (command.getEnableKerberos()) {
+            logger.info("start to get kyuubi keytab file");
+            String hostname = CacheUtils.getString(Constants.HOSTNAME);
+            KerberosUtils.createKeytabDir();
+            if (!FileUtil.exist(KEYTAB_PATH)) {
+                KerberosUtils.downloadKeytabFromMaster("kyuubi/" + hostname, KEYTAB_NAME);
+            }
+        }
+        ServiceHandler serviceHandler = new ServiceHandler(
+                command.getServiceName(),
+                command.getServiceRoleName());
+        startResult = serviceHandler.start(
+                command.getStartRunner(),
+                command.getStatusRunner(),
+                command.getDecompressPackageName(),
+                command.getRunAs());
+        return startResult;
+    }
+}
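For context, worker-side strategies such as `KyuubiServerHandlerStrategy` are resolved through the worker's own `ServiceRoleStrategyContext`, rewritten in the next hunk. A hedged sketch of that dispatch path (hypothetical helper class; assumes a `ServiceRoleOperateCommand` already populated by the API side):

```java
package com.datasophon.worker.strategy;

import com.datasophon.common.command.ServiceRoleOperateCommand;
import com.datasophon.common.utils.ExecResult;

// Illustrative only, not part of the patch.
public class KyuubiDispatchDemo {

    public static ExecResult dispatch(ServiceRoleOperateCommand command) throws Exception {
        // "KyuubiServer" is the literal key registered in the worker context below.
        ServiceRoleStrategy strategy = ServiceRoleStrategyContext.getServiceRoleHandler("KyuubiServer");
        if (strategy == null) {
            throw new IllegalStateException("no strategy registered for KyuubiServer");
        }
        // When command.getEnableKerberos() is true, the strategy first fetches
        // kyuubi.service.keytab from the master, then starts the service role.
        return strategy.handler(command);
    }
}
```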
diff --git a/datasophon-worker/src/main/java/com/datasophon/worker/strategy/ServiceRoleStrategyContext.java b/datasophon-worker/src/main/java/com/datasophon/worker/strategy/ServiceRoleStrategyContext.java
index 3788eeab..6b6a1774 100644
--- a/datasophon-worker/src/main/java/com/datasophon/worker/strategy/ServiceRoleStrategyContext.java
+++ b/datasophon-worker/src/main/java/com/datasophon/worker/strategy/ServiceRoleStrategyContext.java
@@ -1,68 +1,68 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.datasophon.worker.strategy;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class ServiceRoleStrategyContext {
-
-    private static final Map<String, ServiceRoleStrategy> map = new ConcurrentHashMap<>();
-
-    static {
-        map.put("NameNode", new NameNodeHandlerStrategy("HDFS", "NameNode"));
-        map.put("ZKFC", new ZKFCHandlerStrategy("HDFS", "ZKFC"));
-        map.put("JournalNode", new JournalNodeHandlerStrategy("HDFS", "JournalNode"));
-        map.put("DataNode", new DataNodeHandlerStrategy("HDFS", "DataNode"));
-        map.put("ResourceManager", new ResourceManagerHandlerStrategy("YARN", "ResourceManager"));
-        map.put("NodeManager", new NodeManagerHandlerStrategy("YARN", "NodeManager"));
-        map.put("RangerAdmin", new RangerAdminHandlerStrategy("RANGER", "RangerAdmin"));
-        map.put("HiveServer2", new HiveServer2HandlerStrategy("HIVE", "HiveServer2"));
-        map.put("HbaseMaster", new HbaseHandlerStrategy("HBASE", "HbaseMaster"));
-        map.put("RegionServer", new HbaseHandlerStrategy("HBASE", "RegionServer"));
-        map.put("Krb5Kdc", new Krb5KdcHandlerStrategy("KERBEROS", "Krb5Kdc"));
-        map.put("KAdmin", new KAdminHandlerStrategy("KERBEROS", "KAdmin"));
-        map.put("SRFE", new FEHandlerStrategy("STARROCKS", "SRFE"));
-        map.put("DorisFE", new FEHandlerStrategy("DORIS", "DorisFE"));
-        map.put("DorisFEObserver", new FEObserverHandlerStrategy("DORIS", "DorisFEObserver"));
-        map.put("ZkServer", new ZkServerHandlerStrategy("ZOOKEEPER", "ZkServer"));
-        map.put("KafkaBroker", new KafkaHandlerStrategy("KAFKA", "KafkaBroker"));
-        map.put("SRBE", new BEHandlerStrategy("STARROCKS", "SRBE"));
-        map.put("DorisBE", new BEHandlerStrategy("DORIS", "DorisBE"));
-        map.put("HistoryServer", new HistoryServerHandlerStrategy("YARN", "HistoryServer"));
-
-        // TEZ Server service
-        map.put("TezServer", new TezServerHandlerStrategy("TEZ", "TezServer"));
-        // kyuubi
-        map.put("KyuubiServer", new KyuubiServerHandlerStrategy("KYUUBI", "KyuubiServer"));
-        // flink
-        map.put("FlinkClient", new FlinkHandlerStrategy("FLINK", "FlinkClient"));
-
-        // DolphinScheduler
-        map.put("MasterServer", new DSMasterHandlerStrategy("DS", "MasterServer"));
-    }
-
-    public static ServiceRoleStrategy getServiceRoleHandler(String type) {
-        if (StringUtils.isBlank(type)) {
-            return null;
-        }
-        return map.get(type);
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datasophon.worker.strategy;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class ServiceRoleStrategyContext {
+
+    private static final Map<String, ServiceRoleStrategy> map = new ConcurrentHashMap<>();
+
+    static {
+        map.put("NameNode", new NameNodeHandlerStrategy("HDFS", "NameNode"));
+        map.put("ZKFC", new ZKFCHandlerStrategy("HDFS", "ZKFC"));
+        map.put("JournalNode", new JournalNodeHandlerStrategy("HDFS", "JournalNode"));
+        map.put("DataNode", new DataNodeHandlerStrategy("HDFS", "DataNode"));
+        map.put("ResourceManager", new ResourceManagerHandlerStrategy("YARN", "ResourceManager"));
+        map.put("NodeManager", new NodeManagerHandlerStrategy("YARN", "NodeManager"));
+        map.put("RangerAdmin", new RangerAdminHandlerStrategy("RANGER", "RangerAdmin"));
+        map.put("HiveServer2", new HiveServer2HandlerStrategy("HIVE", "HiveServer2"));
+        map.put("HbaseMaster", new HbaseHandlerStrategy("HBASE", "HbaseMaster"));
+        map.put("RegionServer", new HbaseHandlerStrategy("HBASE", "RegionServer"));
+        map.put("Krb5Kdc", new Krb5KdcHandlerStrategy("KERBEROS", "Krb5Kdc"));
+        map.put("KAdmin", new KAdminHandlerStrategy("KERBEROS", "KAdmin"));
+        map.put("SRFE", new FEHandlerStrategy("STARROCKS", "SRFE"));
+        map.put("DorisFE", new FEHandlerStrategy("DORIS", "DorisFE"));
+        map.put("DorisFEObserver", new FEObserverHandlerStrategy("DORIS", "DorisFEObserver"));
+        map.put("ZkServer", new ZkServerHandlerStrategy("ZOOKEEPER", "ZkServer"));
+        map.put("KafkaBroker", new KafkaHandlerStrategy("KAFKA", "KafkaBroker"));
+        map.put("SRBE", new BEHandlerStrategy("STARROCKS", "SRBE"));
+        map.put("DorisBE", new BEHandlerStrategy("DORIS", "DorisBE"));
+        map.put("HistoryServer", new HistoryServerHandlerStrategy("YARN", "HistoryServer"));
+
+        // TEZ Server service
+        map.put("TezServer", new TezServerHandlerStrategy("TEZ", "TezServer"));
+        // kyuubi
+        map.put("KyuubiServer", new KyuubiServerHandlerStrategy("KYUUBI", "KyuubiServer"));
+        // flink
+        map.put("FlinkClient", new FlinkHandlerStrategy("FLINK", "FlinkClient"));
+
+        // DolphinScheduler
+        map.put("MasterServer", new DSMasterHandlerStrategy("DS", "MasterServer"));
+    }
+
+    public static ServiceRoleStrategy getServiceRoleHandler(String type) {
+        if (StringUtils.isBlank(type)) {
+            return null;
+        }
+        return map.get(type);
+    }
+}
diff --git a/datasophon-worker/src/main/resources/script/datasophon-env.sh b/datasophon-worker/src/main/resources/script/datasophon-env.sh
index 7c6c421a..723e8def 100644
--- a/datasophon-worker/src/main/resources/script/datasophon-env.sh
+++ b/datasophon-worker/src/main/resources/script/datasophon-env.sh
@@ -3,7 +3,7 @@ CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
 export JAVA_HOME CLASSPATH
 export KYUUBI_HOME=/opt/datasophon/kyuubi
-export SPARK_HOME=/opt/datasophon/spark
+export SPARK_HOME=/opt/datasophon/spark3
 export PYSPARK_ALLOW_INSECURE_GATEWAY=1
 export HIVE_HOME=/opt/datasophon/hive
 export KAFKA_HOME=/opt/datasophon/kafka
diff --git a/datasophon-worker/src/main/resources/templates/grafana.ftl b/datasophon-worker/src/main/resources/templates/grafana.ftl
new file mode 100644
index 00000000..0a6a3341
--- /dev/null
+++ b/datasophon-worker/src/main/resources/templates/grafana.ftl
@@ -0,0 +1,9 @@
+[server]
+root_url = http://0.0.0.0:3000/ddh/grafana/${clusterId}
+serve_from_sub_path = true
+
+[security]
+allow_embedding = true
+
+[live]
+allowed_origins = *
\ No newline at end of file
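Tying the pieces together: `${clusterId}` in `grafana.ftl` is filled from the placeholder map built in `ConfigureServiceHandler`, so each cluster's Grafana is served under its own sub-path. For a hypothetical cluster with id 1, the rendered `grafana.ini` server section would be:

```ini
[server]
root_url = http://0.0.0.0:3000/ddh/grafana/1
serve_from_sub_path = true
```

Serving from a per-cluster sub-path is what lets the management console embed each cluster's dashboards without URL clashes.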
diff --git a/datasophon-worker/src/main/resources/templates/kyuubi-env.ftl b/datasophon-worker/src/main/resources/templates/kyuubi-env.ftl
index 670a9e1e..16da6555 100644
--- a/datasophon-worker/src/main/resources/templates/kyuubi-env.ftl
+++ b/datasophon-worker/src/main/resources/templates/kyuubi-env.ftl
@@ -61,11 +61,12 @@ export KYUUBI_BEELINE_OPTS="-Xmx${kyuubiClientHeapSize}g -XX:+UnlockDiagnosticVM
 #jdk
 export JAVA_HOME=${javaHome}
 #spark engine
-export SPARK_HOME=${sparkHome}
+export SPARK_HOME=/opt/datasophon/spark3/
+export FLINK_HOME=/opt/datasophon/flink/
 #hadoop config
-export HADOOP_CONF_DIR=${hadoopConfDir}
-export YARN_CONF_DIR=${hadoopConfDir}
+export HADOOP_CONF_DIR=/opt/datasophon/hadoop/etc/hadoop
+export YARN_CONF_DIR=/opt/datasophon/hadoop/etc/hadoop
 # customer env
 <#list itemList as item>
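One caveat worth flagging on this last hunk: `kyuubi-env.ftl` no longer resolves `${sparkHome}` and `${hadoopConfDir}` from the template model but pins `/opt/datasophon/spark3/` (matching the `SPARK_HOME` bump in `datasophon-env.sh`), a fixed `FLINK_HOME`, and fixed Hadoop config directories. Deployments that install elsewhere would pick up dangling paths. A throwaway check along these lines (hypothetical helper, not part of the patch) can verify the assumed paths on a worker:

```java
import java.nio.file.Files;
import java.nio.file.Paths;

public class EnvPathCheck {
    public static void main(String[] args) {
        // Paths hardcoded by the updated template and env script; verify they exist.
        String[] required = {
                "/opt/datasophon/spark3",
                "/opt/datasophon/flink",
                "/opt/datasophon/hadoop/etc/hadoop"
        };
        for (String p : required) {
            System.out.println(p + " -> " + (Files.isDirectory(Paths.get(p)) ? "OK" : "MISSING"));
        }
    }
}
```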