From c26b11b9b3dc812a7a18d63dca331440912f8658 Mon Sep 17 00:00:00 2001 From: wushengyeyouya <690574002@qq.com> Date: Fri, 11 Aug 2023 11:17:11 +0800 Subject: [PATCH 1/7] support AppConn2Linkis node type. --- .../AppConn2LinkisRefExecutionOperation.java | 19 +++ .../ref/AppConn2LinkisResponseRef.java | 116 ++++++++++++++++++ 2 files changed, 135 insertions(+) create mode 100644 dss-standard/development-standard/development-process-standard/src/main/java/com/webank/wedatasphere/dss/standard/app/development/operation/AppConn2LinkisRefExecutionOperation.java create mode 100644 dss-standard/development-standard/development-process-standard/src/main/java/com/webank/wedatasphere/dss/standard/app/development/ref/AppConn2LinkisResponseRef.java diff --git a/dss-standard/development-standard/development-process-standard/src/main/java/com/webank/wedatasphere/dss/standard/app/development/operation/AppConn2LinkisRefExecutionOperation.java b/dss-standard/development-standard/development-process-standard/src/main/java/com/webank/wedatasphere/dss/standard/app/development/operation/AppConn2LinkisRefExecutionOperation.java new file mode 100644 index 000000000..1271579f7 --- /dev/null +++ b/dss-standard/development-standard/development-process-standard/src/main/java/com/webank/wedatasphere/dss/standard/app/development/operation/AppConn2LinkisRefExecutionOperation.java @@ -0,0 +1,19 @@ +package com.webank.wedatasphere.dss.standard.app.development.operation; + +import com.webank.wedatasphere.dss.standard.app.development.ref.AppConn2LinkisResponseRef; +import com.webank.wedatasphere.dss.standard.app.development.ref.RefJobContentRequestRef; +import com.webank.wedatasphere.dss.standard.common.exception.operation.ExternalOperationFailedException; + +public interface AppConn2LinkisRefExecutionOperation> extends RefExecutionOperation { + + /** + * 该 Operation 用于通过第三方 refJob 的 jobContent 信息,生成一段可被 Linkis 某个引擎执行的代码。
+ * 是为 AppConn2Linkis 工作流节点类型设计的一个执行类。具体可参照:AppConn2Linkis工作流节点类型开发指南。 + * @param requestRef 包含了第三方 refJob 信息的 requestRef + * @return 包含了可被 Linkis 某个引擎执行的代码信息。 + * @throws ExternalOperationFailedException 如果生成代码失败,则抛出该异常 + */ + @Override + AppConn2LinkisResponseRef execute(K requestRef) throws ExternalOperationFailedException; + +} diff --git a/dss-standard/development-standard/development-process-standard/src/main/java/com/webank/wedatasphere/dss/standard/app/development/ref/AppConn2LinkisResponseRef.java b/dss-standard/development-standard/development-process-standard/src/main/java/com/webank/wedatasphere/dss/standard/app/development/ref/AppConn2LinkisResponseRef.java new file mode 100644 index 000000000..a068d21ed --- /dev/null +++ b/dss-standard/development-standard/development-process-standard/src/main/java/com/webank/wedatasphere/dss/standard/app/development/ref/AppConn2LinkisResponseRef.java @@ -0,0 +1,116 @@ +package com.webank.wedatasphere.dss.standard.app.development.ref; + +import com.webank.wedatasphere.dss.standard.common.entity.ref.ResponseRef; +import com.webank.wedatasphere.dss.standard.common.entity.ref.ResponseRefBuilder; +import com.webank.wedatasphere.dss.standard.common.entity.ref.ResponseRefImpl; + +import java.util.Map; + +public interface AppConn2LinkisResponseRef extends ResponseRef { + + /** + * 返回可被 Linkis 某个引擎执行的代码 + * @return 可被 Linkis 某个引擎执行的代码 + */ + String getCode(); + + /** + * Linkis Job 的 params,符合 Linkis 的 params 规范,如下:
+ * { + * "configuration": { + * "special": { + * "k1": "v1" + * }, + * "runtime": { + * "k2": "v2" + * }, + * "startup": { + * "k3": "v2" + * } + * }, + * "variable": { + * "runDate": "2023-08-10" + * } + * } + * @return Linkis Job 的 params + */ + Map getParams(); + + /** + * 希望将 {@code getCode()} 返回的代码提交给哪个 Linkis 引擎 + * @return Linkis 引擎类型 + */ + String getEngineType(); + + /** + * 希望将 {@code getCode()} 返回的代码提交给哪个 Linkis 引擎的哪个代码类型 + * @return 代码类型 + */ + String getRunType(); + + static AppConn2LinkisResponseRefBuilder newBuilder() { + return new AppConn2LinkisResponseRefBuilder(); + } + + class AppConn2LinkisResponseRefBuilder + extends ResponseRefBuilder.ExternalResponseRefBuilder { + + private String code; + private Map params; + private String engineType; + private String runType; + + public AppConn2LinkisResponseRefBuilder setCode(String code) { + this.code = code; + return this; + } + + public AppConn2LinkisResponseRefBuilder setParams(Map params) { + this.params = params; + return this; + } + + public AppConn2LinkisResponseRefBuilder setEngineType(String engineType) { + this.engineType = engineType; + return this; + } + + public AppConn2LinkisResponseRefBuilder setRunType(String runType) { + this.runType = runType; + return this; + } + + class AppConn2LinkisResponseRefImpl extends ResponseRefImpl implements AppConn2LinkisResponseRef{ + public AppConn2LinkisResponseRefImpl() { + super(AppConn2LinkisResponseRefBuilder.this.responseBody, AppConn2LinkisResponseRefBuilder.this.status, + AppConn2LinkisResponseRefBuilder.this.errorMsg, AppConn2LinkisResponseRefBuilder.this.responseMap); + } + + @Override + public String getCode() { + return code; + } + + @Override + public Map getParams() { + return params; + } + + @Override + public String getEngineType() { + return engineType; + } + + @Override + public String getRunType() { + return runType; + } + } + + @Override + protected AppConn2LinkisResponseRef createResponseRef() { + return new AppConn2LinkisResponseRefImpl(); + } + } + +} From e2ddaf1f965617949479fabe477149abab461576 Mon Sep 17 00:00:00 2001 From: wushengyeyouya <690574002@qq.com> Date: Fri, 11 Aug 2023 11:18:42 +0800 Subject: [PATCH 2/7] support to execute bml2linkis node type. --- .../execution/parser/BML2LinkisJobParser.java | 55 +++++++ .../node/execution/parser/CodeParser.java | 147 ++---------------- 2 files changed, 67 insertions(+), 135 deletions(-) create mode 100644 dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/BML2LinkisJobParser.java diff --git a/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/BML2LinkisJobParser.java b/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/BML2LinkisJobParser.java new file mode 100644 index 000000000..49752adbc --- /dev/null +++ b/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/BML2LinkisJobParser.java @@ -0,0 +1,55 @@ +package com.webank.wedatasphere.dss.linkis.node.execution.parser; + +import com.google.gson.reflect.TypeToken; +import com.webank.wedatasphere.dss.linkis.node.execution.entity.BMLResource; +import com.webank.wedatasphere.dss.linkis.node.execution.exception.LinkisJobExecutionErrorException; +import com.webank.wedatasphere.dss.linkis.node.execution.job.AppConnLinkisJob; +import com.webank.wedatasphere.dss.linkis.node.execution.job.Job; +import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisJobExecutionUtils; + +import java.util.Map; + +public class BML2LinkisJobParser extends CodeParser { + + @Override + public void parseJob(Job job) throws Exception { + if(!(job instanceof AppConnLinkisJob)) { + return; + } + AppConnLinkisJob appConnLinkisJob = (AppConnLinkisJob) job; + String runType = appConnLinkisJob.getRunType(); + // 只处理包含 bml2linkis 的 AppConnLinkisJob,例如:linkis.appconn..bml2linkis + if(!runType.toLowerCase().contains("bml2linkis")) { + return; + } + job.getLogObj().info(String.format("AppConn %s try to generate Linkis jobContent from code %s.", runType, + job.getCode())); + Map script = LinkisJobExecutionUtils.gson.fromJson(job.getCode(), new TypeToken>() {}.getType()); + if(!script.containsKey("resourceId") || !script.containsKey("version") || !script.containsKey("fileName")) { + job.getLogObj().error("the code error, resourceId, version or fileName is not exists."); + throw new LinkisJobExecutionErrorException(90100, "cannot recognize fileName from jobContent."); + } + BMLResource bmlResource = new BMLResource(); + bmlResource.setResourceId((String) script.get("resourceId")); + bmlResource.setVersion((String) script.get("version")); + // fileName 的格式为 ${resourceId}.${engineType}.${runType} + bmlResource.setFileName((String) script.get("fileName")); + getAndSetCode(bmlResource, appConnLinkisJob); + String[] fileNameArray = bmlResource.getFileName().split("\\."); + if(fileNameArray.length < 3) { + job.getLogObj().error(String.format("cannot recognize fileName %s, the fileName format must be ${resourceId}.${engineType}.${runType}", bmlResource.getFileName())); + throw new LinkisJobExecutionErrorException(90100, "cannot recognize fileName from jobContent."); + } + String realEngineType = fileNameArray[fileNameArray.length - 2]; + String realRunType = fileNameArray[fileNameArray.length - 1]; + setEngineType(job, realEngineType, realRunType); + } + + protected void setEngineType(Job job, String realEngineType, String realRunType) { + job.getLogObj().warn(String.format("switch job from engineType %s with runType %s to engineType %s with runType %s", + job.getEngineType(), job.getRunType(), realEngineType, realRunType)); + job.setEngineType(realEngineType); + job.setRunType(realRunType); + } + +} diff --git a/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/CodeParser.java b/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/CodeParser.java index 949a9537c..a6c2bb68f 100644 --- a/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/CodeParser.java +++ b/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/CodeParser.java @@ -22,20 +22,17 @@ import com.webank.wedatasphere.dss.linkis.node.execution.exception.LinkisJobExecutionErrorException; import com.webank.wedatasphere.dss.linkis.node.execution.job.CommonLinkisJob; import com.webank.wedatasphere.dss.linkis.node.execution.job.Job; +import com.webank.wedatasphere.dss.linkis.node.execution.job.LinkisJob; import com.webank.wedatasphere.dss.linkis.node.execution.service.LinkisURLService; import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisJobExecutionUtils; import org.apache.linkis.filesystem.WorkspaceClientFactory; import org.apache.linkis.filesystem.request.WorkspaceClient; import org.apache.linkis.filesystem.response.ScriptFromBMLResponse; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.commons.lang3.StringUtils; public class CodeParser implements JobParser { @@ -44,9 +41,10 @@ public class CodeParser implements JobParser { private volatile WorkspaceClient client1X = null; private volatile WorkspaceClient client0X = null; private final Object clientLocker = new Object(); + @Override public void parseJob(Job job) throws Exception{ - if (! ( job instanceof CommonLinkisJob) ) { + if (! (job instanceof CommonLinkisJob) ) { return ; } CommonLinkisJob linkisAppConnJob = (CommonLinkisJob) job; @@ -66,8 +64,11 @@ public void parseJob(Job job) throws Exception{ if(null == scriptResource) { throw new LinkisJobExecutionErrorException(90102,"Failed to get script resource"); } + getAndSetCode(scriptResource, linkisAppConnJob); + } - Map executionParams = getExecutionParams(scriptResource, linkisAppConnJob); + protected void getAndSetCode(BMLResource bmlResource, LinkisJob linkisAppConnJob) { + Map executionParams = getExecutionParams(bmlResource, linkisAppConnJob); if (executionParams.get("executionCode") != null) { String executionCode = (String) executionParams.get("executionCode"); linkisAppConnJob.getLogObj().info("************************************SUBMIT CODE************************************"); @@ -81,9 +82,12 @@ public void parseJob(Job job) throws Exception{ linkisAppConnJob.getParams().putAll( (Map)executionParams.get("params")); } } + dealExecutionParams(linkisAppConnJob, executionParams); } - private Map getExecutionParams(BMLResource bmlResource, CommonLinkisJob linkisAppConnJob) { + protected void dealExecutionParams(LinkisJob linkisAppConnJob, Map executionParams) {} + + protected Map getExecutionParams(BMLResource bmlResource, LinkisJob linkisAppConnJob) { Map map = new HashMap<>(); ScriptFromBMLResponse response = getOrCreateWorkSpaceClient(linkisAppConnJob).requestOpenScriptFromBML(bmlResource.getResourceId(), bmlResource.getVersion(), bmlResource.getFileName()); linkisAppConnJob.getLogObj().info("Get execution code from workspace client,bml resource id "+bmlResource.getResourceId()+", version is "+bmlResource.getVersion()); @@ -92,7 +96,7 @@ private Map getExecutionParams(BMLResource bmlResource, CommonL return map; } - private WorkspaceClient getOrCreateWorkSpaceClient(CommonLinkisJob linkisAppConnJob) { + private WorkspaceClient getOrCreateWorkSpaceClient(LinkisJob linkisAppConnJob) { Map props = linkisAppConnJob.getJobProps(); if(LinkisJobExecutionConfiguration.isLinkis1_X(props)) { if (null == client1X) { @@ -119,131 +123,4 @@ private WorkspaceClient getOrCreateWorkSpaceClient(CommonLinkisJob linkisAppConn } } - private ArrayList getResourceNames(String code){ - ArrayList bmlResourceNames = new ArrayList(); - Matcher mb = pb.matcher(code); - while (mb.find()) { - bmlResourceNames.add(mb.group().trim()); - } - return bmlResourceNames; - } - - - /** - * 1.Find the project file used in the script - * 2.Find the node file used in the script - * 3.Recursively find the flow file used in the script - * 4.Replace file name with prefixed name - * @param resourceNames - * @param linkisAppConnJob - * @return - */ - private ArrayList getResourcesByNames(ArrayList resourceNames, CommonLinkisJob linkisAppConnJob) { - - ArrayList bmlResourceArrayList = new ArrayList<>(); - - String jobName = linkisAppConnJob.getJobName(); - String flowName = linkisAppConnJob.getSource().get("flowName"); - String projectName = linkisAppConnJob.getSource().get("projectName"); - - - List projectResourceList = linkisAppConnJob.getProjectResourceList(); - - - List jobResourceList = linkisAppConnJob.getJobResourceList(); - for (String resourceName : resourceNames) { - String[] resourceNameSplit = resourceName.split("://"); - String prefix = resourceNameSplit[0].toLowerCase(); - String fileName = resourceNameSplit[1]; - BMLResource resource = null; - String afterFileName = fileName; - switch (prefix) { - case "project": - resource = findResource(projectResourceList, fileName); - afterFileName = LinkisJobExecutionConfiguration.PROJECT_PREFIX + "_" + projectName + "_" + fileName; - break; - case "flow": - resource = findFlowResource(linkisAppConnJob, fileName, flowName); - break; - case "node": - resource = findResource(jobResourceList, fileName); - afterFileName = LinkisJobExecutionConfiguration.JOB_PREFIX + "_" + jobName + "_" + fileName; - break; - default: - } - if (null == resource) { - linkisAppConnJob.getLogObj().error("Failed to find the " + prefix + " resource file of " + fileName); - throw new RuntimeException("Failed to find the " + prefix + " resource file of " + fileName); - } - if (!afterFileName.equals(fileName)) { - resource.setFileName(afterFileName); - } - bmlResourceArrayList.add(resource); - } - return bmlResourceArrayList; - } - - - /** - * Recursively find the flow file used in the script - * Recursive exit condition is top-level flow - * - */ - private BMLResource findFlowResource(CommonLinkisJob linkisAppConnJob, String fileName, String flowName) { - - String fullFlowName = ""; - Map> fLowNameAndResources = linkisAppConnJob.getFlowNameAndResources(); - if (fLowNameAndResources == null){ - return null; - } - Optional>> first = fLowNameAndResources.entrySet().stream().filter(fLowNameAndResource -> fLowNameAndResource.getKey().endsWith(flowName + LinkisJobExecutionConfiguration.RESOURCES_NAME)).findFirst(); - - if(first.isPresent()){ - fullFlowName = first.get().getKey(); - BMLResource resource = findResource(first.get().getValue(), fileName); - if (resource != null) { - resource.setFileName(flowName + "_" + fileName); - return resource; - } - } - - String firstFlow = "flow." + flowName + LinkisJobExecutionConfiguration.RESOURCES_NAME; - if (firstFlow.equals(fullFlowName)) { - return null; - } - //getParentFlowName:flow.flows1.test.resources return:flows1 - String parentFlowName = StringUtils.substringAfterLast(StringUtils.substringBefore(fullFlowName, "." + flowName - + LinkisJobExecutionConfiguration.RESOURCES_NAME), "."); - if (StringUtils.isEmpty(parentFlowName)) { - return null; - } - - return findFlowResource(linkisAppConnJob, fileName, parentFlowName); - } - - - private String replaceCodeResourceNames(String code, ArrayList resourceNameList, ArrayList resourceList){ - if(resourceList.size() != resourceNameList.size()){ - throw new RuntimeException("Failed to parsed resource file"); - } - - String[] names = resourceNameList.toArray(new String[]{}); - - String[] afterNames = new String[resourceList.size()]; - for (int i=0 ; i < afterNames.length ; i++){ - afterNames[i] = resourceList.get(i).getFileName(); - } - return StringUtils.replaceEach(code, names, afterNames); - } - - private BMLResource findResource(List resourceArrayList, String fileName){ - if(resourceArrayList != null && !resourceArrayList.isEmpty()) { - for(BMLResource resource : resourceArrayList){ - if(resource.getFileName().equals(fileName)){ - return resource; - } - } - } - return null; - } } From 35f4505145e810b400a78087ac5de7a9e3aa2677 Mon Sep 17 00:00:00 2001 From: wushengyeyouya <690574002@qq.com> Date: Fri, 11 Aug 2023 11:19:22 +0800 Subject: [PATCH 3/7] support to execute appconn2linkis node type. --- .../AppConn2LinkisRefExecutionRestfulApi.java | 68 ++++++++++++++ .../entity/AppConn2LinkisPostAction.java | 35 +++++++ .../parser/AppConn2LinkisJobParser.java | 92 +++++++++++++++++++ 3 files changed, 195 insertions(+) create mode 100644 dss-framework/dss-appconn-framework/src/main/java/com/webank/wedatasphere/dss/framework/appconn/restful/AppConn2LinkisRefExecutionRestfulApi.java create mode 100644 dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/entity/AppConn2LinkisPostAction.java create mode 100644 dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/AppConn2LinkisJobParser.java diff --git a/dss-framework/dss-appconn-framework/src/main/java/com/webank/wedatasphere/dss/framework/appconn/restful/AppConn2LinkisRefExecutionRestfulApi.java b/dss-framework/dss-appconn-framework/src/main/java/com/webank/wedatasphere/dss/framework/appconn/restful/AppConn2LinkisRefExecutionRestfulApi.java new file mode 100644 index 000000000..e43c2440d --- /dev/null +++ b/dss-framework/dss-appconn-framework/src/main/java/com/webank/wedatasphere/dss/framework/appconn/restful/AppConn2LinkisRefExecutionRestfulApi.java @@ -0,0 +1,68 @@ +package com.webank.wedatasphere.dss.framework.appconn.restful; + +import com.webank.wedatasphere.dss.appconn.core.ext.OnlyDevelopmentAppConn; +import com.webank.wedatasphere.dss.appconn.manager.AppConnManager; +import com.webank.wedatasphere.dss.common.label.DSSLabel; +import com.webank.wedatasphere.dss.common.label.EnvDSSLabel; +import com.webank.wedatasphere.dss.common.utils.DSSCommonUtils; +import com.webank.wedatasphere.dss.standard.app.development.operation.AppConn2LinkisRefExecutionOperation; +import com.webank.wedatasphere.dss.standard.app.development.ref.AppConn2LinkisResponseRef; +import com.webank.wedatasphere.dss.standard.app.development.ref.RefJobContentRequestRef; +import com.webank.wedatasphere.dss.standard.app.development.service.RefExecutionService; +import com.webank.wedatasphere.dss.standard.app.development.utils.DevelopmentOperationUtils; +import com.webank.wedatasphere.dss.standard.app.sso.Workspace; +import com.webank.wedatasphere.dss.standard.common.desc.AppInstance; +import org.apache.linkis.server.Message; +import org.apache.linkis.server.security.SecurityFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import javax.servlet.http.HttpServletRequest; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +@RequestMapping(path = "/dss/framework/appconn", produces = {"application/json"}) +@RestController +public class AppConn2LinkisRefExecutionRestfulApi { + + private static final Logger LOGGER = LoggerFactory.getLogger(AppConn2LinkisRefExecutionRestfulApi.class); + + + @Autowired + private AppConnManagerRestfulApi appConnManagerRestfulApi; + + @RequestMapping(path = "execute", method = RequestMethod.POST) + public Message execute(HttpServletRequest request, @RequestBody Map json) { + String userName = SecurityFilter.getLoginUsername(request); + LOGGER.info("user {} try to transform jobContent to Linkis job with requestBody {}.", userName, json); + String workspaceStr = (String) json.get("workspaceStr"); + String appConnName = (String) json.get("appConnName"); + String labelStr = (String) json.get("labels"); + Map refJobContent = (Map) json.get("jobContent"); + Workspace workspace = DSSCommonUtils.COMMON_GSON.fromJson(workspaceStr, Workspace.class); + OnlyDevelopmentAppConn appConn = (OnlyDevelopmentAppConn) AppConnManager.getAppConnManager().getAppConn(appConnName); + AppInstance appInstance; + List labels = Arrays.asList(new EnvDSSLabel(labelStr)); + if(appConn.getAppDesc().getAppInstances().size() == 1) { + appInstance = appConn.getAppDesc().getAppInstances().get(0); + } else { + appInstance = appConn.getAppDesc().getAppInstancesByLabels(labels).get(0); + } + AppConn2LinkisResponseRef responseRef = DevelopmentOperationUtils.tryDevelopmentOperation(() -> appConn.getOrCreateDevelopmentStandard().getRefExecutionService(appInstance), + developmentService -> ((RefExecutionService) developmentService).getRefExecutionOperation(), + null, refJobContentRequestRef -> refJobContentRequestRef.setRefJobContent(refJobContent), + null, null, (developmentOperation, developmentRequestRef) -> { + developmentRequestRef.setWorkspace(workspace).setUserName(userName).setDSSLabels(labels); + return ((AppConn2LinkisRefExecutionOperation) developmentOperation).execute((RefJobContentRequestRef) developmentRequestRef); + }, null, "fetch linkis jobContent from appConn " + appConnName + " failed."); + return Message.ok().data("executionCode", responseRef.getCode()).data("params", responseRef.getParams()) + .data("engineType", responseRef.getEngineType()).data("runType", responseRef.getRunType()); + } + +} diff --git a/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/entity/AppConn2LinkisPostAction.java b/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/entity/AppConn2LinkisPostAction.java new file mode 100644 index 000000000..153d52ddf --- /dev/null +++ b/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/entity/AppConn2LinkisPostAction.java @@ -0,0 +1,35 @@ +package com.webank.wedatasphere.dss.linkis.node.execution.entity; + +import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisJobExecutionUtils; +import org.apache.linkis.httpclient.request.POSTAction; +import org.apache.linkis.httpclient.request.UserAction; + +public class AppConn2LinkisPostAction extends POSTAction implements UserAction { + + private String url; + private String user; + + @Override + public String getRequestPayload() { + return LinkisJobExecutionUtils.gson.toJson(getRequestPayloads()); + } + + public void setUrl(String url) { + this.url = url; + } + + @Override + public String getURL() { + return url; + } + + @Override + public void setUser(String user) { + this.user = user; + } + + @Override + public String getUser() { + return user; + } +} diff --git a/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/AppConn2LinkisJobParser.java b/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/AppConn2LinkisJobParser.java new file mode 100644 index 000000000..cd50eb122 --- /dev/null +++ b/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/AppConn2LinkisJobParser.java @@ -0,0 +1,92 @@ +package com.webank.wedatasphere.dss.linkis.node.execution.parser; + +import com.google.gson.reflect.TypeToken; +import com.webank.wedatasphere.dss.linkis.node.execution.conf.LinkisJobExecutionConfiguration; +import com.webank.wedatasphere.dss.linkis.node.execution.entity.AppConn2LinkisPostAction; +import com.webank.wedatasphere.dss.linkis.node.execution.entity.BMLResource; +import com.webank.wedatasphere.dss.linkis.node.execution.exception.LinkisJobExecutionErrorException; +import com.webank.wedatasphere.dss.linkis.node.execution.job.AppConnLinkisJob; +import com.webank.wedatasphere.dss.linkis.node.execution.job.Job; +import com.webank.wedatasphere.dss.linkis.node.execution.job.LinkisJob; +import com.webank.wedatasphere.dss.linkis.node.execution.service.LinkisURLService; +import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisJobExecutionUtils; +import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisUjesClientUtils; +import org.apache.linkis.httpclient.dws.DWSHttpClient; +import org.apache.linkis.httpclient.dws.config.DWSClientConfig; +import org.apache.linkis.httpclient.response.impl.DefaultHttpResult; + +import java.util.HashMap; +import java.util.Map; + +public class AppConn2LinkisJobParser extends BML2LinkisJobParser { + + @Override + public void parseJob(Job job) throws Exception { + if(!(job instanceof AppConnLinkisJob)) { + return; + } + AppConnLinkisJob appConnLinkisJob = (AppConnLinkisJob) job; + String runType = appConnLinkisJob.getRunType(); + // 只处理包含 appconn2linkis 的 AppConnLinkisJob,例如:linkis.appconn..appconn2linkis + if(!runType.toLowerCase().contains("appconn2linkis")) { + return; + } + job.getLogObj().info(String.format("AppConn %s try to generate Linkis jobContent from AppConn execution.", runType)); + getAndSetCode(null, appConnLinkisJob); + } + + @Override + protected void dealExecutionParams(LinkisJob linkisAppConnJob, Map executionParams) { + String engineType = (String) executionParams.get("engineType"); + String runType = (String) executionParams.get("runType"); + setEngineType(linkisAppConnJob, engineType, runType); + } + + @Override + protected Map getExecutionParams(BMLResource bmlResource, LinkisJob job) { + String user = job.getUser(); + String linkisUrl = LinkisURLService.Factory.getLinkisURLService().getDefaultLinkisURL(job); + String token = LinkisJobExecutionConfiguration.LINKIS_AUTHOR_USER_TOKEN.getValue(job.getJobProps()); + DWSHttpClient client = null; + DWSClientConfig clientConfig = LinkisUjesClientUtils.getClientConfig1_X(linkisUrl, user, token, job.getJobProps()); + AppConn2LinkisPostAction appConn2LinkisPostAction = new AppConn2LinkisPostAction(); + appConn2LinkisPostAction.setUrl("/api/rest_j/v1/dss/framework/appconn/execute"); + appConn2LinkisPostAction.addRequestPayload("workspaceStr", job.getRuntimeParams().get("workspace")); + appConn2LinkisPostAction.addRequestPayload("appConnName", job.getRunType().split("\\.")[0]); + appConn2LinkisPostAction.addRequestPayload("labels", getLabels(job.getJobProps().get("labels"))); + Map jobContent = new HashMap<>(); + if(job.getCode() != null && !job.getCode().isEmpty()) { + jobContent.putAll(LinkisJobExecutionUtils.gson.fromJson(job.getCode(), new TypeToken>() {}.getType())); + } + jobContent.putAll(job.getParams()); + appConn2LinkisPostAction.addRequestPayload("jobContent", jobContent); + appConn2LinkisPostAction.setUser(user); + job.getLogObj().info(String.format("try to ask AppConn %s to execute AppConn2Linkis with requestBody %s.", job.getRunType(), appConn2LinkisPostAction.getRequestPayload())); + appConn2LinkisPostAction.addHeader("Referer", ""); + try { + client = new DWSHttpClient(clientConfig, "Workspace-Fetch-Client-"); + DefaultHttpResult result = (DefaultHttpResult) client.execute(appConn2LinkisPostAction); + if (result.getStatusCode() == 200 || result.getStatusCode() == 0) { + Map responseBody = LinkisJobExecutionUtils.gson.fromJson(result.getResponseBody(), Map.class); + return (Map) responseBody.get("data"); + } else { + throw new LinkisJobExecutionErrorException(50063, "Failed to get workspace str, responseBody is: " + + result.getResponseBody()); + } + } finally { + if(client != null) { + client.close(); + } + } + } + + private String getLabels(String labels) { + if (labels.contains("route") || labels.contains("DSSEnv") ) { + Map labelMap = LinkisJobExecutionUtils.gson.fromJson(labels, Map.class); + return (String) labelMap.getOrDefault("route", labelMap.getOrDefault("DSSEnv", labels)); + } else { + return labels; + } + } + +} From 92939fb79c414a56833578532f5beba9b51b4d3b Mon Sep 17 00:00:00 2001 From: wushengyeyouya <690574002@qq.com> Date: Fri, 11 Aug 2023 11:19:55 +0800 Subject: [PATCH 4/7] support to execute AppConn2Linkis and bml2linkis node type. --- .../execution/execution/impl/LinkisNodeExecutionImpl.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/execution/impl/LinkisNodeExecutionImpl.java b/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/execution/impl/LinkisNodeExecutionImpl.java index e0e6be918..abc431dc5 100644 --- a/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/execution/impl/LinkisNodeExecutionImpl.java +++ b/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/execution/impl/LinkisNodeExecutionImpl.java @@ -21,10 +21,7 @@ import com.webank.wedatasphere.dss.linkis.node.execution.execution.LinkisNodeExecution; import com.webank.wedatasphere.dss.linkis.node.execution.job.Job; import com.webank.wedatasphere.dss.linkis.node.execution.listener.LinkisExecutionListener; -import com.webank.wedatasphere.dss.linkis.node.execution.parser.CodeParser; -import com.webank.wedatasphere.dss.linkis.node.execution.parser.JobParamsParser; -import com.webank.wedatasphere.dss.linkis.node.execution.parser.JobParser; -import com.webank.wedatasphere.dss.linkis.node.execution.parser.JobRuntimeParamsParser; +import com.webank.wedatasphere.dss.linkis.node.execution.parser.*; import com.webank.wedatasphere.dss.linkis.node.execution.service.LinkisURLService; import com.webank.wedatasphere.dss.linkis.node.execution.service.impl.BuildJobActionImpl; import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisJobExecutionUtils; @@ -59,6 +56,8 @@ private LinkisNodeExecutionImpl() { registerJobParser(new CodeParser()); registerJobParser(new JobRuntimeParamsParser()); registerJobParser(new JobParamsParser()); + registerJobParser(new BML2LinkisJobParser()); + registerJobParser(new AppConn2LinkisJobParser()); } public static LinkisNodeExecution getLinkisNodeExecution() { From d9ebbc9f337cc1a0c4dba3e43639b48eb9d84519 Mon Sep 17 00:00:00 2001 From: wushengyeyouya <690574002@qq.com> Date: Fri, 11 Aug 2023 11:21:18 +0800 Subject: [PATCH 5/7] SparkEtl AppConn optimization, support to query jumpUrl. --- .../SparkEtlRefQueryJumpUrlOperation.java | 29 +++++++++++++++++++ .../query/SparkEtlRefQueryService.java | 12 ++++++++ .../standard/SparkEtlDevelopmentStandard.java | 7 +++++ 3 files changed, 48 insertions(+) create mode 100644 dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/query/SparkEtlRefQueryJumpUrlOperation.java create mode 100644 dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/query/SparkEtlRefQueryService.java diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/query/SparkEtlRefQueryJumpUrlOperation.java b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/query/SparkEtlRefQueryJumpUrlOperation.java new file mode 100644 index 000000000..a6aa758c2 --- /dev/null +++ b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/query/SparkEtlRefQueryJumpUrlOperation.java @@ -0,0 +1,29 @@ +package com.webank.wedatasphere.dss.appconn.sparketl.query; + +import com.webank.wedatasphere.dss.standard.app.development.operation.AbstractDevelopmentOperation; +import com.webank.wedatasphere.dss.standard.app.development.operation.RefQueryJumpUrlOperation; +import com.webank.wedatasphere.dss.standard.app.development.ref.QueryJumpUrlResponseRef; +import com.webank.wedatasphere.dss.standard.app.development.ref.impl.OnlyDevelopmentRequestRef; +import com.webank.wedatasphere.dss.standard.app.development.utils.QueryJumpUrlConstant; +import com.webank.wedatasphere.dss.standard.common.exception.operation.ExternalOperationFailedException; +import org.apache.commons.lang3.StringUtils; + +public class SparkEtlRefQueryJumpUrlOperation extends AbstractDevelopmentOperation + implements RefQueryJumpUrlOperation { + + @Override + public QueryJumpUrlResponseRef query(OnlyDevelopmentRequestRef.QueryJumpUrlRequestRefImpl requestRef) throws ExternalOperationFailedException { + String jumpUrl = mergeBaseUrl("#/sparketl?resourceId=%s&version=%s&%s=%s&%s=%s"); + String resourceId = (String) requestRef.getRefJobContent().get("resourceId"); + String version = (String) requestRef.getRefJobContent().get("version"); + if(StringUtils.isBlank(resourceId) || StringUtils.isBlank(version)) { + logger.info("resourceId or version is empty, maybe user {} want to create a new node.", requestRef.getUserName()); + resourceId = ""; + version = ""; + } + jumpUrl = String.format(jumpUrl, resourceId, version, QueryJumpUrlConstant.NODE_ID.getKey(), + QueryJumpUrlConstant.NODE_ID.getValue(), QueryJumpUrlConstant.PROJECT_NAME.getKey(), QueryJumpUrlConstant.PROJECT_NAME.getValue()); + return QueryJumpUrlResponseRef.newBuilder().setJumpUrl(jumpUrl).build(); + } + +} diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/query/SparkEtlRefQueryService.java b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/query/SparkEtlRefQueryService.java new file mode 100644 index 000000000..b07305602 --- /dev/null +++ b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/query/SparkEtlRefQueryService.java @@ -0,0 +1,12 @@ +package com.webank.wedatasphere.dss.appconn.sparketl.query; + +import com.webank.wedatasphere.dss.standard.app.development.operation.RefQueryJumpUrlOperation; +import com.webank.wedatasphere.dss.standard.app.development.service.AbstractRefQueryService; + +public class SparkEtlRefQueryService extends AbstractRefQueryService { + + @Override + protected RefQueryJumpUrlOperation createRefQueryOperation() { + return new SparkEtlRefQueryJumpUrlOperation(); + } +} diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/standard/SparkEtlDevelopmentStandard.java b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/standard/SparkEtlDevelopmentStandard.java index 3f54509d3..eb51edf44 100644 --- a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/standard/SparkEtlDevelopmentStandard.java +++ b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/standard/SparkEtlDevelopmentStandard.java @@ -1,7 +1,9 @@ package com.webank.wedatasphere.dss.appconn.sparketl.standard; import com.webank.wedatasphere.dss.appconn.sparketl.execution.SparkEtlExecutionService; +import com.webank.wedatasphere.dss.appconn.sparketl.query.SparkEtlRefQueryService; import com.webank.wedatasphere.dss.standard.app.development.service.RefExecutionService; +import com.webank.wedatasphere.dss.standard.app.development.service.RefQueryService; import com.webank.wedatasphere.dss.standard.app.development.standard.OnlyExecutionDevelopmentStandard; public class SparkEtlDevelopmentStandard extends OnlyExecutionDevelopmentStandard { @@ -11,6 +13,11 @@ protected RefExecutionService createRefExecutionService() { return new SparkEtlExecutionService(); } + @Override + protected RefQueryService createRefQueryService() { + return new SparkEtlRefQueryService(); + } + @Override public void init() { } From 17e5e4da7b4d65ab42e1230e5cf24e31d817bf97 Mon Sep 17 00:00:00 2001 From: ryanlei Date: Fri, 11 Aug 2023 16:14:46 +0800 Subject: [PATCH 6/7] Update build.yml lerna version --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index bf9577849..052bb8487 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -51,6 +51,6 @@ jobs: - name: Build frontend by node.js run: | cd web - npm install lerna -g + npm install lerna@4.0.0 -g lerna bootstrap npm run build From 0b60a990a62b26c57ac83b6f69bd0ec54e518a4d Mon Sep 17 00:00:00 2001 From: zqburde Date: Thu, 17 Aug 2023 14:13:01 +0800 Subject: [PATCH 7/7] Delete unuseful code. --- .../workflows/module/process/component/nodeparameter.vue | 1 - 1 file changed, 1 deletion(-) diff --git a/web/packages/workflows/module/process/component/nodeparameter.vue b/web/packages/workflows/module/process/component/nodeparameter.vue index a086b47ef..036f5720a 100644 --- a/web/packages/workflows/module/process/component/nodeparameter.vue +++ b/web/packages/workflows/module/process/component/nodeparameter.vue @@ -364,7 +364,6 @@ export default { }, resourcesAction() { let resources = []; - // 写的啥jb代码,下面改了重复添加问题,有其它问题看历史版本吧 const mapFlag = {} if (this.currentNode.jobContent && this.currentNode.jobContent.script) { this.currentNode.resources.forEach(item => {