From a1c8cbdbb8ac0b835e6629acaac33b78f3f5b93c Mon Sep 17 00:00:00 2001 From: Zzm0809 <934230207@qq.com> Date: Tue, 3 Dec 2024 19:00:40 +0800 Subject: [PATCH] feat-support-pgsql-catalog --- .../java/org/dinky/mapper/TaskMapper.java | 1 - .../dinky/service/impl/TaskServiceImpl.java | 102 ++++----- .../pgsql/V20241203.1.2.0__release.sql | 41 ++++ .../src/main/resources/mapper/TaskMapper.xml | 6 +- .../flink/catalog/DinkyPostgresCatalog.java | 214 +++++++++--------- .../catalog/DinkyPostgresSQLCatalogTest.java | 63 ++++++ .../DinkyPostgresSQLCatalogFactoryTest.java | 79 +++++++ .../flink/catalog/DinkyPostgresCatalog.java | 214 +++++++++--------- .../catalog/DinkyPostgresSQLCatalogTest.java | 63 ++++++ .../DinkyPostgresSQLCatalogFactoryTest.java | 79 +++++++ .../flink/catalog/DinkyPostgresCatalog.java | 214 +++++++++--------- .../catalog/DinkyPostgresSQLCatalogTest.java | 63 ++++++ .../DinkyPostgresSQLCatalogFactoryTest.java | 79 +++++++ .../flink/catalog/DinkyPostgresCatalog.java | 214 +++++++++--------- .../catalog/DinkyPostgresSQLCatalogTest.java | 63 ++++++ .../DinkyPostgresSQLCatalogFactoryTest.java | 79 +++++++ .../flink/catalog/DinkyPostgresCatalog.java | 214 +++++++++--------- .../catalog/DinkyPostgresSQLCatalogTest.java | 63 ++++++ .../DinkyPostgresSQLCatalogFactoryTest.java | 79 +++++++ .../flink/catalog/DinkyPostgresCatalog.java | 213 ++++++++--------- .../flink/catalog/DinkyPostgresCatalog.java | 214 +++++++++--------- .../catalog/DinkyPostgresSQLCatalogTest.java | 63 ++++++ .../DinkyPostgresSQLCatalogFactoryTest.java | 79 +++++++ 23 files changed, 1705 insertions(+), 794 deletions(-) create mode 100644 dinky-admin/src/main/resources/db/migration/pgsql/V20241203.1.2.0__release.sql create mode 100644 dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.14/src/test/java/org/dinky/flink/catalog/DinkyPostgresSQLCatalogTest.java create mode 100644 dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.14/src/test/java/org/dinky/flink/catalog/factory/DinkyPostgresSQLCatalogFactoryTest.java create mode 100644 dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.15/src/test/java/org/dinky/flink/catalog/DinkyPostgresSQLCatalogTest.java create mode 100644 dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.15/src/test/java/org/dinky/flink/catalog/factory/DinkyPostgresSQLCatalogFactoryTest.java create mode 100644 dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.16/src/test/java/org/dinky/flink/catalog/DinkyPostgresSQLCatalogTest.java create mode 100644 dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.16/src/test/java/org/dinky/flink/catalog/factory/DinkyPostgresSQLCatalogFactoryTest.java create mode 100644 dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.17/src/test/java/org/dinky/flink/catalog/DinkyPostgresSQLCatalogTest.java create mode 100644 dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.17/src/test/java/org/dinky/flink/catalog/factory/DinkyPostgresSQLCatalogFactoryTest.java create mode 100644 dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.18/src/test/java/org/dinky/flink/catalog/DinkyPostgresSQLCatalogTest.java create mode 100644 dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.18/src/test/java/org/dinky/flink/catalog/factory/DinkyPostgresSQLCatalogFactoryTest.java create mode 100644 dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.20/src/test/java/org/dinky/flink/catalog/DinkyPostgresSQLCatalogTest.java create mode 100644 dinky-catalog/dinky-catalog-postgres/dinky-catalog-postgres-1.20/src/test/java/org/dinky/flink/catalog/factory/DinkyPostgresSQLCatalogFactoryTest.java diff --git a/dinky-admin/src/main/java/org/dinky/mapper/TaskMapper.java b/dinky-admin/src/main/java/org/dinky/mapper/TaskMapper.java index 5c6b32b33e..caadfe5e7a 100644 --- a/dinky-admin/src/main/java/org/dinky/mapper/TaskMapper.java +++ b/dinky-admin/src/main/java/org/dinky/mapper/TaskMapper.java @@ -39,7 +39,6 @@ @Mapper public interface TaskMapper extends SuperMapper { - Integer queryAllSizeByName(String name); List queryOnLineTaskByDoneStatus( @Param("parentIds") List parentIds, diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index c35baf633e..f0dca4f9c7 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -19,8 +19,40 @@ package org.dinky.service.impl; -import static org.dinky.data.model.SystemConfiguration.FLINK_JOB_ARCHIVE; - +import cn.dev33.satoken.stp.StpUtil; +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.lang.tree.Tree; +import cn.hutool.core.lang.tree.TreeNode; +import cn.hutool.core.lang.tree.TreeUtil; +import cn.hutool.core.text.StrFormatter; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; +import javax.annotation.Resource; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.util.TextUtils; import org.dinky.assertion.Asserts; import org.dinky.assertion.DinkyAssert; import org.dinky.config.Dialect; @@ -97,28 +129,6 @@ import org.dinky.utils.JsonUtils; import org.dinky.utils.RunTimeUtil; import org.dinky.utils.UDFUtils; - -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.http.util.TextUtils; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.Reader; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Base64; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.stream.Collectors; - -import javax.annotation.Resource; - import org.jetbrains.annotations.NotNull; import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; import org.springframework.context.ApplicationContext; @@ -126,24 +136,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.multipart.MultipartFile; - -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; - -import cn.dev33.satoken.stp.StpUtil; -import cn.hutool.core.bean.BeanUtil; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.lang.tree.Tree; -import cn.hutool.core.lang.tree.TreeNode; -import cn.hutool.core.lang.tree.TreeUtil; -import cn.hutool.core.text.StrFormatter; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; +import static org.dinky.data.model.SystemConfiguration.FLINK_JOB_ARCHIVE; /** * TaskServiceImpl @@ -710,20 +703,22 @@ public Task initDefaultFlinkSQLEnv(Integer tenantId) { private @NotNull String getStatementByCatalogType(CatalogTypeMappingEnum catalogTypeMappingEnum) { String sql = String.format( - "create catalog my_catalog with(\n " + "create catalog my_catalog_%s with(\n " + "'type' = '%s',\n" + " 'username' = " + "'%s',\n " + "'password' = '%s',\n" + " 'url' = '%s'\n" - + ")%suse catalog my_catalog%s", + + ")%suse catalog my_catalog_%s %s", + catalogTypeMappingEnum.getCatalogTypeName(), catalogTypeMappingEnum.getCatalogTypeName(), dsProperties.getUsername(), dsProperties.getPassword(), dsProperties.getUrl(), FlinkSQLConstant.SEPARATOR, + catalogTypeMappingEnum.getCatalogTypeName(), FlinkSQLConstant.SEPARATOR); - log.info( + log.debug( "Init default flink sql env sql:{}, yours dbType is:{}, catalogName is:{}", sql, catalogTypeMappingEnum.getDbType(), @@ -749,13 +744,13 @@ public JobModelOverview getJobStreamingOrBatchModelOverview() { @Override public List getReleaseUDF() { return list(new LambdaQueryWrapper() - .in( - Task::getDialect, - Dialect.JAVA.getValue(), - Dialect.SCALA.getValue(), - Dialect.PYTHON.getValue()) - .eq(Task::getEnabled, 1) - .eq(Task::getStep, JobLifeCycle.PUBLISH.getValue())) + .in( + Task::getDialect, + Dialect.JAVA.getValue(), + Dialect.SCALA.getValue(), + Dialect.PYTHON.getValue()) + .eq(Task::getEnabled, 1) + .eq(Task::getStep, JobLifeCycle.PUBLISH.getValue())) .stream() .filter(task -> Asserts.isNotNullString( task.getConfigJson().getUdfConfig().getClassName())) @@ -789,7 +784,8 @@ public String getTaskAPIAddress() { @Override public Integer queryAllSizeByName(String name) { - return baseMapper.queryAllSizeByName(name); + Long value = baseMapper.selectCount(new LambdaQueryWrapper().likeRight(Task::getName, name + "-" )); + return Math.toIntExact(value); } @Override diff --git a/dinky-admin/src/main/resources/db/migration/pgsql/V20241203.1.2.0__release.sql b/dinky-admin/src/main/resources/db/migration/pgsql/V20241203.1.2.0__release.sql new file mode 100644 index 0000000000..0e45e82425 --- /dev/null +++ b/dinky-admin/src/main/resources/db/migration/pgsql/V20241203.1.2.0__release.sql @@ -0,0 +1,41 @@ + + + +-- Table structure for public.dinky_user + +CREATE TABLE IF NOT EXISTS public.dinky_user_backup +( + id serial PRIMARY KEY NOT NULL, + username varchar(50) NOT NULL, + user_type int DEFAULT 1, + password varchar(50), + nickname varchar(50), + worknum varchar(50), + avatar bytea, + mobile varchar(20), + enabled boolean NOT NULL DEFAULT true, + super_admin_flag boolean DEFAULT false, + is_delete boolean NOT NULL DEFAULT false, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP +); +COMMENT ON TABLE public.dinky_user_backup IS 'user'; +COMMENT ON COLUMN public.dinky_user_backup.id IS 'id'; +COMMENT ON COLUMN public.dinky_user_backup.username IS 'username'; +COMMENT ON COLUMN public.dinky_user_backup.password IS 'password'; +COMMENT ON COLUMN public.dinky_user_backup.nickname IS 'nickname'; +COMMENT ON COLUMN public.dinky_user_backup.worknum IS 'worknum'; +COMMENT ON COLUMN public.dinky_user_backup.avatar IS 'avatar'; +COMMENT ON COLUMN public.dinky_user_backup.mobile IS 'mobile phone'; +COMMENT ON COLUMN public.dinky_user_backup.enabled IS 'enabled'; +COMMENT ON COLUMN public.dinky_user_backup.super_admin_flag IS 'is super admin(0:false,1true)'; + + +insert into public.dinky_user_backup(id, username, user_type, password, nickname, worknum, avatar, mobile, enabled, super_admin_flag, is_delete, create_time, update_time) +select id, username, user_type, password, nickname, worknum, avatar, mobile, enabled, + case when super_admin_flag = 0 then false else true end as super_admin_flag + , is_delete, create_time, update_time +from public.dinky_user; + +drop table if exists public.dinky_user; +alter table public.dinky_user_backup rename to dinky_user; diff --git a/dinky-admin/src/main/resources/mapper/TaskMapper.xml b/dinky-admin/src/main/resources/mapper/TaskMapper.xml index 4f1f731d7e..031633ab65 100644 --- a/dinky-admin/src/main/resources/mapper/TaskMapper.xml +++ b/dinky-admin/src/main/resources/mapper/TaskMapper.xml @@ -46,11 +46,7 @@ - +