feat: support parsing py and scala method names via API
taoran1250 committed Dec 12, 2024
1 parent c3c7779 commit ffee469
Showing 7 changed files with 98 additions and 20 deletions.
2 changes: 1 addition & 1 deletion linkis-commons/linkis-module/pom.xml
@@ -47,8 +47,8 @@
<version>${springfox.version}</version>
<exclusions>
<exclusion>
<groupId>io.github.classgraph</groupId>
<artifactId>classgraph</artifactId>
</exclusion>
</exclusions>
</dependency>
@@ -274,7 +274,9 @@ class HiveEngineConnExecutor(
if (numberOfMRJobs > 0) {
engineExecutorContext.appendStdout(s"Your hive sql has $numberOfMRJobs MR jobs to do")
val queueName = hiveConf.get(HiveEngineConfiguration.HIVE_QUEUE_NAME)
engineExecutorContext.appendStdout(s"Your task will be submitted to the $queueName queue")
engineExecutorContext.appendStdout(
s"Your task will be submitted to the $queueName queue"
)
}
if (thread.isInterrupted) {
logger.error(
@@ -103,11 +103,8 @@ public List<Map<String, Object>> getTablesByDbNameAndOptionalUserName(
queryParam.withRoles(roles);
List<Map<String, Object>> hiveTables =
hiveMetaDao.getTablesByDbNameAndUserAndRolesFromDbPrvs(queryParam);
hiveTables.addAll(hiveMetaDao.getTablesByDbNameAndUserAndRolesFromTblPrvs(queryParam));
return hiveTables.stream().distinct().collect(Collectors.toList());
} else {
log.info("user {} to getTablesByDbName no permission control", queryParam.getUserName());
return hiveMetaDao.getTablesByDbName(queryParam);
@@ -1437,4 +1437,28 @@ public Message pythonUpload(
.data("dependencies", dependencies)
.data("fileName", fileName);
}

@ApiImplicitParam(
name = "path",
dataType = "String",
value = "path",
example = "file:///mnt/bdap/hadoop/test1012_01.py")
@RequestMapping(path = "/get-register-functions", method = RequestMethod.GET)
public Message getRegisterFunctions(HttpServletRequest req, @RequestParam("path") String path) {
if (StringUtils.endsWithIgnoreCase(path, Constants.FILE_EXTENSION_PY)
|| StringUtils.endsWithIgnoreCase(path, Constants.FILE_EXTENSION_SCALA)) {
try {
FsPath fsPath = new FsPath(path);
// Obtain the file system instance
FileSystem fileSystem = (FileSystem) FSFactory.getFs(fsPath);
fileSystem.init(null);
return Message.ok()
.data("functions", UdfUtils.getRegisterFunctions(fileSystem, fsPath, path));
} catch (Exception e) {
return Message.error("解析文件失败,错误信息:" + e);
}
} else {
return Message.error("仅支持.py和.scala文件");
}
}
}
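
Usage sketch (the controller's base URL is omitted here, and the response values are illustrative): a GET request such as get-register-functions?path=file:///mnt/bdap/hadoop/test1012_01.py returns Message.ok() with the parsed method names under the "functions" data key, while unsupported extensions or parse failures return Message.error().
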
@@ -21,6 +21,7 @@

public class Constants {
public static final String FILE_EXTENSION_PY = ".py";
public static final String FILE_EXTENSION_SCALA = ".scala";
public static final String FILE_EXTENSION_ZIP = ".zip";
public static final String FILE_EXTENSION_TAR_GZ = ".tar.gz";
public static final String FILE_PERMISSION = "770";
@@ -19,7 +19,9 @@

import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.common.io.FsPath;
import org.apache.linkis.common.utils.JsonUtils;
import org.apache.linkis.common.utils.Utils;
import org.apache.linkis.storage.fs.FileSystem;
import org.apache.linkis.udf.conf.Constants;
import org.apache.linkis.udf.exception.UdfException;

@@ -43,6 +45,7 @@
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -100,11 +103,11 @@ private static String getRootPath(InputStream inputStream, String folder) throws
while ((entry = tarInput.getNextTarEntry()) != null) {
if (entry.isDirectory() && entry.getName().endsWith(delimiter)) {
rootPathStr = entry.getName().replace(folder + FsPath.SEPARATOR, "");
return rootPathStr;
}
if (entry.getName().contains(delimiter)) {
rootPathStr = entry.getName().substring(0, entry.getName().indexOf(delimiter));
return rootPathStr;
}
}
} catch (Exception e) {
@@ -158,16 +161,16 @@ public static InputStream getZipInputStreamByTarInputStream(
return createZipFile(file.getInputStream(), packageName, rootPath);
} else {
throw new UdfException(
80038,
"The name directory "
+ packageName
+ " specified by PKG-INFO does not exist. Please confirm that the "
+ packageName
+ " specified by PKG-INFO in the package matches the actual folder name (PKG-INFO指定Name目录"
+ packageName
+ "不存在,请确认包中PKG-INFO指定"
+ packageName
+ "和实际文件夹名称一致)");
}
}

@@ -280,4 +283,55 @@ public static List<String> extractDependencies(String content, String name) {
}
return modules;
}

public static List<String> getRegisterFunctions(FileSystem fileSystem, FsPath fsPath, String path)
throws Exception {
try (InputStream is = fileSystem.read(fsPath)) {
// Convert the InputStream content to a string
String content = IOUtils.toString(is, StandardCharsets.UTF_8);
if (StringUtils.endsWith(path, Constants.FILE_EXTENSION_PY)) {
// Parse the Python file
return extractPythonMethodNames(path);
} else if (StringUtils.endsWith(path, Constants.FILE_EXTENSION_SCALA)) {
// Parse the Scala code
return extractScalaMethodNames(content);
} else {
throw new UdfException(80041, "Unsupported file type: " + path);
}
} catch (IOException e) {
throw new UdfException(80042, "Failed to read file: " + path, e);
}
}

public static List<String> extractScalaMethodNames(String scalaCode) {
List<String> methodNames = new ArrayList<>();
// Regular expression matching method definitions, including access modifiers
String regex = "(\\b(private|protected)\\b\\s+)?\\bdef\\s+(\\w+)\\b";
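// For example, given source text containing "def add(a: Int, b: Int): Int = a + b" and
// "private def helper(x: Int): Int = x * 2", group(3) captures "add" and "helper".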
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(scalaCode);
logger.info("use regex to get scala method names.. reg:{}", regex);
while (matcher.find()) {
String methodName = matcher.group(3);
methodNames.add(methodName);
}

return methodNames;
}

public static List<String> extractPythonMethodNames(String udfPath) throws Exception {
String exec =
Utils.exec(
(new String[] {
Constants.PYTHON_COMMAND.getValue(),
Configuration.getLinkisHome() + "/admin/" + "linkis_udf_get_python_methods.py",
udfPath
}));
logger.info(
"execute python script to get python method name...{} {} {}",
Constants.PYTHON_COMMAND.getValue(),
Configuration.getLinkisHome() + "/admin/" + "linkis_udf_get_python_methods.py",
udfPath);
// Convert exec, a JSON array, into a List<String>
return JsonUtils.jackson().readValue(exec, new TypeReference<List<String>>() {});
}
}
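
For context, a minimal standalone sketch of the deserialization step above, assuming a plain Jackson ObjectMapper in place of Linkis' JsonUtils and a made-up script output; the helper script is expected to print a JSON array of method names to stdout:

import java.util.List;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class RegisterFunctionsOutputDemo {
  public static void main(String[] args) throws Exception {
    // Hypothetical stdout of linkis_udf_get_python_methods.py: a JSON array of method names
    String exec = "[\"udf_one\", \"udf_two\"]";
    // Same deserialization pattern as extractPythonMethodNames, using a plain ObjectMapper
    List<String> methods = new ObjectMapper().readValue(exec, new TypeReference<List<String>>() {});
    System.out.println(methods); // prints [udf_one, udf_two]
  }
}
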
@@ -76,8 +76,8 @@
<version>${springfox.version}</version>
<exclusions>
<exclusion>
<groupId>io.github.classgraph</groupId>
<artifactId>classgraph</artifactId>
</exclusion>
</exclusions>
</dependency>