Skip to content

Commit

Permalink
refactor(flink): optimize FlinkEngineExecutor code
Browse files Browse the repository at this point in the history
1. Remove redundant utils package and files
2. Improve error handling in after() method
3. Optimize code structure and method implementations
4. Update yarn application kill command
  • Loading branch information
GSHF committed Dec 30, 2024
1 parent 0e9c7be commit bf7f8fa
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 341 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,12 @@ public Float getFloat(String key) {
public Float getFloat(String key,String defaultValue) {
return Float.valueOf(configuration.getProperty(key,defaultValue));
}

public long getLong(String key) {
return Long.parseLong(configuration.getProperty(key));
}

public long getLong(String key, long defaultValue) {
return Long.parseLong(configuration.getProperty(key, String.valueOf(defaultValue)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@

import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;

/**
* mainly used to get the start command line of a process
*/
Expand Down Expand Up @@ -255,4 +259,43 @@ private static boolean needsEscaping(int verificationType, String arg) {
}
return false;
}

/**
* Print process output to logger
* @param process process
* @param logHandler log handler consumer
* @throws IOException io exception
*/
public static void printProcessOutput(Process process, Consumer<List<String>> logHandler) throws IOException {
if (process == null || logHandler == null) {
return;
}

try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
BufferedReader errReader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {

List<String> lines = new ArrayList<>();
String line;

while ((line = inReader.readLine()) != null) {
lines.add(line);
if (lines.size() >= 100) { // Buffer size to avoid memory issues
logHandler.accept(new ArrayList<>(lines));
lines.clear();
}
}

while ((line = errReader.readLine()) != null) {
lines.add(line);
if (lines.size() >= 100) {
logHandler.accept(new ArrayList<>(lines));
lines.clear();
}
}

if (!lines.isEmpty()) {
logHandler.accept(lines);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public abstract class BaseCommandProcess {
/**
* process
*/
private Process process;
protected Process process;

/**
* log handler
Expand All @@ -60,7 +60,7 @@ public abstract class BaseCommandProcess {
*/
protected Logger logger;

private final Configurations configurations;
protected final Configurations configurations;

/**
* log list
Expand Down Expand Up @@ -163,7 +163,7 @@ private void buildProcess(String commandFile) throws IOException{
* @param process process
* @return processId
*/
private int getProcessId(Process process){
protected int getProcessId(Process process){
int processId = 0;

try {
Expand Down Expand Up @@ -266,7 +266,7 @@ private void hardKill(int processId) {
* get the standard output of the process
* @param process process
*/
private void parseProcessOutput(Process process) {
protected void parseProcessOutput(Process process) {
String threadLoggerInfoName = String.format(LoggerUtils.JOB_LOGGER_THREAD_NAME + "-%s", jobExecutionRequest.getJobExecutionName());
ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName);
parseProcessOutputExecutorService.submit(new Runnable(){
Expand Down Expand Up @@ -357,4 +357,8 @@ protected List<String> commandOptions() {
*/
protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;

protected Consumer<List<String>> getLogHandler() {
return logHandler;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.datavines.common.entity.JobExecutionRequest;
import io.datavines.common.entity.ProcessResult;
import io.datavines.common.enums.ExecutionStatus;
import io.datavines.common.utils.OSUtils;
import io.datavines.common.utils.ProcessUtils;
import io.datavines.common.utils.YarnUtils;
import io.datavines.engine.executor.core.executor.BaseCommandProcess;
Expand Down Expand Up @@ -55,10 +56,20 @@ public void setTimeout(long timeout) {
private ProcessBuilder buildFlinkProcessBuilder(String command) throws IOException {
List<String> commandList = new ArrayList<>();

// 将命令拆分为参数列表,保留引号内的空格
// Get flink home from configurations or environment variable
String flinkHome = configurations.getString("flink.home", System.getenv("FLINK_HOME"));
if (StringUtils.isEmpty(flinkHome)) {
throw new IOException("FLINK_HOME is not set in either configurations or environment variables");
}

// Build the flink command path
String flinkCmd = Paths.get(flinkHome, "bin", OSUtils.isWindows() ? "flink.cmd" : "flink").toString();

// Split the command while preserving quoted spaces
String[] cmdArray = command.split("\\s+(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)");
commandList.add(flinkCmd);
for (String cmd : cmdArray) {
// 移除引号
// Remove quotes
cmd = cmd.replaceAll("^\"|\"$", "");
if (!cmd.trim().isEmpty()) {
commandList.add(cmd);
Expand All @@ -68,17 +79,15 @@ private ProcessBuilder buildFlinkProcessBuilder(String command) throws IOExcepti
ProcessBuilder processBuilder = new ProcessBuilder(commandList);
processBuilder.directory(new File(jobExecutionRequest.getExecuteFilePath()));

// 设置环境变量
// Set environment variables
Map<String, String> env = processBuilder.environment();
String flinkHome = configurations.getString("flink.home", System.getenv("FLINK_HOME"));
if (StringUtils.isNotEmpty(flinkHome)) {
env.put("FLINK_HOME", flinkHome);
// 添加到PATH
String path = env.get("PATH");
if (path != null) {
path = Paths.get(flinkHome, "bin") + File.pathSeparator + path;
env.put("PATH", path);
}
env.put("FLINK_HOME", flinkHome);

// Add to PATH
String path = env.get("PATH");
if (path != null) {
path = Paths.get(flinkHome, "bin") + File.pathSeparator + path;
env.put("PATH", path);
}

return processBuilder;
Expand Down
Loading

0 comments on commit bf7f8fa

Please sign in to comment.