diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index 81bbf84b34..92eb808356 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -23,6 +23,7 @@ import org.dinky.config.Dialect; import org.dinky.context.TenantContextHolder; import org.dinky.data.annotations.ProcessStep; +import org.dinky.data.app.AppParamConfig; import org.dinky.data.constant.CommonConstant; import org.dinky.data.dto.AbstractStatementDTO; import org.dinky.data.dto.DebugDTO; @@ -95,6 +96,7 @@ import java.io.Reader; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Base64; import java.util.List; import java.util.Map; import java.util.Objects; @@ -153,15 +155,17 @@ public class TaskServiceImpl extends SuperServiceImpl implemen private CatalogueService catalogueService; private String[] buildParams(int id) { - return String.format( - "--id %d --driver %s --url %s --username %s --password %s --dinkyAddr %s", - id, - dsProperties.getDriverClassName(), - dsProperties.getUrl(), - dsProperties.getUsername(), - dsProperties.getPassword(), - SystemConfiguration.getInstances().getDinkyAddr()) - .split(" "); + AppParamConfig appParamConfig = AppParamConfig.builder() + .taskId(id) + .url(dsProperties.getUrl()) + .username(dsProperties.getUsername()) + .password(dsProperties.getPassword()) + .dinkyAddr(SystemConfiguration.getInstances().getDinkyAddr().getValue()) + .split(SystemConfiguration.getInstances().getSqlSeparator()) + .build(); + String encodeParam = Base64.getEncoder() + .encodeToString(JsonUtils.toJsonString(appParamConfig).getBytes()); + return StrFormatter.format("--config {}", encodeParam).split(" "); } @ProcessStep(type = ProcessStepType.SUBMIT_PRECHECK) @@ -217,6 +221,7 @@ public JobConfig buildJobConfig(TaskDTO task) { FlinkClusterConfig flinkClusterCfg = clusterCfgService.getFlinkClusterCfg(config.getClusterConfigurationId()); flinkClusterCfg.getAppConfig().setUserJarParas(buildParams(config.getTaskId())); + flinkClusterCfg.getAppConfig().setUserJarMainAppClass(CommonConstant.DINKY_APP_MAIN_CLASS); config.buildGatewayConfig(flinkClusterCfg); } else { log.info("Init remote cluster"); diff --git a/dinky-app/dinky-app-1.13/src/main/java/org/dinky/app/MainApp.java b/dinky-app/dinky-app-1.13/src/main/java/org/dinky/app/MainApp.java index aae2b57f22..fbcc98e3e5 100644 --- a/dinky-app/dinky-app-1.13/src/main/java/org/dinky/app/MainApp.java +++ b/dinky-app/dinky-app-1.13/src/main/java/org/dinky/app/MainApp.java @@ -19,27 +19,42 @@ package org.dinky.app; -import org.dinky.app.db.DBConfig; +import org.dinky.app.constant.AppParamConstant; +import org.dinky.app.db.DBUtil; import org.dinky.app.flinksql.Submitter; -import org.dinky.assertion.Asserts; -import org.dinky.constant.FlinkParamConstant; -import org.dinky.utils.FlinkBaseUtil; +import org.dinky.data.app.AppParamConfig; +import org.dinky.utils.JsonUtils; -import java.io.IOException; -import java.util.Map; +import org.apache.flink.api.java.utils.ParameterTool; + +import java.util.Base64; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * MainApp * - * @since 2021/10/27 + * @since 2022/11/05 */ public class MainApp { - public static void main(String[] args) throws IOException { - Map params = FlinkBaseUtil.getParamsFromArgs(args); - String id = params.get(FlinkParamConstant.ID); - Asserts.checkNullString(id, "请配置入参 id "); - DBConfig dbConfig = DBConfig.build(params); - Submitter.submit(Integer.valueOf(id), dbConfig, params.get(FlinkParamConstant.DINKY_ADDR)); + private static final Logger log = LoggerFactory.getLogger(Submitter.class); + + public static void main(String[] args) throws Exception { + log.info("=========================Start run dinky app job==============================="); + ParameterTool parameters = ParameterTool.fromArgs(args); + boolean isEncrypt = parameters.getBoolean(AppParamConstant.isEncrypt, true); + String config = parameters.get(AppParamConstant.config); + config = isEncrypt ? new String(Base64.getDecoder().decode(config)) : config; + AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class); + try { + log.info("dinky app is Ready to run, config is {}", appConfig); + DBUtil.init(appConfig); + Submitter.submit(appConfig); + } catch (Exception e) { + log.error("exectue app failed with config: {}", appConfig); + throw e; + } } } diff --git a/dinky-app/dinky-app-1.14/src/main/java/org/dinky/app/MainApp.java b/dinky-app/dinky-app-1.14/src/main/java/org/dinky/app/MainApp.java index aae2b57f22..fbcc98e3e5 100644 --- a/dinky-app/dinky-app-1.14/src/main/java/org/dinky/app/MainApp.java +++ b/dinky-app/dinky-app-1.14/src/main/java/org/dinky/app/MainApp.java @@ -19,27 +19,42 @@ package org.dinky.app; -import org.dinky.app.db.DBConfig; +import org.dinky.app.constant.AppParamConstant; +import org.dinky.app.db.DBUtil; import org.dinky.app.flinksql.Submitter; -import org.dinky.assertion.Asserts; -import org.dinky.constant.FlinkParamConstant; -import org.dinky.utils.FlinkBaseUtil; +import org.dinky.data.app.AppParamConfig; +import org.dinky.utils.JsonUtils; -import java.io.IOException; -import java.util.Map; +import org.apache.flink.api.java.utils.ParameterTool; + +import java.util.Base64; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * MainApp * - * @since 2021/10/27 + * @since 2022/11/05 */ public class MainApp { - public static void main(String[] args) throws IOException { - Map params = FlinkBaseUtil.getParamsFromArgs(args); - String id = params.get(FlinkParamConstant.ID); - Asserts.checkNullString(id, "请配置入参 id "); - DBConfig dbConfig = DBConfig.build(params); - Submitter.submit(Integer.valueOf(id), dbConfig, params.get(FlinkParamConstant.DINKY_ADDR)); + private static final Logger log = LoggerFactory.getLogger(Submitter.class); + + public static void main(String[] args) throws Exception { + log.info("=========================Start run dinky app job==============================="); + ParameterTool parameters = ParameterTool.fromArgs(args); + boolean isEncrypt = parameters.getBoolean(AppParamConstant.isEncrypt, true); + String config = parameters.get(AppParamConstant.config); + config = isEncrypt ? new String(Base64.getDecoder().decode(config)) : config; + AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class); + try { + log.info("dinky app is Ready to run, config is {}", appConfig); + DBUtil.init(appConfig); + Submitter.submit(appConfig); + } catch (Exception e) { + log.error("exectue app failed with config: {}", appConfig); + throw e; + } } } diff --git a/dinky-app/dinky-app-1.15/src/main/java/org/dinky/app/MainApp.java b/dinky-app/dinky-app-1.15/src/main/java/org/dinky/app/MainApp.java index aae2b57f22..fbcc98e3e5 100644 --- a/dinky-app/dinky-app-1.15/src/main/java/org/dinky/app/MainApp.java +++ b/dinky-app/dinky-app-1.15/src/main/java/org/dinky/app/MainApp.java @@ -19,27 +19,42 @@ package org.dinky.app; -import org.dinky.app.db.DBConfig; +import org.dinky.app.constant.AppParamConstant; +import org.dinky.app.db.DBUtil; import org.dinky.app.flinksql.Submitter; -import org.dinky.assertion.Asserts; -import org.dinky.constant.FlinkParamConstant; -import org.dinky.utils.FlinkBaseUtil; +import org.dinky.data.app.AppParamConfig; +import org.dinky.utils.JsonUtils; -import java.io.IOException; -import java.util.Map; +import org.apache.flink.api.java.utils.ParameterTool; + +import java.util.Base64; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * MainApp * - * @since 2021/10/27 + * @since 2022/11/05 */ public class MainApp { - public static void main(String[] args) throws IOException { - Map params = FlinkBaseUtil.getParamsFromArgs(args); - String id = params.get(FlinkParamConstant.ID); - Asserts.checkNullString(id, "请配置入参 id "); - DBConfig dbConfig = DBConfig.build(params); - Submitter.submit(Integer.valueOf(id), dbConfig, params.get(FlinkParamConstant.DINKY_ADDR)); + private static final Logger log = LoggerFactory.getLogger(Submitter.class); + + public static void main(String[] args) throws Exception { + log.info("=========================Start run dinky app job==============================="); + ParameterTool parameters = ParameterTool.fromArgs(args); + boolean isEncrypt = parameters.getBoolean(AppParamConstant.isEncrypt, true); + String config = parameters.get(AppParamConstant.config); + config = isEncrypt ? new String(Base64.getDecoder().decode(config)) : config; + AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class); + try { + log.info("dinky app is Ready to run, config is {}", appConfig); + DBUtil.init(appConfig); + Submitter.submit(appConfig); + } catch (Exception e) { + log.error("exectue app failed with config: {}", appConfig); + throw e; + } } } diff --git a/dinky-app/dinky-app-1.16/src/main/java/org/dinky/app/MainApp.java b/dinky-app/dinky-app-1.16/src/main/java/org/dinky/app/MainApp.java index ec5fe22d5d..fbcc98e3e5 100644 --- a/dinky-app/dinky-app-1.16/src/main/java/org/dinky/app/MainApp.java +++ b/dinky-app/dinky-app-1.16/src/main/java/org/dinky/app/MainApp.java @@ -19,14 +19,18 @@ package org.dinky.app; -import org.dinky.app.db.DBConfig; +import org.dinky.app.constant.AppParamConstant; +import org.dinky.app.db.DBUtil; import org.dinky.app.flinksql.Submitter; -import org.dinky.assertion.Asserts; -import org.dinky.constant.FlinkParamConstant; -import org.dinky.utils.FlinkBaseUtil; +import org.dinky.data.app.AppParamConfig; +import org.dinky.utils.JsonUtils; -import java.io.IOException; -import java.util.Map; +import org.apache.flink.api.java.utils.ParameterTool; + +import java.util.Base64; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * MainApp @@ -35,11 +39,22 @@ */ public class MainApp { - public static void main(String[] args) throws IOException { - Map params = FlinkBaseUtil.getParamsFromArgs(args); - String id = params.get(FlinkParamConstant.ID); - Asserts.checkNullString(id, "请配置入参 id "); - DBConfig dbConfig = DBConfig.build(params); - Submitter.submit(Integer.valueOf(id), dbConfig, params.get(FlinkParamConstant.DINKY_ADDR)); + private static final Logger log = LoggerFactory.getLogger(Submitter.class); + + public static void main(String[] args) throws Exception { + log.info("=========================Start run dinky app job==============================="); + ParameterTool parameters = ParameterTool.fromArgs(args); + boolean isEncrypt = parameters.getBoolean(AppParamConstant.isEncrypt, true); + String config = parameters.get(AppParamConstant.config); + config = isEncrypt ? new String(Base64.getDecoder().decode(config)) : config; + AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class); + try { + log.info("dinky app is Ready to run, config is {}", appConfig); + DBUtil.init(appConfig); + Submitter.submit(appConfig); + } catch (Exception e) { + log.error("exectue app failed with config: {}", appConfig); + throw e; + } } } diff --git a/dinky-app/dinky-app-1.17/src/main/java/org/dinky/app/MainApp.java b/dinky-app/dinky-app-1.17/src/main/java/org/dinky/app/MainApp.java index ec5fe22d5d..fbcc98e3e5 100644 --- a/dinky-app/dinky-app-1.17/src/main/java/org/dinky/app/MainApp.java +++ b/dinky-app/dinky-app-1.17/src/main/java/org/dinky/app/MainApp.java @@ -19,14 +19,18 @@ package org.dinky.app; -import org.dinky.app.db.DBConfig; +import org.dinky.app.constant.AppParamConstant; +import org.dinky.app.db.DBUtil; import org.dinky.app.flinksql.Submitter; -import org.dinky.assertion.Asserts; -import org.dinky.constant.FlinkParamConstant; -import org.dinky.utils.FlinkBaseUtil; +import org.dinky.data.app.AppParamConfig; +import org.dinky.utils.JsonUtils; -import java.io.IOException; -import java.util.Map; +import org.apache.flink.api.java.utils.ParameterTool; + +import java.util.Base64; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * MainApp @@ -35,11 +39,22 @@ */ public class MainApp { - public static void main(String[] args) throws IOException { - Map params = FlinkBaseUtil.getParamsFromArgs(args); - String id = params.get(FlinkParamConstant.ID); - Asserts.checkNullString(id, "请配置入参 id "); - DBConfig dbConfig = DBConfig.build(params); - Submitter.submit(Integer.valueOf(id), dbConfig, params.get(FlinkParamConstant.DINKY_ADDR)); + private static final Logger log = LoggerFactory.getLogger(Submitter.class); + + public static void main(String[] args) throws Exception { + log.info("=========================Start run dinky app job==============================="); + ParameterTool parameters = ParameterTool.fromArgs(args); + boolean isEncrypt = parameters.getBoolean(AppParamConstant.isEncrypt, true); + String config = parameters.get(AppParamConstant.config); + config = isEncrypt ? new String(Base64.getDecoder().decode(config)) : config; + AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class); + try { + log.info("dinky app is Ready to run, config is {}", appConfig); + DBUtil.init(appConfig); + Submitter.submit(appConfig); + } catch (Exception e) { + log.error("exectue app failed with config: {}", appConfig); + throw e; + } } } diff --git a/dinky-app/dinky-app-base/pom.xml b/dinky-app/dinky-app-base/pom.xml index bf54a3b48d..50008536d4 100644 --- a/dinky-app/dinky-app-base/pom.xml +++ b/dinky-app/dinky-app-base/pom.xml @@ -44,6 +44,10 @@ lombok provided + + cn.hutool + hutool-all + diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/constant/AppParamConstant.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/constant/AppParamConstant.java new file mode 100644 index 0000000000..7eb9db9221 --- /dev/null +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/constant/AppParamConstant.java @@ -0,0 +1,25 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.app.constant; + +public class AppParamConstant { + public static final String config = "config"; + public static final String isEncrypt = "encrypt"; +} diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/db/DBConfig.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/db/DBConfig.java deleted file mode 100644 index f49ae807f1..0000000000 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/db/DBConfig.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.app.db; - -import org.dinky.constant.FlinkParamConstant; - -import java.util.Map; - -/** - * DBConfig - * - * @since 2021/10/27 - */ -public class DBConfig { - - private String driver; - private String url; - private String username; - private String password; - - public DBConfig(String driver, String url, String username, String password) { - this.driver = driver; - this.url = url; - this.username = username; - this.password = password; - } - - public static DBConfig build(String driver, String url, String username, String password) { - return new DBConfig(driver, url, username, password); - } - - public static DBConfig build(Map params) { - return new DBConfig( - params.get(FlinkParamConstant.DRIVER), - params.get(FlinkParamConstant.URL), - params.get(FlinkParamConstant.USERNAME), - params.get(FlinkParamConstant.PASSWORD)); - } - - public String getDriver() { - return driver; - } - - public String getUrl() { - return url; - } - - public String getUsername() { - return username; - } - - public String getPassword() { - return password; - } - - @Override - public String toString() { - return "DBConfig{" - + "driver='" - + driver - + '\'' - + ", url='" - + url - + '\'' - + ", username='" - + username - + '\'' - + ", password='" - + password - + '\'' - + '}'; - } -} diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/db/DBUtil.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/db/DBUtil.java index 8cf228197c..a2746d3b81 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/db/DBUtil.java +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/db/DBUtil.java @@ -19,16 +19,17 @@ package org.dinky.app.db; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; +import org.dinky.data.app.AppDatabase; +import org.dinky.data.app.AppParamConfig; +import org.dinky.data.app.AppTask; + import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; + +import cn.hutool.core.text.StrFormatter; +import cn.hutool.db.Db; +import cn.hutool.db.Entity; +import cn.hutool.db.ds.simple.SimpleDataSource; /** * DBUtil @@ -37,100 +38,33 @@ */ public class DBUtil { - private static Connection getConnection(DBConfig config) throws IOException { - Connection conn = null; - try { - Class.forName(config.getDriver()); - conn = DriverManager.getConnection(config.getUrl(), config.getUsername(), config.getPassword()); - } catch (SQLException | ClassNotFoundException e) { - e.printStackTrace(); - close(conn); - } - return conn; - } - - private static void close(Connection conn) { - try { - if (conn != null) { - conn.close(); - } - } catch (SQLException e) { - e.printStackTrace(); - } - } + private static Db db; - public static String getOneByID(String sql, DBConfig config) throws SQLException, IOException { - Connection conn = getConnection(config); - String result = null; - try (Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(sql)) { - if (rs.next()) { - result = rs.getString(1); - } - } - close(conn); - /* - * catch (SQLException e1) { e1.printStackTrace(); String message = e1.getMessage(); - * System.err.println(LocalDateTime.now().toString() + " --> 获取 FlinkSQL 异常,ID 为"); } - */ - return result; + public static void init(AppParamConfig config) { + db = Db.use(new SimpleDataSource(config.getUrl(), config.getUsername(), config.getPassword())); } - public static Map getMapByID(String sql, DBConfig config) throws SQLException, IOException { - Connection conn = getConnection(config); - HashMap map = new HashMap(); - try (Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(sql)) { - List columnList = new ArrayList<>(); - for (int i = 0; i < rs.getMetaData().getColumnCount(); i++) { - columnList.add(rs.getMetaData().getColumnLabel(i + 1)); - } - if (rs.next()) { - for (int i = 0; i < columnList.size(); i++) { - map.put(columnList.get(i), rs.getString(i + 1)); - } - } - } - close(conn); - return map; - } - /** - * 获取数据源的连接信息,作为全局变量 - * - * @param sql 查询SQL,必须只有两列的结果,第一个为数据库名称,第二个为配置信息 - * @param config 核心数据库配置 - */ - public static String getDbSourceSQLStatement(String sql, DBConfig config) throws SQLException, IOException { - Connection conn = getConnection(config); - String sqlStatements = ""; - try (Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(sql)) { - while (rs.next()) { - sqlStatements = sqlStatements + rs.getString(1) + ":=" + rs.getString(2) + "\n;\n"; - } + public static AppTask getTask(int taskId) throws SQLException { + Entity option = Entity.create("dinky_task").set("id", taskId).set("enabled", true); + List entities = db.find(option, AppTask.class); + if (entities.size() <= 0) { + throw new IllegalArgumentException( + StrFormatter.format("The Task is not found: {}, please check! ", taskId)); + } else { + return entities.get(0); } - close(conn); - return sqlStatements; } - public static List> getListByID(String sql, DBConfig config) throws SQLException, IOException { - Connection conn = getConnection(config); - List> list = new ArrayList<>(); - try (Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(sql)) { - List columnList = new ArrayList<>(); - for (int i = 0; i < rs.getMetaData().getColumnCount(); i++) { - columnList.add(rs.getMetaData().getColumnName(i)); - } - while (rs.next()) { - HashMap map = new HashMap(); - for (int i = 0; i < columnList.size(); i++) { - map.put(columnList.get(i), rs.getString(i)); - } - list.add(map); - } + public static String getDbSourceSQLStatement() throws SQLException { + StringBuilder sb = new StringBuilder(); + Entity option = Entity.create("dinky_database").set("enabled", true); + List entities = db.find(option, AppDatabase.class); + for (AppDatabase entity : entities) { + sb.append(entity.getName()) + .append(":=") + .append(entity.getFlinkConfig()) + .append("\n;\n"); } - close(conn); - return list; + return sb.toString(); } } diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java index d6b2b2e130..4d87052500 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java @@ -19,10 +19,12 @@ package org.dinky.app.flinksql; -import org.dinky.app.db.DBConfig; import org.dinky.app.db.DBUtil; +import org.dinky.app.model.StatementParam; import org.dinky.assertion.Asserts; import org.dinky.constant.FlinkSQLConstant; +import org.dinky.data.app.AppParamConfig; +import org.dinky.data.app.AppTask; import org.dinky.executor.Executor; import org.dinky.executor.ExecutorConfig; import org.dinky.executor.ExecutorFactory; @@ -50,9 +52,7 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -68,182 +68,53 @@ * @since 2021/10/27 */ public class Submitter { - - private static final Logger logger = LoggerFactory.getLogger(Submitter.class); - private static final String NULL = "null"; - - private static String getQuerySQL(Integer id) throws SQLException { - if (id == null) { - throw new SQLException("请指定任务ID"); - } - return "select statement from dinky_task_statement where id = " + id; - } - - private static String getTaskInfo(Integer id) throws SQLException { - if (id == null) { - throw new SQLException("请指定任务ID"); - } - return "select id, name as jobName, type,check_point as checkPoint,save_point_path as" - + " savePointPath, parallelism,fragment as useSqlFragment,statement_set as" - + " useStatementSet,config_json as config, env_id as envId,batch_model AS" - + " useBatchModel from dinky_task where id = " - + id; - } - - private static String getFlinkSQLStatement(Integer id, DBConfig config) { - String statement = ""; - try { - statement = DBUtil.getOneByID(getQuerySQL(id), config); - } catch (IOException | SQLException e) { - logger.error( - "{} --> 获取 FlinkSQL 配置异常,ID 为 {}, 连接信息为:{} ,异常信息为:{} ", - LocalDateTime.now(), - id, - config, - e.getMessage(), - e); - } - return statement; - } - - public static Map getTaskConfig(Integer id, DBConfig config) { - Map task = new HashMap<>(); - try { - task = DBUtil.getMapByID(getTaskInfo(id), config); - } catch (IOException | SQLException e) { - logger.error( - "{} --> 获取 FlinkSQL 配置异常,ID 为 {}, 连接信息为:{} ,异常信息为:{} ", - LocalDateTime.now(), - id, - config, - e.getMessage(), - e); - } - return task; - } - - public static List getStatements(String sql) { - return Arrays.asList(SqlUtil.getStatements(sql)); - } - - public static String getDbSourceSqlStatements(DBConfig dbConfig, Integer id) { - String sql = "select name,flink_config from dinky_database where enabled = 1"; - String sqlCheck = "select fragment from dinky_task where id = " + id; - try { - // 首先判断是否开启了全局变量 - String fragment = DBUtil.getOneByID(sqlCheck, dbConfig); - if ("1".equals(fragment)) { - return DBUtil.getDbSourceSQLStatement(sql, dbConfig); - } - - // 全局变量未开启,返回空字符串 - logger.info("任务 {} 未开启全局变量,不进行变量加载。", id); - } catch (IOException | SQLException e) { - logger.error( - "{} --> 获取 数据源信息异常,请检查数据库连接,连接信息为:{} ,异常信息为:{}", LocalDateTime.now(), dbConfig, e.getMessage(), e); - } - - return ""; + private static final Logger log = LoggerFactory.getLogger(Submitter.class); + + public static void submit(AppParamConfig config) throws SQLException { + log.info("{} Start Submit Job:{}", LocalDateTime.now(), config.getTaskId()); + + AppTask appTask = DBUtil.getTask(config.getTaskId()); + String sql = buildSql(appTask); + + ExecutorConfig executorConfig = ExecutorConfig.builder() + .type(appTask.getType()) + .checkpoint(appTask.getCheckPoint()) + .parallelism(appTask.getParallelism()) + .useStatementSet(appTask.getStatementSet()) + .useBatchModel(appTask.getBatchModel()) + .savePointPath(appTask.getSavePointPath()) + .jobName(appTask.getName()) + // 此处不应该再设置config,否则破坏了正常配置优先级顺序 + // .config(JsonUtils.toMap(appTask.getConfigJson())) + .build(); + + // 加载第三方jar //TODO 这里有问题,需要修一修 + // String dinkyAddr = Optional.ofNullable(config.getDinkyAddr()).orElse(""); + // loadDep(appTask.getType(), config.getTaskId(), dinkyAddr, executorConfig); + + log.info("The job configuration is as follows: {}", executorConfig); + + String[] statements = SqlUtil.getStatements(sql, config.getSplit()); + excuteJob(executorConfig, statements); } - public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) { - logger.info("{}开始提交作业 -- {}", LocalDateTime.now(), id); - if (NULL.equals(dinkyAddr)) { - dinkyAddr = ""; - } + public static String buildSql(AppTask appTask) throws SQLException { StringBuilder sb = new StringBuilder(); - Map taskConfig = Submitter.getTaskConfig(id, dbConfig); - - if (Asserts.isNotNull(taskConfig.get("envId"))) { - String envId = getFlinkSQLStatement(Integer.valueOf(taskConfig.get("envId")), dbConfig); - if (Asserts.isNotNullString(envId)) { - sb.append(envId); - } - sb.append("\n"); - } - - // 添加数据源全局变量 - sb.append(getDbSourceSqlStatements(dbConfig, id)); - // 添加自定义全局变量信息 - sb.append(getFlinkSQLStatement(id, dbConfig)); - - ExecutorConfig executorConfig = ExecutorConfig.buildFromMap(taskConfig); - // 加载第三方jar - loadDep(taskConfig.get("type"), id, dinkyAddr, executorConfig); - - logger.info("The job configuration is as follows: {}", executorConfig); - Executor executor = ExecutorFactory.buildAppStreamExecutor(executorConfig); - List ddl = new ArrayList<>(); - List trans = new ArrayList<>(); - List execute = new ArrayList<>(); - - List statements = Submitter.getStatements(sb.toString()); - for (String item : statements) { - String statement = FlinkInterceptor.pretreatStatement(executor, item); - if (statement.isEmpty()) { - continue; - } - - SqlType operationType = Operations.getOperationType(statement); - if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) { - trans.add(new StatementParam(statement, operationType)); - if (!executorConfig.isUseStatementSet()) { - break; - } - } else if (operationType.equals(SqlType.EXECUTE)) { - execute.add(new StatementParam(statement, operationType)); - if (!executorConfig.isUseStatementSet()) { - break; - } - } else { - ddl.add(new StatementParam(statement, operationType)); - } - } - - for (StatementParam item : ddl) { - logger.info("Executing FlinkSQL: {}", item.getValue()); - executor.executeSql(item.getValue()); - logger.info("Execution succeeded."); - } - - if (!trans.isEmpty()) { - if (executorConfig.isUseStatementSet()) { - List inserts = new ArrayList<>(); - for (StatementParam item : trans) { - if (item.getType().equals(SqlType.INSERT)) { - inserts.add(item.getValue()); - } - } - logger.info("Executing FlinkSQL statement set: {}", String.join(FlinkSQLConstant.SEPARATOR, inserts)); - executor.executeStatementSet(inserts); - logger.info("Execution succeeded."); - } else { - StatementParam item = trans.get(0); - logger.info("Executing FlinkSQL: {}", item.getValue()); - executor.executeSql(item.getValue()); - logger.info("Execution succeeded."); + // build env task + if (Asserts.isNotNull(appTask.getEnvId())) { + AppTask envTask = DBUtil.getTask(appTask.getEnvId()); + if (Asserts.isNotNullString(envTask.getStatement())) { + log.info("use statement is enable, load env:{}", envTask.getName()); + sb.append(envTask.getStatement()).append("\n"); } } - - if (!execute.isEmpty()) { - List executes = new ArrayList<>(); - for (StatementParam item : execute) { - executes.add(item.getValue()); - executor.executeSql(item.getValue()); - if (!executorConfig.isUseStatementSet()) { - break; - } - } - - logger.info("正在执行 FlinkSQL 语句集: {}", String.join(FlinkSQLConstant.SEPARATOR, executes)); - try { - executor.execute(executorConfig.getJobName()); - logger.info("执行成功"); - } catch (Exception e) { - logger.error("执行失败, {}", e.getMessage(), e); - } + // build Database golbal varibals + if (appTask.getFragment()) { + log.info("Global env is enable, load database flink config env."); + sb.append(DBUtil.getDbSourceSQLStatement()).append("\n"); } - logger.info("{}任务提交成功", LocalDateTime.now()); + sb.append(appTask.getStatement()); + return sb.toString(); } private static void loadDep(String type, Integer taskId, String dinkyAddr, ExecutorConfig executorConfig) { @@ -254,7 +125,7 @@ private static void loadDep(String type, Integer taskId, String dinkyAddr, Execu if ("kubernetes-application".equals(type)) { try { String httpJar = "http://" + dinkyAddr + "/download/downloadDepJar/" + taskId; - logger.info("下载依赖 http-url为:{}", httpJar); + log.info("下载依赖 http-url为:{}", httpJar); String flinkHome = System.getenv("FLINK_HOME"); String usrlib = flinkHome + "/usrlib"; FileUtils.forceMkdir(new File(usrlib)); @@ -293,7 +164,7 @@ private static void loadDep(String type, Integer taskId, String dinkyAddr, Execu } } } catch (IOException e) { - logger.error(""); + log.error(""); throw new RuntimeException(e); } } @@ -335,4 +206,79 @@ public static boolean downloadFile(String url, String path) throws IOException { return false; } } + + public static void excuteJob(ExecutorConfig executorConfig, String[] statements) { + + Executor executor = ExecutorFactory.buildAppStreamExecutor(executorConfig); + List ddl = new ArrayList<>(); + List trans = new ArrayList<>(); + List execute = new ArrayList<>(); + + for (String item : statements) { + String statement = FlinkInterceptor.pretreatStatement(executor, item); + if (statement.isEmpty()) { + continue; + } + + SqlType operationType = Operations.getOperationType(statement); + if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) { + trans.add(new StatementParam(statement, operationType)); + if (!executorConfig.isUseStatementSet()) { + break; + } + } else if (operationType.equals(SqlType.EXECUTE)) { + execute.add(new StatementParam(statement, operationType)); + if (!executorConfig.isUseStatementSet()) { + break; + } + } else { + ddl.add(new StatementParam(statement, operationType)); + } + } + + for (StatementParam item : ddl) { + log.info("Executing FlinkSQL: {}", item.getValue()); + executor.executeSql(item.getValue()); + log.info("Execution succeeded."); + } + + if (!trans.isEmpty()) { + if (executorConfig.isUseStatementSet()) { + List inserts = new ArrayList<>(); + for (StatementParam item : trans) { + if (item.getType().equals(SqlType.INSERT)) { + inserts.add(item.getValue()); + } + } + log.info("Executing FlinkSQL statement set: {}", String.join(FlinkSQLConstant.SEPARATOR, inserts)); + executor.executeStatementSet(inserts); + log.info("Execution succeeded."); + } else { + StatementParam item = trans.get(0); + log.info("Executing FlinkSQL: {}", item.getValue()); + executor.executeSql(item.getValue()); + log.info("Execution succeeded."); + } + } + + if (!execute.isEmpty()) { + List executes = new ArrayList<>(); + for (StatementParam item : execute) { + executes.add(item.getValue()); + executor.executeSql(item.getValue()); + if (!executorConfig.isUseStatementSet()) { + break; + } + } + + log.info("正在执行 FlinkSQL 语句集: {}", String.join(FlinkSQLConstant.SEPARATOR, executes)); + try { + executor.execute(executorConfig.getJobName()); + log.info("执行成功"); + } catch (Exception e) { + log.error("执行失败, {}", e.getMessage(), e); + } + } + log.info("{}任务提交成功", LocalDateTime.now()); + } } diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/StatementParam.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/model/StatementParam.java similarity index 70% rename from dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/StatementParam.java rename to dinky-app/dinky-app-base/src/main/java/org/dinky/app/model/StatementParam.java index ce85648b02..d42c84da69 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/StatementParam.java +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/model/StatementParam.java @@ -17,38 +17,23 @@ * */ -package org.dinky.app.flinksql; +package org.dinky.app.model; import org.dinky.parser.SqlType; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + /** * StatementParam * * @since 2021/11/16 */ +@Data +@AllArgsConstructor +@NoArgsConstructor public class StatementParam { - private String value; private SqlType type; - - public StatementParam(String value, SqlType type) { - this.value = value; - this.type = type; - } - - public String getValue() { - return value; - } - - public void setValue(String value) { - this.value = value; - } - - public SqlType getType() { - return type; - } - - public void setType(SqlType type) { - this.type = type; - } } diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/FlinkParamConstant.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/FlinkParamConstant.java index 082c2f36b9..3afa0c1915 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/FlinkParamConstant.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/FlinkParamConstant.java @@ -25,13 +25,5 @@ * @since 2022/3/9 19:18 */ public final class FlinkParamConstant { - - public static final String ID = "id"; - public static final String DRIVER = "driver"; - public static final String URL = "url"; - public static final String USERNAME = "username"; - public static final String PASSWORD = "password"; - public static final String DINKY_ADDR = "dinkyAddr"; - public static final String SPLIT = ","; } diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/utils/FlinkBaseUtil.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/utils/FlinkBaseUtil.java deleted file mode 100644 index f816ce219a..0000000000 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/utils/FlinkBaseUtil.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.utils; - -import org.dinky.constant.FlinkParamConstant; -import org.dinky.data.model.FlinkCDCConfig; -import org.dinky.data.model.Table; - -import org.apache.flink.api.java.utils.ParameterTool; - -import java.util.HashMap; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * FlinkBaseUtil - * - * @since 2022/3/9 19:15 - */ -public class FlinkBaseUtil { - private static final Logger logger = LoggerFactory.getLogger(FlinkBaseUtil.class); - - public static Map getParamsFromArgs(String[] args) { - Map params = new HashMap<>(); - ParameterTool parameters = ParameterTool.fromArgs(args); - params.put(FlinkParamConstant.ID, parameters.get(FlinkParamConstant.ID, null)); - params.put(FlinkParamConstant.DRIVER, parameters.get(FlinkParamConstant.DRIVER, null)); - params.put(FlinkParamConstant.URL, parameters.get(FlinkParamConstant.URL, null)); - params.put(FlinkParamConstant.USERNAME, parameters.get(FlinkParamConstant.USERNAME, null)); - params.put(FlinkParamConstant.PASSWORD, parameters.get(FlinkParamConstant.PASSWORD, null)); - params.put(FlinkParamConstant.DINKY_ADDR, parameters.get(FlinkParamConstant.DINKY_ADDR, null)); - return params; - } - - public static String getSinkConfigurationString( - Table table, FlinkCDCConfig config, String sinkSchemaName, String sinkTableName, String pkList) { - String configurationString = - SqlUtil.replaceAllParam(config.getSinkConfigurationString(), "schemaName", sinkSchemaName); - configurationString = SqlUtil.replaceAllParam(configurationString, "tableName", sinkTableName); - if (configurationString.contains("${pkList}")) { - configurationString = SqlUtil.replaceAllParam(configurationString, "pkList", pkList); - } - return configurationString; - } - - public static String convertSinkColumnType(String type, FlinkCDCConfig config) { - if (config.getSink().get("connector").equals("hudi")) { - if (type.equals("TIMESTAMP")) { - return "TIMESTAMP(3)"; - } - } - return type; - } -} diff --git a/dinky-common/src/main/java/org/dinky/data/app/AppDatabase.java b/dinky-common/src/main/java/org/dinky/data/app/AppDatabase.java new file mode 100644 index 0000000000..90f634b9f6 --- /dev/null +++ b/dinky-common/src/main/java/org/dinky/data/app/AppDatabase.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.data.app; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +@Data +public class AppDatabase { + @ApiModelProperty(value = "ID", required = true, dataType = "Integer", example = "1", notes = "Primary Key") + private Integer id; + + @ApiModelProperty(value = "Name", required = true, dataType = "String", example = "Name") + private String name; + + @ApiModelProperty(value = "flinkConfig", dataType = "String", example = "flinkConfig") + private String flinkConfig; +} diff --git a/dinky-common/src/main/java/org/dinky/data/app/AppParamConfig.java b/dinky-common/src/main/java/org/dinky/data/app/AppParamConfig.java new file mode 100644 index 0000000000..175c048540 --- /dev/null +++ b/dinky-common/src/main/java/org/dinky/data/app/AppParamConfig.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.data.app; + +import lombok.Builder; +import lombok.Data; + +/** + * Dinky configuration information passed by the submit Application task + * */ +@Data +@Builder +public class AppParamConfig { + private int taskId; + private String url; + private String username; + private String password; + private String dinkyAddr; + private String split; + + @Override + public String toString() { + return "\nAppParamConfig{" + "\ntaskId=" + + taskId + "\n, url='" + + url + '\'' + "\n, username='" + + username + '\'' + "\n, password='***********'" + + "\n, dinkyAddr='" + + dinkyAddr + '\'' + "\n, split='" + + split + '\'' + "'\n}'"; + } +} diff --git a/dinky-common/src/main/java/org/dinky/data/app/AppTask.java b/dinky-common/src/main/java/org/dinky/data/app/AppTask.java new file mode 100644 index 0000000000..8cfaef7220 --- /dev/null +++ b/dinky-common/src/main/java/org/dinky/data/app/AppTask.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.data.app; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +@Data +public class AppTask { + @ApiModelProperty(value = "ID", required = true, dataType = "Integer", example = "1", notes = "Primary Key") + private Integer id; + + @ApiModelProperty(value = "Name", required = true, dataType = "String", example = "Name") + private String name; + + @ApiModelProperty(value = "Type", dataType = "String", notes = "Type of the task") + private String type; + + @ApiModelProperty(value = "Check Point", dataType = "Integer", example = "1", notes = "Check point for the task") + private Integer checkPoint; + + @ApiModelProperty(value = "Save Point Path", dataType = "String", notes = "Save point path for the task") + private String savePointPath; + + @ApiModelProperty(value = "Parallelism", dataType = "Integer", example = "4", notes = "Parallelism for the task") + private Integer parallelism; + + @ApiModelProperty(value = "Fragment", dataType = "Boolean", example = "true", notes = "task Fragment") + private Boolean fragment; + + @ApiModelProperty(value = "Statement Set", dataType = "Boolean", example = "false", notes = "use Statement") + private Boolean statementSet; + + @ApiModelProperty(value = "Environment ID", dataType = "Integer", example = "6001", notes = "environment id") + private Integer envId; + + @ApiModelProperty(value = "Configuration JSON", dataType = "TaskExtConfig", notes = "task config") + private String configJson; + + @ApiModelProperty(value = "Batch Model", dataType = "Boolean", example = "true", notes = "Batch task") + private Boolean batchModel; + + @ApiModelProperty(value = "Statement", dataType = "String", notes = "SQL statement for the task") + private String statement; +} diff --git a/dinky-executor/src/main/java/org/dinky/executor/ExecutorConfig.java b/dinky-executor/src/main/java/org/dinky/executor/ExecutorConfig.java index 9d9b99ac5e..566c468f5b 100644 --- a/dinky-executor/src/main/java/org/dinky/executor/ExecutorConfig.java +++ b/dinky-executor/src/main/java/org/dinky/executor/ExecutorConfig.java @@ -22,8 +22,6 @@ import org.dinky.assertion.Asserts; import org.dinky.gateway.enums.GatewayType; -import org.apache.commons.lang3.math.NumberUtils; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -63,16 +61,6 @@ public class ExecutorConfig { .useSqlFragment(true) .build(); - public static final String TYPE_CONST = "type"; - public static final String CHECKPOINT_CONST = "checkpoint"; - public static final String PARALLELISM_CONST = "parallelism"; - public static final String USE_SQL_FRAGMENT = "useSqlFragment"; - public static final String USE_STATEMENT_SET = "useStatementSet"; - public static final String USE_BATCH_MODEL = "useBatchModel"; - public static final String SAVE_POINT_PATH = "savePointPath"; - public static final String JOB_NAME = "jobName"; - public static final String CONFIG_CONST = "config"; - // after unique all run model to remote, this field could discard @ApiModelProperty( value = "Flink run mode", @@ -239,22 +227,6 @@ public static ExecutorConfig build( null); } - public static ExecutorConfig buildFromMap(Map settingMap) { - Integer checkpoint = NumberUtils.createInteger(settingMap.get(CHECKPOINT_CONST)); - Integer parallelism = NumberUtils.createInteger(settingMap.get(PARALLELISM_CONST)); - String type = settingMap.get(TYPE_CONST); - return build( - type, - checkpoint, - parallelism, - "1".equals(settingMap.get(USE_SQL_FRAGMENT)), - "1".equals(settingMap.get(USE_STATEMENT_SET)), - "1".equals(settingMap.get(USE_BATCH_MODEL)), - settingMap.get(SAVE_POINT_PATH), - settingMap.get(JOB_NAME), - settingMap.get(CONFIG_CONST)); - } - public String getJobManagerAddress() { return host + ":" + port; } diff --git a/dinky-executor/src/test/java/org/dinky/executor/ExecutorConfigTest.java b/dinky-executor/src/test/java/org/dinky/executor/ExecutorConfigTest.java index c709273ab0..a4963a0b7f 100644 --- a/dinky-executor/src/test/java/org/dinky/executor/ExecutorConfigTest.java +++ b/dinky-executor/src/test/java/org/dinky/executor/ExecutorConfigTest.java @@ -19,31 +19,11 @@ package org.dinky.executor; -import static org.dinky.executor.ExecutorConfig.CHECKPOINT_CONST; -import static org.dinky.executor.ExecutorConfig.PARALLELISM_CONST; -import static org.junit.jupiter.api.Assertions.*; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - import org.junit.jupiter.api.Test; /** */ class ExecutorConfigTest { @Test - void build() { - Map maps = new HashMap<>(); - maps.put(CHECKPOINT_CONST, "123"); - maps.put(PARALLELISM_CONST, "456"); - - ExecutorConfig es = ExecutorConfig.buildFromMap(maps); - assertEquals(123, es.getCheckpoint()); - assertEquals(456, es.getParallelism()); - - ExecutorConfig esNull = ExecutorConfig.buildFromMap(Collections.emptyMap()); - assertNull(esNull.getCheckpoint()); - assertNull(esNull.getParallelism()); - } + void build() {} } diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java index 641c86e751..f2d29e894c 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesGateway.java @@ -74,7 +74,6 @@ private void initConfig() { } catch (Exception e) { logger.warn("load locale config yaml failed:{},Skip config it", e.getMessage()); } - addConfigParas(KubernetesConfigOptions.FLINK_CONF_DIR, flinkConfigPath); addConfigParas(flinkConfig.getConfiguration()); addConfigParas(k8sConfig.getConfiguration());