From ee3351abd5b032dc39cdcb7496dc01b3bab5117a Mon Sep 17 00:00:00 2001 From: Zzm0809 <934230207@qq.com> Date: Tue, 3 Dec 2024 16:52:27 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=A4=9A=E7=A7=8D=20catalog?= =?UTF-8?q?=20=E5=88=87=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../configure/DinkyCustomConfiguration.java | 16 +++ .../dinky/service/impl/TaskServiceImpl.java | 133 +++++++++--------- .../src/main/resources/application-flyway.yml | 2 +- .../src/main/resources/application.yml | 18 ++- .../flink/catalog/DinkyPostgresCatalog.java | 1 + .../catalog/DinkyPostgresSQLCatalogTest.java} | 8 +- .../DinkyPostgresSQLCatalogFactoryTest.java} | 28 ++-- .../data/enums/CatalogTypeMappingEnum.java | 48 +++++++ 8 files changed, 163 insertions(+), 91 deletions(-) create mode 100644 dinky-admin/src/main/java/org/dinky/configure/DinkyCustomConfiguration.java rename dinky-catalog/dinky-catalog-postgres/{dinky-catalog-postgres-1.14/src/test/java/org/dinky/flink/catalog/DinkyMysqlCatalogTest.java => dinky-catalog-postgres-1.19/src/test/java/org/dinky/flink/catalog/DinkyPostgresSQLCatalogTest.java} (89%) rename dinky-catalog/dinky-catalog-postgres/{dinky-catalog-postgres-1.14/src/test/java/org/dinky/flink/catalog/org/dinky/flink/catalog/factory/DinkyMysqlCatalogFactoryTest.java => dinky-catalog-postgres-1.19/src/test/java/org/dinky/flink/catalog/factory/DinkyPostgresSQLCatalogFactoryTest.java} (68%) create mode 100644 dinky-common/src/main/java/org/dinky/data/enums/CatalogTypeMappingEnum.java diff --git a/dinky-admin/src/main/java/org/dinky/configure/DinkyCustomConfiguration.java b/dinky-admin/src/main/java/org/dinky/configure/DinkyCustomConfiguration.java new file mode 100644 index 0000000000..a0505e8dbe --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/configure/DinkyCustomConfiguration.java @@ -0,0 +1,16 @@ +package org.dinky.configure; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConfigurationProperties(prefix = "dinky") +@Slf4j +@Data +public class DinkyCustomConfiguration { + + + private String dbType; +} diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index 56f206396f..1a98b9eea9 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -19,11 +19,44 @@ package org.dinky.service.impl; -import static org.dinky.data.model.SystemConfiguration.FLINK_JOB_ARCHIVE; - +import cn.dev33.satoken.stp.StpUtil; +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.lang.tree.Tree; +import cn.hutool.core.lang.tree.TreeNode; +import cn.hutool.core.lang.tree.TreeUtil; +import cn.hutool.core.text.StrFormatter; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; +import javax.annotation.Resource; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.util.TextUtils; import org.dinky.assertion.Asserts; import org.dinky.assertion.DinkyAssert; import org.dinky.config.Dialect; +import org.dinky.configure.DinkyCustomConfiguration; import org.dinky.constant.FlinkSQLConstant; import org.dinky.context.TenantContextHolder; import org.dinky.data.annotations.ProcessStep; @@ -33,6 +66,7 @@ import org.dinky.data.dto.TaskDTO; import org.dinky.data.dto.TaskRollbackVersionDTO; import org.dinky.data.dto.TaskSubmitDto; +import org.dinky.data.enums.CatalogTypeMappingEnum; import org.dinky.data.enums.GatewayType; import org.dinky.data.enums.JobLifeCycle; import org.dinky.data.enums.JobStatus; @@ -87,62 +121,22 @@ import org.dinky.service.SavepointsService; import org.dinky.service.TaskService; import org.dinky.service.TaskVersionService; -import org.dinky.service.UDFService; import org.dinky.service.UDFTemplateService; import org.dinky.service.UserService; import org.dinky.service.catalogue.CatalogueService; -import org.dinky.service.resource.ResourcesService; import org.dinky.service.task.BaseTask; import org.dinky.utils.FragmentVariableUtils; import org.dinky.utils.JsonUtils; import org.dinky.utils.RunTimeUtil; import org.dinky.utils.UDFUtils; - -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.http.util.TextUtils; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.Reader; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Base64; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.stream.Collectors; - -import javax.annotation.Resource; - +import org.jetbrains.annotations.NotNull; import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.multipart.MultipartFile; - -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; - -import cn.dev33.satoken.stp.StpUtil; -import cn.hutool.core.bean.BeanUtil; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.lang.tree.Tree; -import cn.hutool.core.lang.tree.TreeNode; -import cn.hutool.core.lang.tree.TreeUtil; -import cn.hutool.core.text.StrFormatter; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; +import static org.dinky.data.model.SystemConfiguration.FLINK_JOB_ARCHIVE; /** * TaskServiceImpl @@ -164,8 +158,8 @@ public class TaskServiceImpl extends SuperServiceImpl implemen private final DataSourceProperties dsProperties; private final UserService userService; private final ApplicationContext applicationContext; - private final UDFService udfService; - private final ResourcesService resourcesService; + private final DinkyCustomConfiguration dinkyCustomConfiguration; + @Resource @Lazy @@ -685,19 +679,7 @@ public Task initDefaultFlinkSQLEnv(Integer tenantId) { Task defaultFlinkSQLEnvTask = getTaskByNameAndTenantId(name, tenantId); - String sql = String.format( - "create catalog my_catalog with(\n " - + "'type' = 'dinky_mysql',\n" - + " 'username' = " - + "'%s',\n " - + "'password' = '%s',\n" - + " 'url' = '%s'\n" - + ")%suse catalog my_catalog%s", - dsProperties.getUsername(), - dsProperties.getPassword(), - dsProperties.getUrl(), - FlinkSQLConstant.SEPARATOR, - FlinkSQLConstant.SEPARATOR); + String sql = getStatementByCatalogType(CatalogTypeMappingEnum.ofDbType(dinkyCustomConfiguration.getDbType())); if (null != defaultFlinkSQLEnvTask) { defaultFlinkSQLEnvTask.setStatement(sql); @@ -720,6 +702,25 @@ public Task initDefaultFlinkSQLEnv(Integer tenantId) { return defaultFlinkSQLEnvTask; } + private @NotNull String getStatementByCatalogType(CatalogTypeMappingEnum catalogTypeMappingEnum) { + String sql = String.format( + "create catalog my_catalog with(\n " + + "'type' = '%s',\n" + + " 'username' = " + + "'%s',\n " + + "'password' = '%s',\n" + + " 'url' = '%s'\n" + + ")%suse catalog my_catalog%s", + catalogTypeMappingEnum.getCatalogTypeName(), + dsProperties.getUsername(), + dsProperties.getPassword(), + dsProperties.getUrl(), + FlinkSQLConstant.SEPARATOR, + FlinkSQLConstant.SEPARATOR); + log.info("Init default flink sql env sql:{}, yours dbType is:{}, catalogName is:{}", sql, catalogTypeMappingEnum.getDbType(), catalogTypeMappingEnum.getCatalogTypeName()); + return sql; + } + @Override public Task getTaskByNameAndTenantId(String name, Integer tenantId) { return baseMapper.getTaskByNameAndTenantId(name, tenantId); @@ -738,13 +739,13 @@ public JobModelOverview getJobStreamingOrBatchModelOverview() { @Override public List getReleaseUDF() { return list(new LambdaQueryWrapper() - .in( - Task::getDialect, - Dialect.JAVA.getValue(), - Dialect.SCALA.getValue(), - Dialect.PYTHON.getValue()) - .eq(Task::getEnabled, 1) - .eq(Task::getStep, JobLifeCycle.PUBLISH.getValue())) + .in( + Task::getDialect, + Dialect.JAVA.getValue(), + Dialect.SCALA.getValue(), + Dialect.PYTHON.getValue()) + .eq(Task::getEnabled, 1) + .eq(Task::getStep, JobLifeCycle.PUBLISH.getValue())) .stream() .filter(task -> Asserts.isNotNullString( task.getConfigJson().getUdfConfig().getClassName())) diff --git a/dinky-admin/src/main/resources/application-flyway.yml b/dinky-admin/src/main/resources/application-flyway.yml index 50611fcbc9..808f57e24c 100644 --- a/dinky-admin/src/main/resources/application-flyway.yml +++ b/dinky-admin/src/main/resources/application-flyway.yml @@ -21,7 +21,7 @@ spring: enabled: true # Is it enabled group: true # Enable grouping locations: - - classpath:db/migration/${spring.profiles.active} + - classpath:db/migration/${dinky.db-type} table: _dinky_flyway_schema_history placeholder-replacement: false # Placeholder Replacement baseline-on-migrate: true # Baseline during migration diff --git a/dinky-admin/src/main/resources/application.yml b/dinky-admin/src/main/resources/application.yml index 0855ab1b26..e7b9e1a26e 100644 --- a/dinky-admin/src/main/resources/application.yml +++ b/dinky-admin/src/main/resources/application.yml @@ -6,17 +6,23 @@ server: port: 8888 shutdown: graceful +################################################################################################################# +################################################# Dinky Custom Config ########################################### +################################################################################################################# +dinky: + # The h2 database is used by default. If you need to use other databases, please set the configuration active to: mysql, currently supports [mysql, pgsql, h2] + # If you use mysql database, please configure mysql database connection information in application-mysql.yml + # If you use pgsql database, please configure pgsql database connection information in application-pgsql.yml + # If you use the h2 database, please configure the h2 database connection information in application-h2.yml, + # note: the h2 database is only for experience use, and the related data that has been created cannot be migrated, please use it with caution + db-type: ${DB_ACTIVE:h2} #[h2,mysql,pgsql] + spring: # Dinky application name application: name: Dinky profiles: - # The h2 database is used by default. If you need to use other databases, please set the configuration active to: mysql, currently supports [mysql, pgsql, h2] - # If you use mysql database, please configure mysql database connection information in application-mysql.yml - # If you use pgsql database, please configure pgsql database connection information in application-pgsql.yml - # If you use the h2 database, please configure the h2 database connection information in application-h2.yml, - # note: the h2 database is only for experience use, and the related data that has been created cannot be migrated, please use it with caution - active: ${DB_ACTIVE:h2} #[h2,mysql,pgsql] + active: ${dinky.db-type} include: - jmx - flyway diff --git a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.19/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.19/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java index c7e3f7a7b9..5a4088bb73 100644 --- a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.19/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java +++ b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.19/src/main/java/org/dinky/flink/catalog/DinkyPostgresCatalog.java @@ -129,6 +129,7 @@ protected static class ColumnType { } /** 数据库用户名 */ + @Getter private final String user; /** 数据库密码 * -- GETTER -- diff --git a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.14/src/test/java/org/dinky/flink/catalog/DinkyMysqlCatalogTest.java b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.19/src/test/java/org/dinky/flink/catalog/DinkyPostgresSQLCatalogTest.java similarity index 89% rename from dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.14/src/test/java/org/dinky/flink/catalog/DinkyMysqlCatalogTest.java rename to dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.19/src/test/java/org/dinky/flink/catalog/DinkyPostgresSQLCatalogTest.java index 2a8e91b405..d3bc16da21 100644 --- a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.14/src/test/java/org/dinky/flink/catalog/DinkyMysqlCatalogTest.java +++ b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.19/src/test/java/org/dinky/flink/catalog/DinkyPostgresSQLCatalogTest.java @@ -29,21 +29,21 @@ import org.junit.Test; @Ignore -public class DinkyMysqlCatalogTest { +public class DinkyPostgresSQLCatalogTest { protected static String url; - protected static DinkyMysqlCatalog catalog; + protected static DinkyPostgresCatalog catalog; protected static final String TEST_CATALOG_NAME = "dinky"; protected static final String TEST_USERNAME = "dinky"; - protected static final String TEST_PWD = "dinky"; + protected static final String TEST_PWD = "pgdinky123"; private TableEnvironment tableEnv; @Before public void setup() { url = "jdbc:mysql://127.0.0.1:3306/dinky?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"; - catalog = new DinkyMysqlCatalog(TEST_CATALOG_NAME, url, TEST_USERNAME, TEST_PWD); + catalog = new DinkyPostgresCatalog(TEST_CATALOG_NAME, url, TEST_USERNAME, TEST_PWD); this.tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1); diff --git a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.14/src/test/java/org/dinky/flink/catalog/org/dinky/flink/catalog/factory/DinkyMysqlCatalogFactoryTest.java b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.19/src/test/java/org/dinky/flink/catalog/factory/DinkyPostgresSQLCatalogFactoryTest.java similarity index 68% rename from dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.14/src/test/java/org/dinky/flink/catalog/org/dinky/flink/catalog/factory/DinkyMysqlCatalogFactoryTest.java rename to dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.19/src/test/java/org/dinky/flink/catalog/factory/DinkyPostgresSQLCatalogFactoryTest.java index fda54f804d..d5676423c4 100644 --- a/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.14/src/test/java/org/dinky/flink/catalog/org/dinky/flink/catalog/factory/DinkyMysqlCatalogFactoryTest.java +++ b/dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.19/src/test/java/org/dinky/flink/catalog/factory/DinkyPostgresSQLCatalogFactoryTest.java @@ -17,13 +17,13 @@ * */ -package org.dinky.flink.catalog.org.dinky.flink.catalog.factory; +package org.dinky.flink.catalog.factory; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import org.dinky.flink.catalog.DinkyMysqlCatalog; -import org.dinky.flink.catalog.factory.DinkyMysqlCatalogFactoryOptions; + +import org.dinky.flink.catalog.DinkyPostgresCatalog; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CommonCatalogOptions; @@ -38,10 +38,10 @@ import org.junit.Test; @Ignore -public class DinkyMysqlCatalogFactoryTest { +public class DinkyPostgresSQLCatalogFactoryTest { protected static String url; - protected static DinkyMysqlCatalog catalog; + protected static DinkyPostgresCatalog catalog; protected static final String TEST_CATALOG_NAME = "dinky"; protected static final String TEST_USERNAME = "dinky"; @@ -49,28 +49,28 @@ public class DinkyMysqlCatalogFactoryTest { @BeforeClass public static void setup() throws SQLException { - url = "jdbc:mysql://10.1.51.25:3306/dinky?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"; + url = "jdbc:postgresql://localhost:5432/dinky_logic?stringtype=unspecified"; - catalog = new DinkyMysqlCatalog(TEST_CATALOG_NAME, url, TEST_USERNAME, TEST_PWD); + catalog = new DinkyPostgresCatalog(TEST_CATALOG_NAME, url, TEST_USERNAME, TEST_PWD); } @Test public void test() { final Map options = new HashMap<>(); - options.put(CommonCatalogOptions.CATALOG_TYPE.key(), DinkyMysqlCatalogFactoryOptions.IDENTIFIER); - options.put(DinkyMysqlCatalogFactoryOptions.USERNAME.key(), TEST_USERNAME); - options.put(DinkyMysqlCatalogFactoryOptions.PASSWORD.key(), TEST_PWD); - options.put(DinkyMysqlCatalogFactoryOptions.URL.key(), url); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), DinkyPostgresCatalogFactoryOptions.IDENTIFIER); + options.put(DinkyPostgresCatalogFactoryOptions.USERNAME.key(), TEST_USERNAME); + options.put(DinkyPostgresCatalogFactoryOptions.PASSWORD.key(), TEST_PWD); + options.put(DinkyPostgresCatalogFactoryOptions.URL.key(), url); final Catalog actualCatalog = FactoryUtil.createCatalog( TEST_CATALOG_NAME, options, null, Thread.currentThread().getContextClassLoader()); - checkEquals(catalog, (DinkyMysqlCatalog) actualCatalog); + checkEquals(catalog, (DinkyPostgresCatalog) actualCatalog); - assertTrue(actualCatalog instanceof DinkyMysqlCatalog); + assertTrue(actualCatalog instanceof DinkyPostgresCatalog); } - private static void checkEquals(DinkyMysqlCatalog c1, DinkyMysqlCatalog c2) { + private static void checkEquals(DinkyPostgresCatalog c1, DinkyPostgresCatalog c2) { assertEquals(c1.getName(), c2.getName()); assertEquals(c1.getDefaultDatabase(), c2.getDefaultDatabase()); assertEquals(c1.getUser(), c2.getUser()); diff --git a/dinky-common/src/main/java/org/dinky/data/enums/CatalogTypeMappingEnum.java b/dinky-common/src/main/java/org/dinky/data/enums/CatalogTypeMappingEnum.java new file mode 100644 index 0000000000..8e1751ca3f --- /dev/null +++ b/dinky-common/src/main/java/org/dinky/data/enums/CatalogTypeMappingEnum.java @@ -0,0 +1,48 @@ +package org.dinky.data.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public enum CatalogTypeMappingEnum { + + DINKY_MYSQL("dinky_mysql", "mysql"), + + DINKY_POSTGRES("dinky_postgres", "pgsql"), + + DINKY_H2("dinky_mysql", "h2"), + ; + + private final String catalogTypeName; + private final String dbType; + + + public static CatalogTypeMappingEnum ofDbType(String dbType) { + for (CatalogTypeMappingEnum value : values()) { + if (value.getDbType().equals(dbType)) { + return value; + } + } + throw new IllegalArgumentException("The corresponding database type could not be found"); + } + + public static CatalogTypeMappingEnum getCatalogTypeEnum(CatalogTypeMappingEnum catalogType) { + for (CatalogTypeMappingEnum value : values()) { + if (value.getCatalogTypeName().equals(catalogType.getCatalogTypeName())) { + return value; + } + } + throw new IllegalArgumentException("The corresponding database type could not be found"); + } + + public static String getCatalogTypeValue(CatalogTypeMappingEnum catalogType) { + for (CatalogTypeMappingEnum value : values()) { + if (value.getCatalogTypeName().equals(catalogType.getCatalogTypeName())) { + return value.getDbType(); + } + } + throw new IllegalArgumentException("The corresponding database type could not be found"); + } + +}