Skip to content

Commit

Permalink
实现多种 catalog 切换
Browse files Browse the repository at this point in the history
  • Loading branch information
Zzm0809 committed Dec 3, 2024
1 parent b645898 commit ee3351a
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 91 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.dinky.configure;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "dinky")
@Slf4j
@Data
public class DinkyCustomConfiguration {


private String dbType;
}
133 changes: 67 additions & 66 deletions dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,44 @@

package org.dinky.service.impl;

import static org.dinky.data.model.SystemConfiguration.FLINK_JOB_ARCHIVE;

import cn.dev33.satoken.stp.StpUtil;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.tree.Tree;
import cn.hutool.core.lang.tree.TreeNode;
import cn.hutool.core.lang.tree.TreeUtil;
import cn.hutool.core.text.StrFormatter;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.util.TextUtils;
import org.dinky.assertion.Asserts;
import org.dinky.assertion.DinkyAssert;
import org.dinky.config.Dialect;
import org.dinky.configure.DinkyCustomConfiguration;
import org.dinky.constant.FlinkSQLConstant;
import org.dinky.context.TenantContextHolder;
import org.dinky.data.annotations.ProcessStep;
Expand All @@ -33,6 +66,7 @@
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.dto.TaskSubmitDto;
import org.dinky.data.enums.CatalogTypeMappingEnum;
import org.dinky.data.enums.GatewayType;
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.enums.JobStatus;
Expand Down Expand Up @@ -87,62 +121,22 @@
import org.dinky.service.SavepointsService;
import org.dinky.service.TaskService;
import org.dinky.service.TaskVersionService;
import org.dinky.service.UDFService;
import org.dinky.service.UDFTemplateService;
import org.dinky.service.UserService;
import org.dinky.service.catalogue.CatalogueService;
import org.dinky.service.resource.ResourcesService;
import org.dinky.service.task.BaseTask;
import org.dinky.utils.FragmentVariableUtils;
import org.dinky.utils.JsonUtils;
import org.dinky.utils.RunTimeUtil;
import org.dinky.utils.UDFUtils;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.util.TextUtils;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import javax.annotation.Resource;

import org.jetbrains.annotations.NotNull;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.dev33.satoken.stp.StpUtil;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.tree.Tree;
import cn.hutool.core.lang.tree.TreeNode;
import cn.hutool.core.lang.tree.TreeUtil;
import cn.hutool.core.text.StrFormatter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import static org.dinky.data.model.SystemConfiguration.FLINK_JOB_ARCHIVE;

