diff --git a/.github/workflows/e2e_test.yml b/.github/workflows/e2e_test.yml
new file mode 100644
index 0000000000..f3e0b82930
--- /dev/null
+++ b/.github/workflows/e2e_test.yml
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: E2E Test
+
+on:
+ workflow_dispatch:
+
+jobs:
+ init_env:
+ name: init env
+ runs-on: ubuntu-latest
+ services:
+ registry:
+ image: registry:2
+ ports:
+ - 5000:5000
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+ - name: Init Env Jar
+ run: |
+ wget -O e2e_test/docker-compose-env/dinky/mysql-connector-java-8.0.30.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar &&
+ wget -O e2e_test/docker-compose-env/flink/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
+ - name: Init Docker Network
+ run: |
+ docker network create -d bridge dinky_net
+ - name: Init Run Docker MySQL
+ uses: hoverkraft-tech/compose-action@v2.0.2
+ with:
+ compose-file: ./e2e_test/docker-compose-env/mysql/docker-compose.yml
+ - name: Init Run Docker Hadoop
+ uses: hoverkraft-tech/compose-action@v2.0.2
+ with:
+ compose-file: ./e2e_test/docker-compose-env/hadoop/docker-compose.yml
+ - name: Init Run Docker Flink
+ uses: hoverkraft-tech/compose-action@v2.0.2
+ with:
+ compose-file: ./e2e_test/docker-compose-env/flink/docker-compose.yml
+      # Set up QEMU; Docker Buildx below depends on it.
+ - name: Set up QEMU
+ uses: docker/setup-qemu-action@v3
+      # Set up Docker Buildx to make it easier to build multi-platform images
+ - name: Set up Docker Buildx
+ uses: docker/setup-buildx-action@v3
+ with:
+ driver-opts: network=host
+ - name: Build Dinky Image
+ uses: docker/build-push-action@v5
+ with:
+ file: ./deploy/docker/Dockerfile
+          # whether to push the built image (here: to the job-local registry)
+ push: true
+ build-args: |
+ FLINK_VERSION=1.14
+ DINKY_VERSION=1
+ tags: |
+ localhost:5000/dinky/dinky-test:flink-1.14
+ - name: Init Run Docker Dinky
+ uses: hoverkraft-tech/compose-action@v2.0.2
+ with:
+ compose-file: ./e2e_test/docker-compose-env/dinky/docker-compose.yml
+ - name: Run Docker Python Script
+# uses: hoverkraft-tech/compose-action@v2.0.2
+# with:
+# compose-file: ./e2e_test/tools/docker-compose.yml
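+        # Instead of the commented-out compose file above, the test script is run directly with docker run
+        # so it can join the dinky_net bridge network; the sleep gives the Dinky container time to finish starting.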
+ run: |
+          sleep 30
+          cd ./e2e_test/tools
+          docker run -v ./:/app -w /app --net dinky_net --rm --entrypoint /bin/bash python:3.9 -c 'pip install -r requirements.txt && python main.py dinky14:8888'
diff --git a/deploy/docker/Dockerfile b/deploy/docker/Dockerfile
index afa40b4379..d51347236c 100755
--- a/deploy/docker/Dockerfile
+++ b/deploy/docker/Dockerfile
@@ -27,7 +27,7 @@ COPY --from=ui-build /build/dist/ /build/dinky-web/dist/
RUN mvn package -Dmaven.test.skip=true -P prod,flink-single-version,flink-${FLINK_VERSION},fast
RUN mkdir release && \
- tar -C release -xvf build/dinky-release-${FLINK_VERSION}-${DINKY_VERSION}.tar.gz && \
+ tar -C release -xvf build/dinky-release-${FLINK_VERSION}-*.tar.gz && \
mv release/dinky-release-* release/dinky
@@ -48,8 +48,8 @@ RUN rm -f /opt/dinky/extends/flink${FLINK_VERSION}/flink/flink-table-planner-lo
COPY --from=flink-base /opt/flink/opt/flink-table-planner*.jar /opt/dinky/extends/flink${FLINK_VERSION}/flink/
-RUN mkdir /opt/dinky/customJar && chmod -R 777 /opt/dinky/ && sed -i 's/-Xms512M -Xmx2048M -XX:PermSize=512M/-XX:+UseContainerSupport -XX:InitialRAMPercentage=70.0 -XX:MaxRAMPercentage=70.0/g' auto.sh
+RUN mkdir /opt/dinky/customJar && chmod -R 777 /opt/dinky/ && sed -i 's/-Xms512M -Xmx2048M -XX:PermSize=512M/-XX:+UseContainerSupport -XX:InitialRAMPercentage=70.0 -XX:MaxRAMPercentage=70.0/g' ./bin/auto.sh
EXPOSE 8888
-CMD ./bin/auto.sh startOnPending
\ No newline at end of file
+CMD ./bin/auto.sh startOnPending
diff --git a/dinky-admin/src/main/java/org/dinky/Dinky.java b/dinky-admin/src/main/java/org/dinky/Dinky.java
index 61778416c8..4f717bb4a5 100644
--- a/dinky-admin/src/main/java/org/dinky/Dinky.java
+++ b/dinky-admin/src/main/java/org/dinky/Dinky.java
@@ -19,6 +19,7 @@
package org.dinky;
+import org.dinky.data.constant.DirConstant;
import org.dinky.security.NoExitSecurityManager;
import org.springframework.boot.SpringApplication;
@@ -29,6 +30,7 @@
import com.alibaba.druid.proxy.DruidDriver;
+import cn.hutool.core.io.FileUtil;
import lombok.SneakyThrows;
/**
@@ -54,6 +56,8 @@ public static void main(String[] args) {
// chinese: 初始化JDBC Driver,因为包的数量特别庞大,所以这里需要异步执行,并提前加载Driver
new Thread(DruidDriver::getInstance).start();
+ FileUtil.mkdir(DirConstant.getTempRootDir());
+
SpringApplication app = new SpringApplication(Dinky.class);
app.run(args);
}
diff --git a/e2e_test/docker-compose-env/dinky/docker-compose.yml b/e2e_test/docker-compose-env/dinky/docker-compose.yml
new file mode 100644
index 0000000000..f73d246fde
--- /dev/null
+++ b/e2e_test/docker-compose-env/dinky/docker-compose.yml
@@ -0,0 +1,19 @@
+version: "3"
+networks:
+ dinky_net:
+ external: true
+services:
+ dinky14:
+ restart: always
+ image: localhost:5000/dinky/dinky-test:flink-1.14
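+    # Built and pushed to the job-local registry (localhost:5000) by the e2e_test workflow before this compose file starts.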
+ environment:
+ - DB_ACTIVE=mysql
+ - MYSQL_ADDR=mysql:3306
+ - MYSQL_DATABASE=dinky
+ - MYSQL_USERNAME=root
+ - MYSQL_PASSWORD=dinky
+ volumes:
+ - ./mysql-connector-java-8.0.30.jar:/opt/dinky/lib/mysql-connector-java-8.0.30.jar
+ - ../flink/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/dinky/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
+ networks:
+ - dinky_net
diff --git a/e2e_test/docker-compose-env/flink/conf/core-site.xml b/e2e_test/docker-compose-env/flink/conf/core-site.xml
new file mode 100644
index 0000000000..12857664ca
--- /dev/null
+++ b/e2e_test/docker-compose-env/flink/conf/core-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <property><name>hadoop.proxyuser.hue.hosts</name><value>*</value></property>
+    <property><name>fs.defaultFS</name><value>hdfs://namenode:9000</value></property>
+    <property><name>hadoop.http.staticuser.user</name><value>root</value></property>
+    <property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.SnappyCodec</value></property>
+    <property><name>hadoop.proxyuser.hue.groups</name><value>*</value></property>
+</configuration>
\ No newline at end of file
diff --git a/e2e_test/docker-compose-env/flink/conf/flink-conf.yaml b/e2e_test/docker-compose-env/flink/conf/flink-conf.yaml
new file mode 100644
index 0000000000..597f641cab
--- /dev/null
+++ b/e2e_test/docker-compose-env/flink/conf/flink-conf.yaml
@@ -0,0 +1,279 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+#==============================================================================
+# Common
+#==============================================================================
+
+# The external address of the host on which the JobManager runs and can be
+# reached by the TaskManagers and any clients which want to connect. This setting
+# is only used in Standalone mode and may be overwritten on the JobManager side
+# by specifying the --host parameter of the bin/jobmanager.sh executable.
+# In high availability mode, if you use the bin/start-cluster.sh script and setup
+# the conf/masters file, this will be taken care of automatically. Yarn
+# automatically configure the host name based on the hostname of the node where the
+# JobManager runs.
+
+# The RPC port where the JobManager is reachable.
+
+jobmanager.rpc.port: 6123
+
+
+# The total process memory size for the JobManager.
+#
+# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
+
+jobmanager.memory.process.size: 16000m
+
+
+# The total process memory size for the TaskManager.
+#
+# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
+
+taskmanager.memory.process.size: 17280m
+
+# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
+# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
+#
+# taskmanager.memory.flink.size: 1280m
+
+# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
+
+taskmanager.numberOfTaskSlots: 20
+
+# taskmanager.cpu.cores: 1
+# taskmanager.memory.task.heap.size: 1200m
+# taskmanager.memory.managed.size: 0
+# taskmanager.memory.network.fraction: 0.1
+# taskmanager.memory.network.min: 64mb
+# taskmanager.memory.network.max: 64mb
+
+# The parallelism used for programs that did not specify and other parallelism.
+
+parallelism.default: 1
+
+# The default file system scheme and authority.
+#
+# By default file paths without scheme are interpreted relative to the local
+# root file system 'file:///'. Use this to override the default and interpret
+# relative paths relative to a different file system,
+# for example 'hdfs://mynamenode:12345'
+#
+# fs.default-scheme
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
+#
+# high-availability: zookeeper
+
+# The path where metadata for master recovery is persisted. While ZooKeeper stores
+# the small ground truth for checkpoint and leader election, this location stores
+# the larger objects, like persisted dataflow graphs.
+#
+# Must be a durable file system that is accessible from all nodes
+# (like HDFS, S3, Ceph, nfs, ...)
+#
+# high-availability.storageDir: hdfs:///flink/ha/
+
+# The list of ZooKeeper quorum peers that coordinate the high-availability
+# setup. This must be a list of the form:
+# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+#
+# high-availability.zookeeper.quorum: localhost:2181
+
+
+# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
+# The default value is "open" and it can be changed to "creator" if ZK security is enabled
+#
+# high-availability.zookeeper.client.acl: open
+
+#==============================================================================
+# Fault tolerance and checkpointing
+#==============================================================================
+
+# The backend that will be used to store operator state checkpoints if
+# checkpointing is enabled. Checkpointing is enabled when execution.checkpointing.interval > 0.
+#
+# Execution checkpointing related parameters. Please refer to CheckpointConfig and ExecutionCheckpointingOptions for more details.
+#
+# execution.checkpointing.interval: 3min
+# execution.checkpointing.externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
+# execution.checkpointing.max-concurrent-checkpoints: 1
+# execution.checkpointing.min-pause: 0
+# execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
+# execution.checkpointing.timeout: 10min
+# execution.checkpointing.tolerable-failed-checkpoints: 0
+# execution.checkpointing.unaligned: false
+#
+# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
+# <class-name-of-factory>.
+#
+# state.backend: filesystem
+
+# Directory for checkpoints filesystem, when using any of the default bundled
+# state backends.
+#
+# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Default target directory for savepoints, optional.
+#
+# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
+
+# Flag to enable/disable incremental checkpoints for backends that
+# support incremental checkpoints (like the RocksDB state backend).
+#
+# state.backend.incremental: false
+
+# The failover strategy, i.e., how the job computation recovers from task failures.
+# Only restart tasks that may have been affected by the task failure, which typically includes
+# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.
+
+jobmanager.execution.failover-strategy: region
+
+#==============================================================================
+# Rest & web frontend
+#==============================================================================
+
+# The port to which the REST client connects to. If rest.bind-port has
+# not been specified, then the server will bind to this port as well.
+#
+rest.port: 8282
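+# Note: the E2E tooling registers this session cluster in Dinky as "jobmanager:8282" (see e2e_test/tools/env.py).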
+
+# The address to which the REST client will connect to
+#
+#rest.address: 0.0.0.0
+
+# The address that the REST & web server binds to
+#
+#rest.bind-address: 0.0.0.0
+
+# Flag to specify whether job submission is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#web.submit.enable: false
+
+# Flag to specify whether job cancellation is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#web.cancel.enable: false
+
+#==============================================================================
+# Advanced
+#==============================================================================
+
+# Override the directories for temporary files. If not specified, the
+# system-specific Java temporary directory (java.io.tmpdir property) is taken.
+#
+# For framework setups on Yarn, Flink will automatically pick up the
+# containers' temp directories without any need for configuration.
+#
+# Add a delimited list for multiple directories, using the system directory
+# delimiter (colon ':' on unix) or a comma, e.g.:
+# /data1/tmp:/data2/tmp:/data3/tmp
+#
+# Note: Each directory entry is read from and written to by a different I/O
+# thread. You can include the same directory multiple times in order to create
+# multiple I/O threads against that directory. This is for example relevant for
+# high-throughput RAIDs.
+#
+# io.tmp.dirs: /tmp
+
+# The classloading resolve order. Possible values are 'child-first' (Flink's default)
+# and 'parent-first' (Java's default).
+#
+# Child first classloading allows users to use different dependency/library
+# versions in their application than those in the classpath. Switching back
+# to 'parent-first' may help with debugging dependency issues.
+#
+# classloader.resolve-order: child-first
+classloader.resolve-order: parent-first
+
+# The amount of memory going to the network stack. These numbers usually need
+# no tuning. Adjusting them may be necessary in case of an "Insufficient number
+# of network buffers" error. The default min is 64MB, the default max is 1GB.
+#
+# taskmanager.memory.network.fraction: 0.1
+# taskmanager.memory.network.min: 64mb
+# taskmanager.memory.network.max: 1gb
+
+#==============================================================================
+# Flink Cluster Security Configuration
+#==============================================================================
+
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
+
+# The below configure how Kerberos credentials are provided. A keytab will be used instead of
+# a ticket cache if the keytab path and principal are set.
+
+# security.kerberos.login.use-ticket-cache: true
+# security.kerberos.login.keytab: /path/to/kerberos/keytab
+# security.kerberos.login.principal: flink-user
+
+# The configuration below defines which JAAS login contexts
+
+# security.kerberos.login.contexts: Client,KafkaClient
+
+#==============================================================================
+# ZK Security Configuration
+#==============================================================================
+
+# Below configurations are applicable if ZK ensemble is configured for security
+
+# Override below configuration to provide custom ZK service name if configured
+# zookeeper.sasl.service-name: zookeeper
+
+# The configuration below must match one of the values set in "security.kerberos.login.contexts"
+# zookeeper.sasl.login-context-name: Client
+
+#==============================================================================
+# HistoryServer
+#==============================================================================
+
+# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
+
+# Directory to upload completed jobs to. Add this directory to the list of
+# monitored directories of the HistoryServer as well (see below).
+#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
+
+# The address under which the web-based HistoryServer listens.
+#historyserver.web.address: 0.0.0.0
+
+# The port under which the web-based HistoryServer listens.
+#historyserver.web.port: 8082
+
+# Comma separated list of directories to monitor for completed jobs.
+#historyserver.archive.fs.dir: hdfs:///completed-jobs/
+
+# Interval in milliseconds for refreshing the monitored directories.
+#historyserver.archive.fs.refresh-interval: 10000
+
+blob.server.port: 6124
+query.server.port: 6125
+
+jobmanager.rpc.address: jobmanager
+env.java.opts: "-Dfile.encoding=UTF-8"
diff --git a/e2e_test/docker-compose-env/flink/conf/hdfs-site.xml b/e2e_test/docker-compose-env/flink/conf/hdfs-site.xml
new file mode 100644
index 0000000000..3cf9162f08
--- /dev/null
+++ b/e2e_test/docker-compose-env/flink/conf/hdfs-site.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <property><name>dfs.namenode.datanode.registration.ip-hostname-check</name><value>false</value></property>
+    <property><name>dfs.webhdfs.enabled</name><value>true</value></property>
+    <property><name>dfs.permissions.enabled</name><value>false</value></property>
+    <property><name>dfs.namenode.name.dir</name><value>file:///hadoop/dfs/name</value></property>
+    <property><name>dfs.namenode.rpc-bind-host</name><value>0.0.0.0</value></property>
+    <property><name>dfs.namenode.servicerpc-bind-host</name><value>0.0.0.0</value></property>
+    <property><name>dfs.namenode.http-bind-host</name><value>0.0.0.0</value></property>
+    <property><name>dfs.namenode.https-bind-host</name><value>0.0.0.0</value></property>
+    <property><name>dfs.client.use.datanode.hostname</name><value>true</value></property>
+    <property><name>dfs.datanode.use.datanode.hostname</name><value>true</value></property>
+</configuration>
\ No newline at end of file
diff --git a/e2e_test/docker-compose-env/flink/conf/log4j-console.properties b/e2e_test/docker-compose-env/flink/conf/log4j-console.properties
new file mode 100644
index 0000000000..51353e4d4b
--- /dev/null
+++ b/e2e_test/docker-compose-env/flink/conf/log4j-console.properties
@@ -0,0 +1,68 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
+monitorInterval=30
+
+# This affects logging for both user code and Flink
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+rootLogger.appenderRef.rolling.ref = RollingFileAppender
+
+# Uncomment this if you want to _only_ change Flink's logging
+#logger.flink.name = org.apache.flink
+#logger.flink.level = INFO
+
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+logger.akka.name = akka
+logger.akka.level = INFO
+logger.kafka.name= org.apache.kafka
+logger.kafka.level = INFO
+logger.hadoop.name = org.apache.hadoop
+logger.hadoop.level = INFO
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = INFO
+logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
+logger.shaded_zookeeper.level = INFO
+
+# Log all infos to the console
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# Log all infos in the given rolling file
+appender.rolling.name = RollingFileAppender
+appender.rolling.type = RollingFile
+appender.rolling.append = true
+appender.rolling.fileName = ${sys:log.file}
+appender.rolling.filePattern = ${sys:log.file}.%i
+appender.rolling.layout.type = PatternLayout
+appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+appender.rolling.policies.type = Policies
+appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=100MB
+appender.rolling.policies.startup.type = OnStartupTriggeringPolicy
+appender.rolling.strategy.type = DefaultRolloverStrategy
+appender.rolling.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
+
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = OFF
diff --git a/e2e_test/docker-compose-env/flink/conf/yarn-site.xml b/e2e_test/docker-compose-env/flink/conf/yarn-site.xml
new file mode 100644
index 0000000000..dbf5785e4e
--- /dev/null
+++ b/e2e_test/docker-compose-env/flink/conf/yarn-site.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <property><name>yarn.timeline-service.enabled</name><value>true</value></property>
+    <property><name>yarn.scheduler.capacity.root.default.maximum-allocation-vcores</name><value>4</value></property>
+    <property><name>yarn.resourcemanager.system-metrics-publisher.enabled</name><value>true</value></property>
+    <property><name>yarn.resourcemanager.store.class</name><value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value></property>
+    <property><name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name><value>98.5</value></property>
+    <property><name>yarn.log.server.url</name><value>http://historyserver:8188/applicationhistory/logs/</value></property>
+    <property><name>yarn.resourcemanager.fs.state-store.uri</name><value>/rmstate</value></property>
+    <property><name>yarn.timeline-service.generic-application-history.enabled</name><value>true</value></property>
+    <property><name>yarn.log-aggregation-enable</name><value>true</value></property>
+    <property><name>yarn.resourcemanager.hostname</name><value>resourcemanager</value></property>
+    <property><name>yarn.scheduler.capacity.root.default.maximum-allocation-mb</name><value>8192</value></property>
+    <property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
+    <property><name>yarn.resourcemanager.resource_tracker.address</name><value>resourcemanager:8031</value></property>
+    <property><name>yarn.timeline-service.hostname</name><value>historyserver</value></property>
+    <property><name>yarn.resourcemanager.scheduler.address</name><value>resourcemanager:8030</value></property>
+    <property><name>yarn.resourcemanager.address</name><value>resourcemanager:8032</value></property>
+    <property><name>mapred.map.output.compress.codec</name><value>org.apache.hadoop.io.compress.SnappyCodec</value></property>
+    <property><name>yarn.nodemanager.remote-app-log-dir</name><value>/app-logs</value></property>
+    <property><name>yarn.resourcemanager.scheduler.class</name><value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value></property>
+    <property><name>mapreduce.map.output.compress</name><value>true</value></property>
+    <property><name>yarn.nodemanager.resource.memory-mb</name><value>16384</value></property>
+    <property><name>yarn.resourcemanager.recovery.enabled</name><value>true</value></property>
+    <property><name>yarn.nodemanager.resource.cpu-vcores</name><value>8</value></property>
+    <property><name>yarn.resourcemanager.bind-host</name><value>0.0.0.0</value></property>
+    <property><name>yarn.nodemanager.bind-host</name><value>0.0.0.0</value></property>
+    <property><name>yarn.webapp.ui2.enable</name><value>true</value></property>
+</configuration>
diff --git a/e2e_test/docker-compose-env/flink/docker-compose.yml b/e2e_test/docker-compose-env/flink/docker-compose.yml
new file mode 100644
index 0000000000..4ca9864718
--- /dev/null
+++ b/e2e_test/docker-compose-env/flink/docker-compose.yml
@@ -0,0 +1,26 @@
+version: "3"
+networks:
+ dinky_net:
+ external: true
+services:
+ jobmanager:
+ restart: always
+ image: flink:1.14.6
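+    # Keep in sync with the FLINK_VERSION=1.14 build arg used to build the Dinky test image in the workflow.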
+ command: jobmanager
+ environment:
+ - HADOOP_CONF_DIR=/opt/flink/conf
+ volumes:
+ - ./conf:/opt/flink/conf
+ - ./flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
+ networks:
+ - dinky_net
+ taskmanager:
+ image: flink:1.14.6
+ command: taskmanager
+ environment:
+ - HADOOP_CONF_DIR=/opt/flink/conf
+ volumes:
+ - ./conf:/opt/flink/conf
+ - ./flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
+ networks:
+ - dinky_net
diff --git a/e2e_test/docker-compose-env/hadoop/core-site.xml b/e2e_test/docker-compose-env/hadoop/core-site.xml
new file mode 100644
index 0000000000..12857664ca
--- /dev/null
+++ b/e2e_test/docker-compose-env/hadoop/core-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <property><name>hadoop.proxyuser.hue.hosts</name><value>*</value></property>
+    <property><name>fs.defaultFS</name><value>hdfs://namenode:9000</value></property>
+    <property><name>hadoop.http.staticuser.user</name><value>root</value></property>
+    <property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.SnappyCodec</value></property>
+    <property><name>hadoop.proxyuser.hue.groups</name><value>*</value></property>
+</configuration>
\ No newline at end of file
diff --git a/e2e_test/docker-compose-env/hadoop/docker-compose.yml b/e2e_test/docker-compose-env/hadoop/docker-compose.yml
new file mode 100644
index 0000000000..2947e37ccf
--- /dev/null
+++ b/e2e_test/docker-compose-env/hadoop/docker-compose.yml
@@ -0,0 +1,94 @@
+version: "3"
+networks:
+ dinky_net:
+ external: true
+
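+# The CORE_CONF_/HDFS_CONF_/YARN_CONF_ variables in hadoop.env are rendered into the containers'
+# core-site.xml / hdfs-site.xml / yarn-site.xml by the bde2020 image entrypoints.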
+services:
+ namenode:
+ image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
+ container_name: namenode
+ hostname: namenode
+ restart: always
+ ports:
+ - 9870:9870
+ - 9000:9000
+ volumes:
+ - ./hadoop-3.2.1/dfs/name:/hadoop/dfs/name
+ environment:
+ - CLUSTER_NAME=test
+ env_file:
+ - hadoop.env
+ networks:
+ - dinky_net
+
+ datanode:
+ image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
+ container_name: datanode
+ hostname: datanode
+ restart: always
+ depends_on:
+ - namenode
+ environment:
+ SERVICE_PRECONDITION: "namenode:9870"
+ env_file:
+ - hadoop.env
+ ports:
+ - 9864:9864
+ - 9866:9866
+ networks:
+ - dinky_net
+ resourcemanager:
+ image: bde2020/hadoop-resourcemanager:2.0.0-hadoop3.2.1-java8
+ container_name: resourcemanager
+ hostname: resourcemanager
+ restart: always
+ environment:
+ SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864"
+ env_file:
+ - hadoop.env
+ volumes:
+ - ./yarn-site.xml:/opt/hadoop-3.2.1/etc/hadoop/yarn-site.xml
+ networks:
+ - dinky_net
+
+
+ nodemanager:
+ image: bde2020/hadoop-nodemanager:2.0.0-hadoop3.2.1-java8
+ container_name: nodemanager
+ hostname: nodemanager
+ restart: always
+ environment:
+ SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864 resourcemanager:8088"
+ # JAVA_HOME: /opt/java11
+ # PYTHON_HOME: /opt/miniconda3
+ # PATH: $PYTHON_HOME/bin:$PATH
+ env_file:
+ - hadoop.env
+ # volumes:
+ # - /opt/jdk11/:/opt/java11
+ # - ./coda:/opt/coda
+ # - ./miniconda.sh:/root/miniconda3.sh
+ ## - /usr/bin/python3.9:/usr/bin/python
+ ## - /usr/local/lib/python3.9/site-packages:/usr/local/lib/python3.9/site-packages
+ # - /opt/jdk11/activation-1.1.1.jar:/opt/hadoop-3.2.1/share/hadoop/yarn/lib/activation-1.1.1.jar
+ networks:
+ - dinky_net
+
+ historyserver:
+ image: bde2020/hadoop-historyserver:2.0.0-hadoop3.2.1-java8
+ container_name: historyserver
+ hostname: historyserver
+ restart: always
+ environment:
+ SERVICE_PRECONDITION: "namenode:9000 namenode:9870 datanode:9864 resourcemanager:8088"
+ env_file:
+ - hadoop.env
+ ports:
+ - 8188:8188
+ - 10200:10200
+ networks:
+ - dinky_net
+# extra_hosts:
+# - resourcemanager:10.13.8.216
+# - nodemanager:10.13.8.216
+
diff --git a/e2e_test/docker-compose-env/hadoop/hadoop.env b/e2e_test/docker-compose-env/hadoop/hadoop.env
new file mode 100644
index 0000000000..fe34f877a4
--- /dev/null
+++ b/e2e_test/docker-compose-env/hadoop/hadoop.env
@@ -0,0 +1,43 @@
+CORE_CONF_fs_defaultFS=hdfs://namenode:9000
+CORE_CONF_hadoop_http_staticuser_user=root
+CORE_CONF_hadoop_proxyuser_hue_hosts=*
+CORE_CONF_hadoop_proxyuser_hue_groups=*
+CORE_CONF_io_compression_codecs=org.apache.hadoop.io.compress.SnappyCodec
+
+HDFS_CONF_dfs_webhdfs_enabled=true
+HDFS_CONF_dfs_permissions_enabled=false
+HDFS_CONF_dfs_namenode_datanode_registration_ip___hostname___check=false
+
+YARN_CONF_yarn_log___aggregation___enable=true
+YARN_CONF_yarn_log_server_url=http://historyserver:8188/applicationhistory/logs/
+YARN_CONF_yarn_resourcemanager_recovery_enabled=true
+YARN_CONF_yarn_resourcemanager_store_class=org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
+YARN_CONF_yarn_resourcemanager_scheduler_class=org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler
+YARN_CONF_yarn_scheduler_capacity_root_default_maximum___allocation___mb=8192
+YARN_CONF_yarn_scheduler_capacity_root_default_maximum___allocation___vcores=4
+YARN_CONF_yarn_resourcemanager_fs_state___store_uri=/rmstate
+YARN_CONF_yarn_resourcemanager_system___metrics___publisher_enabled=true
+YARN_CONF_yarn_resourcemanager_hostname=resourcemanager
+YARN_CONF_yarn_resourcemanager_address=resourcemanager:8032
+YARN_CONF_yarn_resourcemanager_scheduler_address=resourcemanager:8030
+YARN_CONF_yarn_resourcemanager_resource__tracker_address=resourcemanager:8031
+YARN_CONF_yarn_timeline___service_enabled=true
+YARN_CONF_yarn_timeline___service_generic___application___history_enabled=true
+YARN_CONF_yarn_timeline___service_hostname=historyserver
+YARN_CONF_mapreduce_map_output_compress=true
+YARN_CONF_mapred_map_output_compress_codec=org.apache.hadoop.io.compress.SnappyCodec
+YARN_CONF_yarn_nodemanager_resource_memory___mb=16384
+YARN_CONF_yarn_nodemanager_resource_cpu___vcores=8
+YARN_CONF_yarn_nodemanager_disk___health___checker_max___disk___utilization___per___disk___percentage=98.5
+YARN_CONF_yarn_nodemanager_remote___app___log___dir=/app-logs
+YARN_CONF_yarn_nodemanager_aux___services=mapreduce_shuffle
+
+MAPRED_CONF_mapreduce_framework_name=yarn
+MAPRED_CONF_mapred_child_java_opts=-Xmx4096m
+MAPRED_CONF_mapreduce_map_memory_mb=4096
+MAPRED_CONF_mapreduce_reduce_memory_mb=8192
+MAPRED_CONF_mapreduce_map_java_opts=-Xmx3072m
+MAPRED_CONF_mapreduce_reduce_java_opts=-Xmx6144m
+MAPRED_CONF_yarn_app_mapreduce_am_env=HADOOP_MAPRED_HOME=/data/docker-compose/hadoop-3.2.1/
+MAPRED_CONF_mapreduce_map_env=HADOOP_MAPRED_HOME=/data/docker-compose/hadoop-3.2.1/
+MAPRED_CONF_mapreduce_reduce_env=HADOOP_MAPRED_HOME=/data/docker-compose/hadoop-3.2.1/
\ No newline at end of file
diff --git a/e2e_test/docker-compose-env/hadoop/hdfs-site.xml b/e2e_test/docker-compose-env/hadoop/hdfs-site.xml
new file mode 100644
index 0000000000..3cf9162f08
--- /dev/null
+++ b/e2e_test/docker-compose-env/hadoop/hdfs-site.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <property><name>dfs.namenode.datanode.registration.ip-hostname-check</name><value>false</value></property>
+    <property><name>dfs.webhdfs.enabled</name><value>true</value></property>
+    <property><name>dfs.permissions.enabled</name><value>false</value></property>
+    <property><name>dfs.namenode.name.dir</name><value>file:///hadoop/dfs/name</value></property>
+    <property><name>dfs.namenode.rpc-bind-host</name><value>0.0.0.0</value></property>
+    <property><name>dfs.namenode.servicerpc-bind-host</name><value>0.0.0.0</value></property>
+    <property><name>dfs.namenode.http-bind-host</name><value>0.0.0.0</value></property>
+    <property><name>dfs.namenode.https-bind-host</name><value>0.0.0.0</value></property>
+    <property><name>dfs.client.use.datanode.hostname</name><value>true</value></property>
+    <property><name>dfs.datanode.use.datanode.hostname</name><value>true</value></property>
+</configuration>
\ No newline at end of file
diff --git a/e2e_test/docker-compose-env/hadoop/yarn-site.xml b/e2e_test/docker-compose-env/hadoop/yarn-site.xml
new file mode 100644
index 0000000000..dbf5785e4e
--- /dev/null
+++ b/e2e_test/docker-compose-env/hadoop/yarn-site.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <property><name>yarn.timeline-service.enabled</name><value>true</value></property>
+    <property><name>yarn.scheduler.capacity.root.default.maximum-allocation-vcores</name><value>4</value></property>
+    <property><name>yarn.resourcemanager.system-metrics-publisher.enabled</name><value>true</value></property>
+    <property><name>yarn.resourcemanager.store.class</name><value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value></property>
+    <property><name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name><value>98.5</value></property>
+    <property><name>yarn.log.server.url</name><value>http://historyserver:8188/applicationhistory/logs/</value></property>
+    <property><name>yarn.resourcemanager.fs.state-store.uri</name><value>/rmstate</value></property>
+    <property><name>yarn.timeline-service.generic-application-history.enabled</name><value>true</value></property>
+    <property><name>yarn.log-aggregation-enable</name><value>true</value></property>
+    <property><name>yarn.resourcemanager.hostname</name><value>resourcemanager</value></property>
+    <property><name>yarn.scheduler.capacity.root.default.maximum-allocation-mb</name><value>8192</value></property>
+    <property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
+    <property><name>yarn.resourcemanager.resource_tracker.address</name><value>resourcemanager:8031</value></property>
+    <property><name>yarn.timeline-service.hostname</name><value>historyserver</value></property>
+    <property><name>yarn.resourcemanager.scheduler.address</name><value>resourcemanager:8030</value></property>
+    <property><name>yarn.resourcemanager.address</name><value>resourcemanager:8032</value></property>
+    <property><name>mapred.map.output.compress.codec</name><value>org.apache.hadoop.io.compress.SnappyCodec</value></property>
+    <property><name>yarn.nodemanager.remote-app-log-dir</name><value>/app-logs</value></property>
+    <property><name>yarn.resourcemanager.scheduler.class</name><value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value></property>
+    <property><name>mapreduce.map.output.compress</name><value>true</value></property>
+    <property><name>yarn.nodemanager.resource.memory-mb</name><value>16384</value></property>
+    <property><name>yarn.resourcemanager.recovery.enabled</name><value>true</value></property>
+    <property><name>yarn.nodemanager.resource.cpu-vcores</name><value>8</value></property>
+    <property><name>yarn.resourcemanager.bind-host</name><value>0.0.0.0</value></property>
+    <property><name>yarn.nodemanager.bind-host</name><value>0.0.0.0</value></property>
+    <property><name>yarn.webapp.ui2.enable</name><value>true</value></property>
+</configuration>
diff --git a/e2e_test/docker-compose-env/mysql/conf/conf.d/my.cnf b/e2e_test/docker-compose-env/mysql/conf/conf.d/my.cnf
new file mode 100644
index 0000000000..14095b4c53
--- /dev/null
+++ b/e2e_test/docker-compose-env/mysql/conf/conf.d/my.cnf
@@ -0,0 +1,5 @@
+[mysqld]
+log-bin=/var/lib/mysql/mysql-bin
+server-id=123654
+expire_logs_days = 30
+default-time-zone = 'Asia/Shanghai'
diff --git a/e2e_test/docker-compose-env/mysql/conf/conf.d/my.conf b/e2e_test/docker-compose-env/mysql/conf/conf.d/my.conf
new file mode 100644
index 0000000000..f95cd9f7d1
--- /dev/null
+++ b/e2e_test/docker-compose-env/mysql/conf/conf.d/my.conf
@@ -0,0 +1,11 @@
+[client]
+default-character-set=utf8
+[mysql]
+default-character-set=utf8
+[mysqld]
+init_connect='SET collation_connection = utf8_unicode_ci'
+init_connect='SET NAMES utf8'
+character-set-server=utf8
+collation-server=utf8_unicode_ci
+skip-character-set-client-handshake
+skip-name-resolve
diff --git a/e2e_test/docker-compose-env/mysql/docker-compose.yml b/e2e_test/docker-compose-env/mysql/docker-compose.yml
new file mode 100644
index 0000000000..7910cdb48e
--- /dev/null
+++ b/e2e_test/docker-compose-env/mysql/docker-compose.yml
@@ -0,0 +1,16 @@
+version: "3"
+networks:
+ dinky_net:
+ external: true
+services:
+ mysql:
+ restart: always
+ image: mysql:5.7
+ volumes:
+ - ./conf:/etc/mysql
+ - ./conf/mysql.conf.d/:/etc/mysql/mysql.conf.d
+ environment:
+ - MYSQL_ROOT_PASSWORD=dinky
+ - MYSQL_DATABASE=dinky
+ networks:
+ - dinky_net
diff --git a/e2e_test/tools/docker-compose.yml b/e2e_test/tools/docker-compose.yml
new file mode 100644
index 0000000000..7eb73f8cd2
--- /dev/null
+++ b/e2e_test/tools/docker-compose.yml
@@ -0,0 +1,13 @@
+version: "3"
+networks:
+ dinky_net:
+ external: true
+services:
+ python-script:
+    image: python:3.9
+    volumes:
+      - ./:/app
+    working_dir: /app
+    command: bash -c 'pip install -r requirements.txt && python main.py dinky14:8888'
+ networks:
+ - dinky_net
diff --git a/e2e_test/tools/env.py b/e2e_test/tools/env.py
new file mode 100644
index 0000000000..3cade9469c
--- /dev/null
+++ b/e2e_test/tools/env.py
@@ -0,0 +1,26 @@
+from requests import Session
+import urllib.parse as urlparse
+
+from login import url, assertRespOk
+
+
+def addCluster(session: Session) -> int:
+ """
+ en: Add a cluster instance
+ zh: 添加一个集群实例
+ :param session: requests.Session
+ :return: clusterId
+ """
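+    # "jobmanager:8282" is the REST endpoint of the standalone session cluster (rest.port: 8282 in the Flink compose conf).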
+ name = 'flink-standalone'
+ add_cluster_resp = session.put(url("api/cluster"), json={
+ "name": name,
+ "type": "standalone",
+ "hosts": "jobmanager:8282"
+ })
+ assertRespOk(add_cluster_resp, "Add cluster")
+ get_data_list = session.get(url(f"api/cluster/list?searchKeyWord={urlparse.quote(name)}&isAutoCreate=false"))
+ assertRespOk(get_data_list, "Get cluster list")
+ for data in get_data_list.json()['data']:
+ if data['name'] == name:
+ return data['id']
+ raise Exception(f"Cluster {name} not found")
diff --git a/e2e_test/tools/logger.py b/e2e_test/tools/logger.py
new file mode 100644
index 0000000000..df168899ad
--- /dev/null
+++ b/e2e_test/tools/logger.py
@@ -0,0 +1,36 @@
+import logging
+
+
+# Logging configuration: define colors and configure the console log handler
+class ColorFormatter(logging.Formatter):
+    """Extends logging.Formatter to add colors"""
+    grey = "\x1b[;11m"
+    green = "\x1b[32;11m"  # green
+    yellow = "\x1b[33;11m"
+    red = "\x1b[31;11m"
+    bold_red = "\x1b[41;1m"
+    reset = "\x1b[0m"  # reset color
+    format = "%(levelname)s: %(message)s (%(asctime)s; %(filename)s:%(lineno)d)"
+
+ FORMATS = {
+ logging.DEBUG: grey + format + reset,
+ logging.INFO: green + format + reset,
+ logging.WARNING: yellow + format + reset,
+ logging.ERROR: red + format + reset,
+ logging.CRITICAL: bold_red + format + reset
+ }
+
+ def format(self, record):
+ log_fmt = self.FORMATS.get(record.levelno)
+ formatter = logging.Formatter(log_fmt)
+ return formatter.format(record)
+
+
+# Create a stream handler (console output)
+ch = logging.StreamHandler()
+ch.setLevel(logging.DEBUG)  # log this level and above to the console
+ch.setFormatter(ColorFormatter())
+
+LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
+logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, handlers=[ch])
+log = logging.getLogger(__name__)
diff --git a/e2e_test/tools/login.py b/e2e_test/tools/login.py
new file mode 100644
index 0000000000..041daff95c
--- /dev/null
+++ b/e2e_test/tools/login.py
@@ -0,0 +1,49 @@
+import sys
+from json import JSONDecodeError
+
+import requests
+from requests import Response
+from logger import log
+
+dinky_addr = sys.argv[1]
+
+log.info(f"The address of the current request:{dinky_addr}")
+
+
+def url(path: str):
+ return rf"http://{dinky_addr}/{path}"
+
+
+def assertRespOk(resp: Response, api_name: str):
+    if resp.status_code != 200:
+        raise AssertionError(f"api name:{api_name} request failed, status code: {resp.status_code}")
+    else:
+        try:
+            resp_json = resp.json()
+            if not resp_json["success"]:
+                raise AssertionError(f"api name:{api_name} request failed. Error: {resp_json['msg']}")
+        except JSONDecodeError:
+            raise AssertionError(f"api name:{api_name} request failed. Error: {resp.content.decode()}")
+
+
+def login(session: requests.Session):
+ log.info("Login to Dinky, Currently in use: admin")
+ login_resp: Response = session.post(url("api/login"),
+ json={"username": "admin", "password": "dinky123!@#", "ldapLogin": False,
+ "autoLogin": True})
+ assertRespOk(login_resp, "Login")
+
+ log.info("Select the default tenant")
+ choose_tenant_resp = session.post(url("api/chooseTenant?tenantId=1"))
+ assertRespOk(choose_tenant_resp, "Choose Tenant")
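+    # Dinky expects the selected tenant on subsequent requests, so pin it via the tenantId cookie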
+ session.cookies.set("tenantId", '1')
diff --git a/e2e_test/tools/main.py b/e2e_test/tools/main.py
new file mode 100644
index 0000000000..50a8ad1c37
--- /dev/null
+++ b/e2e_test/tools/main.py
@@ -0,0 +1,14 @@
+import requests
+
+from env import addCluster
+from login import login
+from task import addCatalogue, runFlinkLocalTask, runFlinkSessionTask
+
+if __name__ == '__main__':
+ session = requests.session()
+ login(session)
+ clusterId = addCluster(session)
+ catalogue = addCatalogue(session, "flink-sql-task")
+ sql = "DROP TABLE IF EXISTS source_table3;\r\nCREATE TABLE IF NOT EXISTS source_table3(\r\n--订单id\r\n`order_id` BIGINT,\r\n--产品\r\n\r\n`product` BIGINT,\r\n--金额\r\n`amount` BIGINT,\r\n\r\n--支付时间\r\n`order_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)), -- `在这里插入代码片`\r\n--WATERMARK\r\nWATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND\r\n) WITH(\r\n'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_id.min' = '1',\r\n 'fields.order_id.max' = '2',\r\n 'fields.amount.min' = '1',\r\n 'fields.amount.max' = '10',\r\n 'fields.product.min' = '1',\r\n 'fields.product.max' = '2'\r\n);\r\n\r\n-- SELECT * FROM source_table3 LIMIT 10;\r\n\r\nDROP TABLE IF EXISTS sink_table5;\r\nCREATE TABLE IF NOT EXISTS sink_table5(\r\n--产品\r\n`product` BIGINT,\r\n--金额\r\n`amount` BIGINT,\r\n--支付时间\r\n`order_time` TIMESTAMP(3),\r\n--1分钟时间聚合总数\r\n`one_minute_sum` BIGINT\r\n) WITH(\r\n'connector'='print'\r\n);\r\n\r\nINSERT INTO sink_table5\r\nSELECT\r\nproduct,\r\namount,\r\norder_time,\r\nSUM(amount) OVER(\r\nPARTITION BY product\r\nORDER BY order_time\r\n-- 标识统计范围是1个 product 的最近 1 分钟的数据\r\nRANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW\r\n) as one_minute_sum\r\nFROM source_table3;"
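+    # Run the same datagen -> print job twice: once in local mode, then on the standalone session cluster registered above.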
+    runFlinkLocalTask(session, catalogue.id, "flink-sql-datagen-test", sql)
+ runFlinkSessionTask(session, catalogue.id, clusterId, "flink-sql-datagen-test-session", sql)
diff --git a/e2e_test/tools/requirements.txt b/e2e_test/tools/requirements.txt
new file mode 100644
index 0000000000..fb1f1e2c34
Binary files /dev/null and b/e2e_test/tools/requirements.txt differ
diff --git a/e2e_test/tools/task.py b/e2e_test/tools/task.py
new file mode 100644
index 0000000000..a7c781ba03
--- /dev/null
+++ b/e2e_test/tools/task.py
@@ -0,0 +1,152 @@
+from time import sleep
+
+import requests
+
+from login import assertRespOk, url
+from logger import log
+
+
+class CatalogueTree:
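+    """A node of the catalogue tree returned by api/catalogue/getCatalogueTreeData."""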
+ def __init__(self, id: int, name: str, taskId: int, children):
+ self.id = id
+ self.name = name
+ self.taskId = taskId
+ self.children: list[CatalogueTree] = children
+
+
+def assertFlinkTaskIsRunning(status: str, name: str):
+    # TODO: this should also check whether the Flink job threw an exception, not only its status
+ if status != "RUNNING":
+ raise Exception(f"Flink name:{name} is not RUNNING,current status:{status}")
+
+
+def getTask(data_list: list[dict], name: str) -> CatalogueTree:
+ for data in data_list:
+ if data['name'] == name:
+ return CatalogueTree(data['id'], data['name'], data['taskId'], data['children'])
+ if len(data["children"]) > 0:
+ result = getTask(data["children"], name)
+ if result is not None:
+ return result
+
+
+def addCatalogue(session: requests.Session, name: str, isLeaf: bool = False, parentId: int = 0):
+ add_parent_dir_resp = session.put(url("api/catalogue/saveOrUpdateCatalogue"), json={
+ "name": name,
+ "isLeaf": isLeaf,
+ "parentId": parentId
+ })
+ assertRespOk(add_parent_dir_resp, "Create a dir")
+ get_all_tasks_resp = session.post(url("api/catalogue/getCatalogueTreeData"), json={
+ "sortValue": "",
+ "sortType": ""
+ })
+ assertRespOk(get_all_tasks_resp, "Get job details")
+ data_list: list[dict] = get_all_tasks_resp.json()['data']
+ return getTask(data_list, name)
+
+
+def addTask(session: requests.Session, name: str, parent_id: int = 0, dialect: str = "FlinkSql",
+            statement: str = "", runMode: str = "local", clusterId: int = -1) -> CatalogueTree:
+ """
+ en: Add a task
+ zh: 添加一个任务
+ :param session: requests.Session
+ :param name: task name
+ :param parent_id: dir id
+    :param dialect: task dialect, e.g. FlinkSql
+    :param statement: statement
+    :param runMode: run mode, e.g. local or standalone
+    :param clusterId: cluster instance id (used for standalone/session mode)
+    :return: CatalogueTree
+ """
+ add_parent_dir_resp = session.put(url("api/catalogue/saveOrUpdateCatalogueAndTask"), json={
+ "name": name,
+ "type": dialect,
+ "firstLevelOwner": 1,
+ "task": {
+ "savePointStrategy": 0,
+ "parallelism": 1,
+ "envId": -1,
+ "step": 1,
+ "alertGroupId": -1,
+            "type": runMode,
+ "dialect": dialect,
+ "statement": statement,
+ "firstLevelOwner": 1,
+            "clusterId": clusterId
+ },
+ "isLeaf": False,
+ "parentId": parent_id
+ })
+ assertRespOk(add_parent_dir_resp, "Create a task")
+ get_all_tasks_resp = session.post(url("api/catalogue/getCatalogueTreeData"), json={
+ "sortValue": "",
+ "sortType": ""
+ })
+ assertRespOk(get_all_tasks_resp, "Get job details")
+ data_list: list[dict] = get_all_tasks_resp.json()['data']
+ return getTask(data_list, name)
+
+
+def runTask(session: requests.Session, taskId: int) -> int:
+ """
+ en:Run a task
+ zh:运行一个任务
+ :param session: requests.Session
+ :param taskId: task id
+    :return: jobInstanceId of the submitted job
+ """
+ run_task_resp = session.get(url(f"api/task/submitTask?id={taskId}"))
+ assertRespOk(run_task_resp, "Run Task")
+ return run_task_resp.json()['data']['jobInstanceId']
+
+
+def getFlinkTaskStatus(session: requests.Session, jobInstanceId: int) -> str:
+ """
+ en: Obtain the status of a Flink task
+ zh: 获取Flink 任务状态
+ :param session: requests.Session
+ :param jobInstanceId: job instance id
+ :return: status
+ """
+ run_task_resp = session.get(url(f"api/jobInstance/refreshJobInfoDetail?id={jobInstanceId}&isForce=false"))
+ assertRespOk(run_task_resp, "Get Task Status")
+ return run_task_resp.json()['data']['instance']['status']
+
+
+def runFlinkLocalTask(session: requests.Session, parentId: int, name: str, statement: str, waitTime: int = 10) -> None:
+ """
+ en: Run a FlinkLocal task
+ zh: 运行一个 FlinkLocal任务
+ :param session: requests.Session
+ :param parentId: dir id
+ :param name: task name
+ :param statement: statement
+    :param waitTime: seconds to wait before checking the job status
+ """
+    log.info(
+        f"======================\nExecuting a local Flink task, name: {name}, statement: \n{statement}\n======================")
+ task = addTask(session, name, parentId, "FlinkSql", statement)
+ jobInstanceId = runTask(session, task.taskId)
+ sleep(waitTime)
+ status = getFlinkTaskStatus(session, jobInstanceId)
+ assertFlinkTaskIsRunning(status, name)
+
+
+def runFlinkSessionTask(session: requests.Session, parentId: int, clusterId: int, name: str, statement: str,
+                        waitTime: int = 10) -> None:
+    """
+    en: Run a Flink session task
+    zh: 运行一个 Flink Session 任务
+    :param session: requests.Session
+    :param parentId: dir id
+    :param clusterId: cluster instance id returned by addCluster
+    :param name: task name
+    :param statement: statement
+    :param waitTime: seconds to wait before checking the job status
+ """
+    log.info(
+        f"======================\nExecuting a Flink session task, name: {name}, statement: \n{statement}\n======================")
+    task = addTask(session, name, parentId, "FlinkSql", statement, "standalone", clusterId)
+ jobInstanceId = runTask(session, task.taskId)
+ sleep(waitTime)
+ status = getFlinkTaskStatus(session, jobInstanceId)
+ assertFlinkTaskIsRunning(status, name)