Skip to content

Commit

Permalink
feat-support-pgsql-catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
Zzm0809 committed Dec 3, 2024
1 parent 170a98a commit a1c8cbd
Show file tree
Hide file tree
Showing 23 changed files with 1,705 additions and 794 deletions.
1 change: 0 additions & 1 deletion dinky-admin/src/main/java/org/dinky/mapper/TaskMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
@Mapper
public interface TaskMapper extends SuperMapper<Task> {

Integer queryAllSizeByName(String name);

List<Task> queryOnLineTaskByDoneStatus(
@Param("parentIds") List<Integer> parentIds,
Expand Down
102 changes: 49 additions & 53 deletions dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,40 @@

package org.dinky.service.impl;

import static org.dinky.data.model.SystemConfiguration.FLINK_JOB_ARCHIVE;

import cn.dev33.satoken.stp.StpUtil;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.tree.Tree;
import cn.hutool.core.lang.tree.TreeNode;
import cn.hutool.core.lang.tree.TreeUtil;
import cn.hutool.core.text.StrFormatter;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.util.TextUtils;
import org.dinky.assertion.Asserts;
import org.dinky.assertion.DinkyAssert;
import org.dinky.config.Dialect;
Expand Down Expand Up @@ -97,53 +129,14 @@
import org.dinky.utils.JsonUtils;
import org.dinky.utils.RunTimeUtil;
import org.dinky.utils.UDFUtils;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.util.TextUtils;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import javax.annotation.Resource;

import org.jetbrains.annotations.NotNull;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.dev33.satoken.stp.StpUtil;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.tree.Tree;
import cn.hutool.core.lang.tree.TreeNode;
import cn.hutool.core.lang.tree.TreeUtil;
import cn.hutool.core.text.StrFormatter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import static org.dinky.data.model.SystemConfiguration.FLINK_JOB_ARCHIVE;

/**
* TaskServiceImpl
Expand Down Expand Up @@ -710,20 +703,22 @@ public Task initDefaultFlinkSQLEnv(Integer tenantId) {

private @NotNull String getStatementByCatalogType(CatalogTypeMappingEnum catalogTypeMappingEnum) {
String sql = String.format(
"create catalog my_catalog with(\n "
"create catalog my_catalog_%s with(\n "
+ "'type' = '%s',\n"
+ " 'username' = "
+ "'%s',\n "
+ "'password' = '%s',\n"
+ " 'url' = '%s'\n"
+ ")%suse catalog my_catalog%s",
+ ")%suse catalog my_catalog_%s %s",
catalogTypeMappingEnum.getCatalogTypeName(),
catalogTypeMappingEnum.getCatalogTypeName(),
dsProperties.getUsername(),
dsProperties.getPassword(),
dsProperties.getUrl(),
FlinkSQLConstant.SEPARATOR,
catalogTypeMappingEnum.getCatalogTypeName(),
FlinkSQLConstant.SEPARATOR);
log.info(
log.debug(
"Init default flink sql env sql:{}, yours dbType is:{}, catalogName is:{}",
sql,
catalogTypeMappingEnum.getDbType(),
Expand All @@ -749,13 +744,13 @@ public JobModelOverview getJobStreamingOrBatchModelOverview() {
@Override
public List<Task> getReleaseUDF() {
return list(new LambdaQueryWrapper<Task>()
.in(
Task::getDialect,
Dialect.JAVA.getValue(),
Dialect.SCALA.getValue(),
Dialect.PYTHON.getValue())
.eq(Task::getEnabled, 1)
.eq(Task::getStep, JobLifeCycle.PUBLISH.getValue()))
.in(
Task::getDialect,
Dialect.JAVA.getValue(),
Dialect.SCALA.getValue(),
Dialect.PYTHON.getValue())
.eq(Task::getEnabled, 1)
.eq(Task::getStep, JobLifeCycle.PUBLISH.getValue()))
.stream()
.filter(task -> Asserts.isNotNullString(
task.getConfigJson().getUdfConfig().getClassName()))
Expand Down Expand Up @@ -789,7 +784,8 @@ public String getTaskAPIAddress() {

@Override
public Integer queryAllSizeByName(String name) {
return baseMapper.queryAllSizeByName(name);
Long value = baseMapper.selectCount(new LambdaQueryWrapper<Task>().likeRight(Task::getName, name + "-" ));
return Math.toIntExact(value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@



-- Table structure for public.dinky_user

CREATE TABLE IF NOT EXISTS public.dinky_user_backup
(
id serial PRIMARY KEY NOT NULL,
username varchar(50) NOT NULL,
user_type int DEFAULT 1,
password varchar(50),
nickname varchar(50),
worknum varchar(50),
avatar bytea,
mobile varchar(20),
enabled boolean NOT NULL DEFAULT true,
super_admin_flag boolean DEFAULT false,
is_delete boolean NOT NULL DEFAULT false,
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
COMMENT ON TABLE public.dinky_user_backup IS 'user';
COMMENT ON COLUMN public.dinky_user_backup.id IS 'id';
COMMENT ON COLUMN public.dinky_user_backup.username IS 'username';
COMMENT ON COLUMN public.dinky_user_backup.password IS 'password';
COMMENT ON COLUMN public.dinky_user_backup.nickname IS 'nickname';
COMMENT ON COLUMN public.dinky_user_backup.worknum IS 'worknum';
COMMENT ON COLUMN public.dinky_user_backup.avatar IS 'avatar';
COMMENT ON COLUMN public.dinky_user_backup.mobile IS 'mobile phone';
COMMENT ON COLUMN public.dinky_user_backup.enabled IS 'enabled';
COMMENT ON COLUMN public.dinky_user_backup.super_admin_flag IS 'is super admin(0:false,1true)';


insert into public.dinky_user_backup(id, username, user_type, password, nickname, worknum, avatar, mobile, enabled, super_admin_flag, is_delete, create_time, update_time)
select id, username, user_type, password, nickname, worknum, avatar, mobile, enabled,
case when super_admin_flag = 0 then false else true end as super_admin_flag
, is_delete, create_time, update_time
from public.dinky_user;

drop table if exists public.dinky_user;
alter table public.dinky_user_backup rename to dinky_user;
6 changes: 1 addition & 5 deletions dinky-admin/src/main/resources/mapper/TaskMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@
</select>


<select id="queryAllSizeByName" resultType="java.lang.Integer">
SELECT count(*)
from dinky_task
where `name` REGEXP concat(#{name}, '-[0-9]$')
</select>


<select id="getTaskByNameAndTenantId" resultType="org.dinky.data.model.Task">
select *
Expand Down
Loading

0 comments on commit a1c8cbd

Please sign in to comment.