diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java
index ea803ca137..4908c43baf 100644
--- a/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java
+++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/CustomTableEnvironment.java
@@ -92,7 +92,7 @@ default void addJar(File... jarPath) {
         List<String> pathList =
                 Arrays.stream(URLUtil.getURLs(jarPath)).map(URL::toString).collect(Collectors.toList());
         List<String> jars = configuration.get(PipelineOptions.JARS);
-        if (jars == null) {
+        if (CollUtil.isEmpty(jars)) {
             addConfiguration(PipelineOptions.JARS, pathList);
         } else {
             CollUtil.addAll(jars, pathList);
diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/utils/FlinkStreamEnvironmentUtil.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/utils/FlinkStreamEnvironmentUtil.java
index d0d6931879..a44ad5e614 100644
--- a/dinky-client/dinky-client-base/src/main/java/org/dinky/utils/FlinkStreamEnvironmentUtil.java
+++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/utils/FlinkStreamEnvironmentUtil.java
@@ -21,6 +21,8 @@
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+import org.dinky.executor.CustomTableEnvironment;
+
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.PlanTranslator;
@@ -80,6 +82,41 @@ public static JobClient executeAsync(Pipeline pipeline, StreamExecutionEnvironme
         }
     }
 
+    public static JobClient executeAsync(Pipeline pipeline, CustomTableEnvironment env) throws Exception {
+        Configuration configuration = new Configuration(env.getRootConfiguration());
+        checkNotNull(pipeline, "pipeline cannot be null.");
+        checkNotNull(
+                configuration.get(DeploymentOptions.TARGET),
+                "No execution.target specified in your configuration file.");
+
+        PipelineExecutorServiceLoader executorServiceLoader = (PipelineExecutorServiceLoader)
+                ReflectUtil.getFieldValue(env.getStreamExecutionEnvironment(), "executorServiceLoader");
+        ClassLoader userClassloader = (ClassLoader) ReflectUtil.getFieldValue(env, "userClassloader");
+        final PipelineExecutorFactory executorFactory = executorServiceLoader.getExecutorFactory(configuration);
+
+        checkNotNull(
+                executorFactory,
+                "Cannot find compatible factory for specified execution.target (=%s)",
+                configuration.get(DeploymentOptions.TARGET));
+
+        CompletableFuture<JobClient> jobClientFuture =
+                executorFactory.getExecutor(configuration).execute(pipeline, configuration, userClassloader);
+
+        List<JobListener> jobListeners = env.getStreamExecutionEnvironment().getJobListeners();
+        try {
+            JobClient jobClient = jobClientFuture.get();
+            jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
+            return jobClient;
+        } catch (ExecutionException executionException) {
+            final Throwable strippedException = ExceptionUtils.stripExecutionException(executionException);
+            jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));
+
+            throw new FlinkException(
+                    String.format("Failed to execute job '%s'.", ReflectUtil.invoke(pipeline, "getJobName")),
+                    strippedException);
+        }
+    }
+
     public static String getStreamingPlanAsJSON(Pipeline pipeline) {
         if (pipeline instanceof StreamGraph) {
             return ((StreamGraph) pipeline).getStreamingPlanAsJSON();
diff --git a/dinky-common/src/main/java/org/dinky/data/job/JobStatementType.java b/dinky-common/src/main/java/org/dinky/data/job/JobStatementType.java
index 1ca6c37e68..20e399f529 100644
--- a/dinky-common/src/main/java/org/dinky/data/job/JobStatementType.java
+++ b/dinky-common/src/main/java/org/dinky/data/job/JobStatementType.java
@@ -24,4 +24,5 @@ public enum JobStatementType {
     DDL,
     SQL,
     PIPELINE,
+    EXECUTE_JAR
 }
diff --git a/dinky-common/src/main/java/org/dinky/data/job/SqlType.java b/dinky-common/src/main/java/org/dinky/data/job/SqlType.java
index 203fbc96c0..7223d097b6 100644
--- a/dinky-common/src/main/java/org/dinky/data/job/SqlType.java
+++ b/dinky-common/src/main/java/org/dinky/data/job/SqlType.java
@@ -66,6 +66,8 @@ public enum SqlType {
 
     RESET("RESET", "^RESET.*", SqlCategory.DDL),
 
+    EXECUTE_JAR("EXECUTE_JAR", "^EXECUTE\\s+JAR\\s+WITH.*", SqlCategory.DML),
+
     EXECUTE("EXECUTE", "^EXECUTE.*", SqlCategory.DML),
 
     ADD_JAR("ADD_JAR", "^ADD\\s+JAR\\s+\\S+", SqlCategory.DDL),
diff --git a/dinky-core/src/main/java/org/dinky/executor/Executor.java b/dinky-core/src/main/java/org/dinky/executor/Executor.java
index 777918e434..b68bf736b3 100644
--- a/dinky-core/src/main/java/org/dinky/executor/Executor.java
+++ b/dinky-core/src/main/java/org/dinky/executor/Executor.java
@@ -213,6 +213,8 @@ public JobStatementPlan parseStatementIntoJobStatementPlan(String[] statements)
             SqlType operationType = Operations.getOperationType(statement);
             if (operationType.equals(SqlType.SET) || operationType.equals(SqlType.RESET)) {
                 jobStatementPlan.addJobStatement(statement, JobStatementType.SET, operationType);
+            } else if (operationType.equals(SqlType.EXECUTE_JAR)) {
+                jobStatementPlan.addJobStatement(statement, JobStatementType.EXECUTE_JAR, operationType);
             } else if (operationType.equals(SqlType.EXECUTE)) {
                 jobStatementPlan.addJobStatement(statement, JobStatementType.PIPELINE, operationType);
             } else if (operationType.equals(SqlType.PRINT)) {
diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java
index a703c63665..322aed3e87 100644
--- a/dinky-core/src/main/java/org/dinky/job/JobManager.java
+++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java
@@ -52,7 +52,6 @@
 import org.dinky.gateway.result.GatewayResult;
 import org.dinky.gateway.result.SavePointResult;
 import org.dinky.gateway.result.TestResult;
-import org.dinky.job.runner.JobJarRunner;
 import org.dinky.trans.Operations;
 import org.dinky.trans.parse.AddFileSqlParseStrategy;
 import org.dinky.trans.parse.AddJarSqlParseStrategy;
@@ -252,12 +251,10 @@ public JobResult executeJarSql(String statement) throws Exception {
         jobStatementPlan.buildFinalStatement();
         job = Job.build(runMode, config, executorConfig, executor, statement, useGateway);
         ready();
+        JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
         try {
-            // Only one is executed.
             for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
-                JobJarRunner jobJarRunner = new JobJarRunner(this);
-                jobJarRunner.run(jobStatement);
-                break;
+                jobRunnerFactory.getJobRunner(jobStatement.getStatementType()).run(jobStatement);
             }
             if (job.isFailed()) {
                 failed();
diff --git a/dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java b/dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java
index 909d1b9fd9..bad0a8c456 100644
--- a/dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java
+++ b/dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java
@@ -21,6 +21,7 @@
 
 import org.dinky.data.job.JobStatementType;
 import org.dinky.job.runner.JobDDLRunner;
+import org.dinky.job.runner.JobJarRunner;
 import org.dinky.job.runner.JobPipelineRunner;
 import org.dinky.job.runner.JobSetRunner;
 import org.dinky.job.runner.JobSqlRunner;
@@ -31,12 +32,14 @@ public class JobRunnerFactory {
     private JobSqlRunner jobSqlRunner;
     private JobPipelineRunner jobPipelineRunner;
     private JobDDLRunner jobDDLRunner;
+    private JobJarRunner jobJarRunner;
 
     public JobRunnerFactory(JobManager jobManager) {
         this.jobSetRunner = new JobSetRunner(jobManager);
         this.jobSqlRunner = new JobSqlRunner(jobManager);
         this.jobPipelineRunner = new JobPipelineRunner(jobManager);
         this.jobDDLRunner = new JobDDLRunner(jobManager);
+        this.jobJarRunner = new JobJarRunner(jobManager);
     }
 
     public JobRunner getJobRunner(JobStatementType jobStatementType) {
@@ -47,6 +50,8 @@ public JobRunner getJobRunner(JobStatementType jobStatementType) {
                 return jobSqlRunner;
             case PIPELINE:
                 return jobPipelineRunner;
+            case EXECUTE_JAR:
+                return jobJarRunner;
             case DDL:
             default:
                 return jobDDLRunner;
diff --git a/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java b/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java
index f80259727a..5838d6dd19 100644
--- a/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java
+++ b/dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java
@@ -108,7 +108,8 @@ private void checkEmptyStatement() {
                 throw new DinkyException("The statement cannot be empty. Please check your statements.");
             }
             if (jobStatement.getStatementType().equals(JobStatementType.SQL)
-                    || jobStatement.getStatementType().equals(JobStatementType.PIPELINE)) {
+                    || jobStatement.getStatementType().equals(JobStatementType.PIPELINE)
+                    || jobStatement.getStatementType().equals(JobStatementType.EXECUTE_JAR)) {
                 hasSqlStatement = true;
             }
         }
diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java
index 130d83fde3..8df3fd519d 100644
--- a/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java
+++ b/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java
@@ -59,6 +59,7 @@
 import java.net.URL;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -174,14 +175,12 @@ private Pipeline getPipeline(JobStatement jobStatement) {
 
     private void submitNormal(JobStatement jobStatement) throws Exception {
         JobClient jobClient = FlinkStreamEnvironmentUtil.executeAsync(
-                getPipeline(jobStatement), jobManager.getExecutor().getStreamExecutionEnvironment());
+                getPipeline(jobStatement), jobManager.getExecutor().getCustomTableEnvironment());
         if (Asserts.isNotNull(jobClient)) {
             jobManager.getJob().setJobId(jobClient.getJobID().toHexString());
-            jobManager.getJob().setJids(new ArrayList<String>() {
-                {
-                    add(jobManager.getJob().getJobId());
-                }
-            });
+            jobManager
+                    .getJob()
+                    .setJids(Collections.singletonList(jobManager.getJob().getJobId()));
             jobManager.getJob().setStatus(Job.JobStatus.SUCCESS);
         } else {
             jobManager.getJob().setStatus(Job.JobStatus.FAILED);
diff --git a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx
index 3c6affbe07..f65520db86 100644
--- a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx
+++ b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx
@@ -184,6 +184,15 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => {
   }));
   const [isRunning, setIsRunning] = useState(false);
 
+  useEffect(() => {
+    if (sqlForm.enable) {
+      setSqlForm((prevState) => ({
+        ...prevState,
+        initSqlStatement: currentState.statement
+      }));
+    }
+  }, [sqlForm.enable, currentState.statement]);
+
   useAsyncEffect(async () => {
     const taskDetail = await getTaskDetails(params.taskId);
     if (taskDetail) {