diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/StudioServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/StudioServiceImpl.java index 8f33c7b328..e87e03ed73 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/StudioServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/StudioServiceImpl.java @@ -45,7 +45,6 @@ import org.dinky.service.DataBaseService; import org.dinky.service.StudioService; import org.dinky.service.TaskService; -import org.dinky.sql.FlinkQuery; import org.dinky.utils.FlinkTableMetadataUtil; import org.dinky.utils.RunTimeUtil; @@ -74,6 +73,7 @@ public class StudioServiceImpl implements StudioService { private final DataBaseService dataBaseService; private final TaskService taskService; private final Cache jobManagerCache = CacheUtil.newTimedCache(1000 * 60 * 2); + private final String DEFAULT_CATALOG = "default_catalog"; private IResult executeMSFlinkSql(StudioMetaStoreDTO studioMetaStoreDTO) { String envSql = taskService.buildEnvSql(studioMetaStoreDTO); @@ -142,7 +142,7 @@ public List getMSCatalogs(StudioMetaStoreDTO studioMetaStoreDTO) { if (Dialect.isCommonSql(studioMetaStoreDTO.getDialect())) { DataBase dataBase = dataBaseService.getById(studioMetaStoreDTO.getDatabaseId()); if (!Asserts.isNull(dataBase)) { - Catalog defaultCatalog = Catalog.build(FlinkQuery.defaultCatalog()); + Catalog defaultCatalog = Catalog.build(DEFAULT_CATALOG); Driver driver = Driver.build(dataBase.getDriverConfig()); defaultCatalog.setSchemas(driver.listSchemas()); catalogs.add(defaultCatalog); diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index fcd3c796dc..e8d4593652 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -22,6 +22,7 @@ import org.dinky.assertion.Asserts; import org.dinky.assertion.DinkyAssert; import org.dinky.config.Dialect; +import org.dinky.constant.FlinkSQLConstant; import org.dinky.context.TenantContextHolder; import org.dinky.data.annotations.ProcessStep; import org.dinky.data.app.AppParamConfig; @@ -644,8 +645,6 @@ public List listFlinkSQLEnv() { @Transactional(rollbackFor = Exception.class) public Task initDefaultFlinkSQLEnv(Integer tenantId) { TenantContextHolder.set(tenantId); - String separator = SystemConfiguration.getInstances().getSqlSeparator(); - separator = separator.replace("\\r", "\r").replace("\\n", "\n"); String name = "DefaultCatalog"; Task defaultFlinkSQLEnvTask = getTaskByNameAndTenantId(name, tenantId); @@ -658,7 +657,11 @@ public Task initDefaultFlinkSQLEnv(Integer tenantId) { + "'password' = '%s',\n" + " 'url' = '%s'\n" + ")%suse catalog my_catalog%s", - dsProperties.getUsername(), dsProperties.getPassword(), dsProperties.getUrl(), separator, separator); + dsProperties.getUsername(), + dsProperties.getPassword(), + dsProperties.getUrl(), + FlinkSQLConstant.SEPARATOR, + FlinkSQLConstant.SEPARATOR); if (null != defaultFlinkSQLEnvTask) { defaultFlinkSQLEnvTask.setStatement(sql); diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java index 8d7e5187e0..13f533730b 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java @@ -127,8 +127,7 @@ public static void submit(AppParamConfig config) throws SQLException { loadDep(appTask.getType(), config.getTaskId(), executorConfig); log.info("The job configuration is as follows: {}", executorConfig); - String[] statements = - SqlUtil.getStatements(sql, SystemConfiguration.getInstances().getSqlSeparator()); + String[] statements = SqlUtil.getStatements(sql); Optional jobClient = Optional.empty(); try { if (Dialect.FLINK_JAR == appTask.getDialect()) { diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/sql/FlinkQuery.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/sql/FlinkQuery.java deleted file mode 100644 index ce7d6e7421..0000000000 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/sql/FlinkQuery.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.sql; - -import org.dinky.data.model.SystemConfiguration; - -/** - * FlinkQuery - * - * @since 2022/7/18 18:43 - */ -public class FlinkQuery { - - public static String separator() { - return SystemConfiguration.getInstances().getSqlSeparator(); - } - - public static String defaultCatalog() { - return "default_catalog"; - } - - public static String defaultDatabase() { - return "default_database"; - } - - public static String showCatalogs() { - return "SHOW CATALOGS"; - } - - public static String useCatalog(String catalog) { - return String.format("USE CATALOG %s", catalog); - } - - public static String showDatabases() { - return "SHOW DATABASES"; - } - - public static String useDatabase(String database) { - return String.format("USE %s", database); - } - - public static String showTables() { - return "SHOW TABLES"; - } - - public static String showViews() { - return "SHOW VIEWS"; - } - - public static String showFunctions() { - return "SHOW FUNCTIONS"; - } - - public static String showUserFunctions() { - return "SHOW USER FUNCTIONS"; - } - - public static String showModules() { - return "SHOW MODULES"; - } - - public static String descTable(String table) { - return String.format("DESC %s", table); - } - - public static String columnName() { - return "name"; - } - - public static String columnType() { - return "type"; - } - - public static String columnNull() { - return "null"; - } - - public static String columnKey() { - return "key"; - } - - public static String columnExtras() { - return "extras"; - } - - public static String columnWatermark() { - return "watermark"; - } -} diff --git a/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java b/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java index 83eaa1e730..3808a9b6a3 100644 --- a/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java +++ b/dinky-common/src/main/java/org/dinky/data/model/SystemConfiguration.java @@ -337,10 +337,6 @@ public boolean isUseRestAPI() { return useRestAPI.getValue(); } - public String getSqlSeparator() { - return sqlSeparator.getValue(); - } - public int getJobIdWait() { return jobIdWait.getValue(); } diff --git a/dinky-common/src/main/java/org/dinky/utils/SqlUtil.java b/dinky-common/src/main/java/org/dinky/utils/SqlUtil.java index a83aa92439..398993bdef 100644 --- a/dinky-common/src/main/java/org/dinky/utils/SqlUtil.java +++ b/dinky-common/src/main/java/org/dinky/utils/SqlUtil.java @@ -20,7 +20,6 @@ package org.dinky.utils; import org.dinky.assertion.Asserts; -import org.dinky.data.model.SystemConfiguration; import java.util.Map; import java.util.regex.Pattern; @@ -33,11 +32,12 @@ public class SqlUtil { private static final String SEMICOLON = ";"; + private static final String SQL_SEPARATOR = ";\\s*(?:\\n|--.*)"; private SqlUtil() {} public static String[] getStatements(String sql) { - return getStatements(sql, SystemConfiguration.getInstances().getSqlSeparator()); + return getStatements(sql, SQL_SEPARATOR); } public static String[] getStatements(String sql, String sqlSeparator) { diff --git a/dinky-core/src/main/java/org/dinky/constant/FlinkSQLConstant.java b/dinky-core/src/main/java/org/dinky/constant/FlinkSQLConstant.java index dc79bd2654..05fb9e7412 100644 --- a/dinky-core/src/main/java/org/dinky/constant/FlinkSQLConstant.java +++ b/dinky-core/src/main/java/org/dinky/constant/FlinkSQLConstant.java @@ -19,8 +19,6 @@ package org.dinky.constant; -import org.dinky.sql.FlinkQuery; - /** * FlinkSQLConstant * @@ -30,13 +28,7 @@ public class FlinkSQLConstant { private FlinkSQLConstant() {} /** 分隔符 */ - public static final String SEPARATOR = FlinkQuery.separator(); - /** DDL 类型 */ - public static final String DDL = "DDL"; - /** DML 类型 */ - public static final String DML = "DML"; - /** DATASTREAM 类型 */ - public static final String DATASTREAM = "DATASTREAM"; + public static final String SEPARATOR = ";\\n"; /** The define identifier of FlinkSQL Variable */ public static final String VARIABLES = ":="; diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index 51195430d4..25064b357f 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -20,9 +20,7 @@ package org.dinky.explainer; import org.dinky.assertion.Asserts; -import org.dinky.constant.FlinkSQLConstant; import org.dinky.data.model.LineageRel; -import org.dinky.data.model.SystemConfiguration; import org.dinky.data.result.ExplainResult; import org.dinky.data.result.SqlExplainResult; import org.dinky.executor.CustomTableEnvironment; @@ -77,34 +75,22 @@ public class Explainer { private Executor executor; private boolean useStatementSet; - private String sqlSeparator; private ObjectMapper mapper = new ObjectMapper(); private JobManager jobManager; public Explainer(Executor executor, boolean useStatementSet, JobManager jobManager) { - this(executor, useStatementSet, FlinkSQLConstant.SEPARATOR, jobManager); - init(); - } - - public Explainer(Executor executor, boolean useStatementSet, String sqlSeparator, JobManager jobManager) { this.executor = executor; this.useStatementSet = useStatementSet; - this.sqlSeparator = sqlSeparator; this.jobManager = jobManager; } - public void init() { - sqlSeparator = SystemConfiguration.getInstances().getSqlSeparator(); - } - - public static Explainer build( - Executor executor, boolean useStatementSet, String sqlSeparator, JobManager jobManager) { - return new Explainer(executor, useStatementSet, sqlSeparator, jobManager); + public static Explainer build(Executor executor, boolean useStatementSet, JobManager jobManager) { + return new Explainer(executor, useStatementSet, jobManager); } public Explainer initialize(JobConfig config, String statement) { DinkyClassLoaderUtil.initClassLoader(config, jobManager.getDinkyClassLoader()); - String[] statements = SqlUtil.getStatements(SqlUtil.removeNote(statement), sqlSeparator); + String[] statements = SqlUtil.getStatements(SqlUtil.removeNote(statement)); List udfs = parseUDFFromStatements(statements); jobManager.setJobParam(new JobParam(udfs)); try { @@ -199,7 +185,7 @@ public List parseUDFFromStatements(String[] statements) { public ExplainResult explainSql(String statement) { log.info("Start explain FlinkSQL..."); - JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator)); + JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement)); List sqlExplainRecords = new ArrayList<>(); int index = 1; boolean correct = true; @@ -335,7 +321,7 @@ public ExplainResult explainSql(String statement) { } public ObjectNode getStreamGraph(String statement) { - JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator)); + JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement)); jobParam.getDdl().forEach(statementParam -> executor.executeSql(statementParam.getValue())); if (!jobParam.getTrans().isEmpty()) { @@ -351,7 +337,7 @@ public ObjectNode getStreamGraph(String statement) { } public JobPlanInfo getJobPlanInfo(String statement) { - JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator)); + JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement)); jobParam.getDdl().forEach(statementParam -> executor.executeSql(statementParam.getValue())); if (!jobParam.getTrans().isEmpty()) { @@ -380,7 +366,7 @@ public List getLineage(String statement) { this.initialize(jobConfig, statement); List lineageRelList = new ArrayList<>(); - for (String item : SqlUtil.getStatements(statement, sqlSeparator)) { + for (String item : SqlUtil.getStatements(statement)) { try { String sql = FlinkInterceptor.pretreatStatement(executor, item); if (Asserts.isNullString(sql)) { diff --git a/dinky-core/src/main/java/org/dinky/job/JobBuilder.java b/dinky-core/src/main/java/org/dinky/job/JobBuilder.java index cb09ce09a5..ce529a2606 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/JobBuilder.java @@ -31,7 +31,6 @@ public abstract class JobBuilder { protected Executor executor; protected boolean useStatementSet; protected boolean useGateway; - protected String sqlSeparator; protected Job job; public JobBuilder(JobManager jobManager) { @@ -42,7 +41,6 @@ public JobBuilder(JobManager jobManager) { this.executor = jobManager.getExecutor(); this.useStatementSet = jobManager.isUseStatementSet(); this.useGateway = jobManager.isUseGateway(); - this.sqlSeparator = jobManager.getSqlSeparator(); this.job = jobManager.getJob(); } diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java index 2fe1f758a1..7bffba5162 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobManager.java +++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java @@ -22,7 +22,6 @@ import org.dinky.api.FlinkAPI; import org.dinky.assertion.Asserts; import org.dinky.classloader.DinkyClassLoader; -import org.dinky.constant.FlinkSQLConstant; import org.dinky.context.CustomTableEnvironmentContext; import org.dinky.context.FlinkUdfPathContextHolder; import org.dinky.context.RowLevelPermissionsContext; @@ -104,7 +103,6 @@ public class JobManager { private boolean isPlanMode = false; private boolean useStatementSet = false; private boolean useRestAPI = false; - private String sqlSeparator = FlinkSQLConstant.SEPARATOR; private GatewayType runMode = GatewayType.LOCAL; private JobParam jobParam = null; @@ -162,10 +160,6 @@ public boolean isUseRestAPI() { return useRestAPI; } - public String getSqlSeparator() { - return sqlSeparator; - } - public boolean isUseGateway() { return useGateway; } @@ -216,7 +210,6 @@ public void init() { } useStatementSet = config.isStatementSet(); useRestAPI = SystemConfiguration.getInstances().isUseRestAPI(); - sqlSeparator = SystemConfiguration.getInstances().getSqlSeparator(); executorConfig = config.getExecutorSetting(); executorConfig.setPlan(isPlanMode); executor = ExecutorFactory.buildExecutor(executorConfig, getDinkyClassLoader()); @@ -333,8 +326,8 @@ public JobResult executeSql(String statement) throws Exception { ready(); DinkyClassLoaderUtil.initClassLoader(config, getDinkyClassLoader()); - jobParam = Explainer.build(executor, useStatementSet, sqlSeparator, this) - .pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator)); + jobParam = + Explainer.build(executor, useStatementSet, this).pretreatStatements(SqlUtil.getStatements(statement)); try { // step 1: init udf JobUDFBuilder.build(this).run(); @@ -367,7 +360,7 @@ public JobResult executeSql(String statement) throws Exception { } public IResult executeDDL(String statement) { - String[] statements = SqlUtil.getStatements(statement, sqlSeparator); + String[] statements = SqlUtil.getStatements(statement); try { IResult result = null; for (String item : statements) { @@ -401,19 +394,19 @@ public static SelectResult getJobData(String jobId) { } public ExplainResult explainSql(String statement) { - return Explainer.build(executor, useStatementSet, sqlSeparator, this) + return Explainer.build(executor, useStatementSet, this) .initialize(config, statement) .explainSql(statement); } public ObjectNode getStreamGraph(String statement) { - return Explainer.build(executor, useStatementSet, sqlSeparator, this) + return Explainer.build(executor, useStatementSet, this) .initialize(config, statement) .getStreamGraph(statement); } public String getJobPlanJson(String statement) { - return Explainer.build(executor, useStatementSet, sqlSeparator, this) + return Explainer.build(executor, useStatementSet, this) .initialize(config, statement) .getJobPlanInfo(statement) .getJsonPlan(); diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java index 14bcf7e206..59d613f2f7 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java @@ -60,7 +60,7 @@ public void run() throws Exception {} public StreamGraph getJarStreamGraph(String statement, DinkyClassLoader dinkyClassLoader) { DinkyClassLoaderUtil.initClassLoader(config, dinkyClassLoader); - String[] statements = SqlUtil.getStatements(statement, sqlSeparator); + String[] statements = SqlUtil.getStatements(statement); ExecuteJarOperation executeJarOperation = null; for (String sql : statements) { String sqlStatement = executor.pretreatStatement(sql); @@ -83,7 +83,7 @@ public StreamGraph getJarStreamGraph(String statement, DinkyClassLoader dinkyCla } public List getUris(String statement) { - String[] statements = SqlUtil.getStatements(statement, sqlSeparator); + String[] statements = SqlUtil.getStatements(statement); List uriList = new ArrayList<>(); for (String sql : statements) { String sqlStatement = executor.pretreatStatement(sql); diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java index d23e92577e..dc729735e6 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java @@ -20,6 +20,7 @@ package org.dinky.job.builder; import org.dinky.assertion.Asserts; +import org.dinky.constant.FlinkSQLConstant; import org.dinky.data.result.IResult; import org.dinky.data.result.InsertResult; import org.dinky.data.result.ResultBuilder; @@ -104,14 +105,14 @@ private List collectInserts() { } private void processWithGateway(List inserts) throws Exception { - jobManager.setCurrentSql(String.join(sqlSeparator, inserts)); + jobManager.setCurrentSql(String.join(FlinkSQLConstant.SEPARATOR, inserts)); GatewayResult gatewayResult = submitByGateway(inserts); setJobResultFromGatewayResult(gatewayResult); } private void processWithoutGateway(List inserts) throws Exception { if (!inserts.isEmpty()) { - jobManager.setCurrentSql(String.join(sqlSeparator, inserts)); + jobManager.setCurrentSql(String.join(FlinkSQLConstant.SEPARATOR, inserts)); TableResult tableResult = executor.executeStatementSet(inserts); updateJobWithTableResult(tableResult); }