/**
* TaskServiceImpl
Expand All @@ -164,8 +158,8 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
private final DataSourceProperties dsProperties;
private final UserService userService;
private final ApplicationContext applicationContext;
private final UDFService udfService;
private final ResourcesService resourcesService;
private final DinkyCustomConfiguration dinkyCustomConfiguration;


@Resource
@Lazy
Expand Down Expand Up @@ -685,19 +679,7 @@ public Task initDefaultFlinkSQLEnv(Integer tenantId) {

Task defaultFlinkSQLEnvTask = getTaskByNameAndTenantId(name, tenantId);

String sql = String.format(
"create catalog my_catalog with(\n "
+ "'type' = 'dinky_mysql',\n"
+ " 'username' = "
+ "'%s',\n "
+ "'password' = '%s',\n"
+ " 'url' = '%s'\n"
+ ")%suse catalog my_catalog%s",
dsProperties.getUsername(),
dsProperties.getPassword(),
dsProperties.getUrl(),
FlinkSQLConstant.SEPARATOR,
FlinkSQLConstant.SEPARATOR);
String sql = getStatementByCatalogType(CatalogTypeMappingEnum.ofDbType(dinkyCustomConfiguration.getDbType()));

if (null != defaultFlinkSQLEnvTask) {
defaultFlinkSQLEnvTask.setStatement(sql);
Expand All @@ -720,6 +702,25 @@ public Task initDefaultFlinkSQLEnv(Integer tenantId) {
return defaultFlinkSQLEnvTask;
}

private @NotNull String getStatementByCatalogType(CatalogTypeMappingEnum catalogTypeMappingEnum) {
String sql = String.format(
"create catalog my_catalog with(\n "
+ "'type' = '%s',\n"
+ " 'username' = "
+ "'%s',\n "
+ "'password' = '%s',\n"
+ " 'url' = '%s'\n"
+ ")%suse catalog my_catalog%s",
catalogTypeMappingEnum.getCatalogTypeName(),
dsProperties.getUsername(),
dsProperties.getPassword(),
dsProperties.getUrl(),
FlinkSQLConstant.SEPARATOR,
FlinkSQLConstant.SEPARATOR);
log.info("Init default flink sql env sql:{}, yours dbType is:{}, catalogName is:{}", sql, catalogTypeMappingEnum.getDbType(), catalogTypeMappingEnum.getCatalogTypeName());
return sql;
}

@Override
public Task getTaskByNameAndTenantId(String name, Integer tenantId) {
return baseMapper.getTaskByNameAndTenantId(name, tenantId);
Expand All @@ -738,13 +739,13 @@ public JobModelOverview getJobStreamingOrBatchModelOverview() {
@Override
public List<Task> getReleaseUDF() {
return list(new LambdaQueryWrapper<Task>()
.in(
Task::getDialect,
Dialect.JAVA.getValue(),
Dialect.SCALA.getValue(),
Dialect.PYTHON.getValue())
.eq(Task::getEnabled, 1)
.eq(Task::getStep, JobLifeCycle.PUBLISH.getValue()))
.in(
Task::getDialect,
Dialect.JAVA.getValue(),
Dialect.SCALA.getValue(),
Dialect.PYTHON.getValue())
.eq(Task::getEnabled, 1)
.eq(Task::getStep, JobLifeCycle.PUBLISH.getValue()))
.stream()
.filter(task -> Asserts.isNotNullString(
task.getConfigJson().getUdfConfig().getClassName()))
Expand Down
2 changes: 1 addition & 1 deletion dinky-admin/src/main/resources/application-flyway.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ spring:
enabled: true # Is it enabled
group: true # Enable grouping
locations:
- classpath:db/migration/${spring.profiles.active}
- classpath:db/migration/${dinky.db-type}
table: _dinky_flyway_schema_history
placeholder-replacement: false # Placeholder Replacement
baseline-on-migrate: true # Baseline during migration
Expand Down
18 changes: 12 additions & 6 deletions dinky-admin/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@ server:
port: 8888
shutdown: graceful

#################################################################################################################
################################################# Dinky Custom Config ###########################################
#################################################################################################################
dinky:
# The h2 database is used by default. If you need to use other databases, please set the configuration active to: mysql, currently supports [mysql, pgsql, h2]
# If you use mysql database, please configure mysql database connection information in application-mysql.yml
# If you use pgsql database, please configure pgsql database connection information in application-pgsql.yml
# If you use the h2 database, please configure the h2 database connection information in application-h2.yml,
# note: the h2 database is only for experience use, and the related data that has been created cannot be migrated, please use it with caution
db-type: ${DB_ACTIVE:h2} #[h2,mysql,pgsql]

spring:
# Dinky application name
application:
name: Dinky
profiles:
# The h2 database is used by default. If you need to use other databases, please set the configuration active to: mysql, currently supports [mysql, pgsql, h2]
# If you use mysql database, please configure mysql database connection information in application-mysql.yml
# If you use pgsql database, please configure pgsql database connection information in application-pgsql.yml
# If you use the h2 database, please configure the h2 database connection information in application-h2.yml,
# note: the h2 database is only for experience use, and the related data that has been created cannot be migrated, please use it with caution
active: ${DB_ACTIVE:h2} #[h2,mysql,pgsql]
active: ${dinky.db-type}
include:
- jmx
- flyway
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ protected static class ColumnType {
}

/** 数据库用户名 */
@Getter
private final String user;
/** 数据库密码
* -- GETTER --
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,21 @@
import org.junit.Test;

@Ignore
public class DinkyMysqlCatalogTest {
public class DinkyPostgresSQLCatalogTest {

protected static String url;
protected static DinkyMysqlCatalog catalog;
protected static DinkyPostgresCatalog catalog;

protected static final String TEST_CATALOG_NAME = "dinky";
protected static final String TEST_USERNAME = "dinky";
protected static final String TEST_PWD = "dinky";
protected static final String TEST_PWD = "pgdinky123";

private TableEnvironment tableEnv;

@Before
public void setup() {
url = "jdbc:mysql://127.0.0.1:3306/dinky?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC";
catalog = new DinkyMysqlCatalog(TEST_CATALOG_NAME, url, TEST_USERNAME, TEST_PWD);
catalog = new DinkyPostgresCatalog(TEST_CATALOG_NAME, url, TEST_USERNAME, TEST_PWD);

this.tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
*
*/

