Skip to content

Commit

Permalink
Add Flink execution engine support
Browse files Browse the repository at this point in the history
  • Loading branch information
GSHF committed Dec 24, 2024
1 parent 4a5e7e7 commit 77ab170
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 7,181 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@
<dependencies>
<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-engine-flink-api</artifactId>
<artifactId>datavines-engine-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-engine-executor</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand All @@ -42,12 +47,12 @@
</dependency>
<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-common</artifactId>
<artifactId>datavines-engine-flink-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-engine-api</artifactId>
<artifactId>datavines-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,15 @@
import io.datavines.common.entity.ProcessResult;
import io.datavines.common.enums.ExecutionStatus;
import io.datavines.common.utils.LoggerUtils;
import io.datavines.engine.core.executor.AbstractYarnEngineExecutor;
import io.datavines.engine.core.utils.ShellCommandProcess;
import io.datavines.engine.executor.core.base.AbstractYarnEngineExecutor;
import io.datavines.engine.executor.core.executor.ShellCommandProcess;
import io.datavines.engine.flink.executor.utils.FlinkArgsUtils;
import io.datavines.engine.flink.executor.utils.FlinkParameters;

public class FlinkEngineExecutor extends AbstractYarnEngineExecutor {

private static final String FLINK_COMMAND = "${FLINK_HOME}/bin/flink";
private Configurations configurations;
private ProcessResult processResult;
private boolean isCancel;

@Override
public void init(JobExecutionRequest jobExecutionRequest, Logger logger, Configurations configurations) throws Exception {
Expand All @@ -42,23 +40,23 @@ public void init(JobExecutionRequest jobExecutionRequest, Logger logger, Configu

this.jobExecutionRequest = jobExecutionRequest;
this.logger = logger;
this.shellCommandProcess = new ShellCommandProcess(this::logHandle,
logger, jobExecutionRequest, configurations);
this.configurations = configurations;
this.processResult = new ProcessResult();
this.isCancel = false;
this.shellCommandProcess = new ShellCommandProcess(
this::logHandle,
logger,
jobExecutionRequest,
configurations
);
}

@Override
public void execute() throws Exception {
try {
String command = buildCommand();
logger.info("flink task command: {}", command);
int exitCode = shellCommandProcess.run(command);
processResult.setExitStatusCode(exitCode);
if (exitCode == 0) {
processResult.setExitStatusCode(ExecutionStatus.SUCCESS.getCode());
}
shellCommandProcess.run(command);
processResult.setExitStatusCode(ExecutionStatus.SUCCESS.getCode());
} catch (Exception e) {
logger.error("flink task error", e);
processResult.setExitStatusCode(ExecutionStatus.FAILURE.getCode());
Expand All @@ -67,14 +65,8 @@ public void execute() throws Exception {
}

@Override
public void cancel() throws Exception {
this.isCancel = true;
shellCommandProcess.cancel();
}

@Override
public boolean isCancel() throws Exception {
return this.isCancel;
public void after() throws Exception {
// 执行后的清理工作
}

@Override
Expand All @@ -88,11 +80,7 @@ public JobExecutionRequest getTaskRequest() {
}

@Override
public void after() throws Exception {
// 执行后的清理工作
}

private String buildCommand() {
protected String buildCommand() {
FlinkParameters parameters = new FlinkParameters();
// Set parameters from configurations
parameters.setMainJar(configurations.getString("mainJar"));
Expand All @@ -101,8 +89,8 @@ private String buildCommand() {
parameters.setMainArgs(configurations.getString("mainArgs"));
parameters.setParallelism(configurations.getInt("parallelism", 1));
parameters.setJobName(configurations.getString("jobName"));
parameters.setYarnQueue(configurations.getString("queue"));
parameters.setYarnQueue(configurations.getString("yarnQueue"));

return FLINK_COMMAND + " " + String.join(" ", FlinkArgsUtils.buildArgs(parameters));
}
}
40 changes: 40 additions & 0 deletions datavines-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,22 @@
<artifactId>datavines-engine-local-executor</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
Expand All @@ -336,6 +352,30 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-engine-flink-executor</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-engine-local-config</artifactId>
Expand Down
Loading

0 comments on commit 77ab170

Please sign in to comment.