package org.dinky.flink.catalog.org.dinky.flink.catalog.factory;
package org.dinky.flink.catalog.factory;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.dinky.flink.catalog.DinkyMysqlCatalog;
import org.dinky.flink.catalog.factory.DinkyMysqlCatalogFactoryOptions;

import org.dinky.flink.catalog.DinkyPostgresCatalog;

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CommonCatalogOptions;
Expand All @@ -38,39 +38,39 @@
import org.junit.Test;

@Ignore
public class DinkyMysqlCatalogFactoryTest {
public class DinkyPostgresSQLCatalogFactoryTest {

protected static String url;
protected static DinkyMysqlCatalog catalog;
protected static DinkyPostgresCatalog catalog;

protected static final String TEST_CATALOG_NAME = "dinky";
protected static final String TEST_USERNAME = "dinky";
protected static final String TEST_PWD = "dinky";

@BeforeClass
public static void setup() throws SQLException {
url = "jdbc:mysql://10.1.51.25:3306/dinky?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC";
url = "jdbc:postgresql://localhost:5432/dinky_logic?stringtype=unspecified";

catalog = new DinkyMysqlCatalog(TEST_CATALOG_NAME, url, TEST_USERNAME, TEST_PWD);
catalog = new DinkyPostgresCatalog(TEST_CATALOG_NAME, url, TEST_USERNAME, TEST_PWD);
}

@Test
public void test() {
final Map<String, String> options = new HashMap<>();
options.put(CommonCatalogOptions.CATALOG_TYPE.key(), DinkyMysqlCatalogFactoryOptions.IDENTIFIER);
options.put(DinkyMysqlCatalogFactoryOptions.USERNAME.key(), TEST_USERNAME);
options.put(DinkyMysqlCatalogFactoryOptions.PASSWORD.key(), TEST_PWD);
options.put(DinkyMysqlCatalogFactoryOptions.URL.key(), url);
options.put(CommonCatalogOptions.CATALOG_TYPE.key(), DinkyPostgresCatalogFactoryOptions.IDENTIFIER);
options.put(DinkyPostgresCatalogFactoryOptions.USERNAME.key(), TEST_USERNAME);
options.put(DinkyPostgresCatalogFactoryOptions.PASSWORD.key(), TEST_PWD);
options.put(DinkyPostgresCatalogFactoryOptions.URL.key(), url);

final Catalog actualCatalog = FactoryUtil.createCatalog(
TEST_CATALOG_NAME, options, null, Thread.currentThread().getContextClassLoader());

checkEquals(catalog, (DinkyMysqlCatalog) actualCatalog);
checkEquals(catalog, (DinkyPostgresCatalog) actualCatalog);

assertTrue(actualCatalog instanceof DinkyMysqlCatalog);
assertTrue(actualCatalog instanceof DinkyPostgresCatalog);
}

private static void checkEquals(DinkyMysqlCatalog c1, DinkyMysqlCatalog c2) {
private static void checkEquals(DinkyPostgresCatalog c1, DinkyPostgresCatalog c2) {
assertEquals(c1.getName(), c2.getName());
assertEquals(c1.getDefaultDatabase(), c2.getDefaultDatabase());
assertEquals(c1.getUser(), c2.getUser());
Expand Down
Loading

0 comments on commit ee3351a

Please sign in to comment.