From c6fe98b17b5bfacf458dbe5ceaf211a75b506f50 Mon Sep 17 00:00:00 2001 From: Jeff Phillips Date: Thu, 2 May 2024 14:10:30 -0400 Subject: [PATCH] Fix for adding all artifacts, putting spacer nodes in groups (#7) --- .../topology/usePipelineTaskTopology.ts | 142 ++++++++++++------ .../topology/PipelineDefaultTaskGroup.tsx | 19 +-- .../topology/PipelineVisualizationSurface.tsx | 11 ++ 3 files changed, 112 insertions(+), 60 deletions(-) diff --git a/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.ts b/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.ts index e911640b22..f38633c55c 100644 --- a/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.ts +++ b/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.ts @@ -10,6 +10,7 @@ import { PipelineNodeModelExpanded } from '~/concepts/topology/types'; import { createArtifactNode, createGroupNode } from '~/concepts/topology/utils'; import { Execution } from '~/third_party/mlmd'; import { + ComponentArtifactMap, composeArtifactType, parseComponentsForArtifactRelationship, parseInputOutput, @@ -17,30 +18,77 @@ import { parseRuntimeInfoFromRunDetails, parseTasksForArtifactRelationship, parseVolumeMounts, + TaskArtifactMap, translateStatusForNode, } from './parseUtils'; -import { PipelineTask } from './pipelineTaskTypes'; +import { PipelineTask, PipelineTaskRunStatus } from './pipelineTaskTypes'; const EMPTY_STATE: PipelineNodeModelExpanded[] = []; +const getNodeArtifacts = ( + taskId: string, + status: PipelineTaskRunStatus | undefined, + componentRef: string, + componentArtifactMap: ComponentArtifactMap, + taskArtifactMap: TaskArtifactMap, +): PipelineNodeModelExpanded[] => { + const artifactsInComponent = componentArtifactMap[componentRef]; + const artifactNodes: PipelineNodeModelExpanded[] = []; + if (artifactsInComponent) { + const artifactNodeData = taskArtifactMap[taskId]; + + Object.entries(artifactsInComponent).forEach(([artifactKey, data]) => { + const label = artifactKey; + const { artifactId } = + artifactNodeData?.find((a) => artifactKey === a.outputArtifactKey) ?? {}; + + // if no node needs it as an input, we don't really need a well known id + const id = artifactId ?? artifactKey; + + const artifactPipelineTask: PipelineTask = { + type: 'artifact', + name: label, + inputs: { + artifacts: [{ label: id, type: composeArtifactType(data) }], + }, + }; + + artifactNodes.push( + createArtifactNode( + id, + label, + artifactPipelineTask, + [taskId], + translateStatusForNode(status?.state), + data.schemaTitle, + ), + ); + }); + } + return artifactNodes; +}; + const getNestedNodes = ( spec: PipelineSpecVariable, items: Record, components: PipelineComponentsKF, executors: PipelineExecutorsKF, + componentArtifactMap: ComponentArtifactMap, + taskArtifactMap: TaskArtifactMap, runDetails?: RunDetailsKF, executions?: Execution[] | null, ): [nestedNodes: PipelineNodeModelExpanded[], children: string[]] => { const nodes: PipelineNodeModelExpanded[] = []; const children: string[] = []; - Object.entries(items).forEach(([name, details]) => { + Object.entries(items).forEach(([taskId, details]) => { const componentRef = details.componentRef.name; const component = components[componentRef]; + const taskName = details.taskInfo.name; - const status = executions - ? parseRuntimeInfoFromExecutions(name, executions) - : parseRuntimeInfoFromRunDetails(name, runDetails); + const status = + parseRuntimeInfoFromExecutions(taskId, executions) || + parseRuntimeInfoFromRunDetails(taskId, runDetails); const runAfter: string[] = details.dependentTasks ?? []; const hasSubTask = @@ -52,7 +100,7 @@ const getNestedNodes = ( const pipelineTask: PipelineTask = { type: 'groupTask', - name, + name: taskName, steps: executor ? [executor.container] : undefined, inputs: parseInputOutput(component?.inputDefinitions), outputs: parseInputOutput(component?.outputDefinitions), @@ -60,31 +108,47 @@ const getNestedNodes = ( volumeMounts: parseVolumeMounts(spec.platform_spec, executorLabel), }; + const artifactNodes = getNodeArtifacts( + taskId, + status, + componentRef, + componentArtifactMap, + taskArtifactMap, + ); + if (artifactNodes.length) { + nodes.push(...artifactNodes); + children.push(...artifactNodes.map((n) => n.id)); + } + if (hasSubTask && subTasks) { const [nestedNodes, nestedChildren] = getNestedNodes( spec, subTasks, components, executors, + componentArtifactMap, + taskArtifactMap, runDetails, executions, ); + const newChildren = nestedChildren.filter((child) => !nodes.find((n) => n.id === child)); + const newNodes = nestedNodes.filter((node) => !nodes.find((n) => n.id === node.id)); const itemNode = createGroupNode( - name, - name, + taskId, + taskName, pipelineTask, runAfter, translateStatusForNode(status?.state), - nestedChildren, + newChildren, ); - nodes.push(itemNode, ...nestedNodes); + nodes.push(itemNode, ...newNodes); } else { nodes.push( - createNode(name, name, pipelineTask, runAfter, translateStatusForNode(status?.state)), + createNode(taskId, taskName, pipelineTask, runAfter, translateStatusForNode(status?.state)), ); } - children.push(name); + children.push(taskId); }); return [nodes, children]; @@ -116,50 +180,28 @@ export const usePipelineTaskTopology = ( const componentRef = taskValue.componentRef.name; const component = components[componentRef]; - const artifactsInComponent = componentArtifactMap[componentRef]; const isGroupNode = !!component?.dag; const groupTasks = component?.dag?.tasks; const executorLabel = component?.executorLabel; const executor = executorLabel ? executors[executorLabel] : undefined; - const status = executions - ? parseRuntimeInfoFromExecutions(taskId, executions) - : parseRuntimeInfoFromRunDetails(taskId, runDetails); + const status = + parseRuntimeInfoFromExecutions(taskId, executions) || + parseRuntimeInfoFromRunDetails(taskId, runDetails); const nodes: PipelineNodeModelExpanded[] = []; const runAfter: string[] = taskValue.dependentTasks ?? []; - if (artifactsInComponent) { - const artifactNodeData = taskArtifactMap[taskId]; - - Object.entries(artifactsInComponent).forEach(([artifactKey, data]) => { - const label = artifactKey; - const { artifactId } = - artifactNodeData?.find((a) => artifactKey === a.outputArtifactKey) ?? {}; - - // if no node needs it as an input, we don't really need a well known id - const id = artifactId ?? artifactKey; - - const pipelineTask: PipelineTask = { - type: 'artifact', - name: label, - inputs: { - artifacts: [{ label: id, type: composeArtifactType(data) }], - }, - }; - - nodes.push( - createArtifactNode( - id, - label, - pipelineTask, - [taskId], - translateStatusForNode(status?.state), - data.schemaTitle, - ), - ); - }); + const artifactNodes = getNodeArtifacts( + taskId, + status, + componentRef, + componentArtifactMap, + taskArtifactMap, + ); + if (artifactNodes.length) { + nodes.push(...artifactNodes); } if (taskValue.dependentTasks) { @@ -192,18 +234,22 @@ export const usePipelineTaskTopology = ( groupTasks, components, executors, + componentArtifactMap, + taskArtifactMap, runDetails, executions, ); + const newChildren = children.filter((child) => !nodes.find((n) => n.id === child)); + const newNodes = nestedNodes.filter((node) => !nodes.find((n) => n.id === node.id)); const itemNode = createGroupNode( taskId, taskName, pipelineTask, runAfter, translateStatusForNode(status?.state), - children, + newChildren, ); - nodes.push(itemNode, ...nestedNodes); + nodes.push(itemNode, ...newNodes); } else { nodes.push( createNode(taskId, taskName, pipelineTask, runAfter, translateStatusForNode(status?.state)), diff --git a/frontend/src/concepts/topology/PipelineDefaultTaskGroup.tsx b/frontend/src/concepts/topology/PipelineDefaultTaskGroup.tsx index a187143d44..63e5bd02a9 100644 --- a/frontend/src/concepts/topology/PipelineDefaultTaskGroup.tsx +++ b/frontend/src/concepts/topology/PipelineDefaultTaskGroup.tsx @@ -1,7 +1,6 @@ import * as React from 'react'; import { - LabelPosition, WithSelectionProps, isNode, DefaultTaskGroup, @@ -22,6 +21,8 @@ import { PipelineNodeModelExpanded, StandardTaskNodeData } from '~/concepts/topo import NodeStatusIcon from '~/concepts/topology/NodeStatusIcon'; import { NODE_HEIGHT, NODE_WIDTH } from './const'; +const MAX_TIP_ITEMS = 6; + type PipelinesDefaultGroupProps = { element: GraphElement; } & WithSelectionProps; @@ -36,20 +37,15 @@ const DefaultTaskGroupInner: React.FunctionComponent(null); const detailsLevel = element.getGraph().getDetailsLevel(); - const MAX_TIP_ITEMS = 6; - const getPopoverTasksList = (items: Node[]) => ( {items.slice(0, MAX_TIP_ITEMS).map((item: Node) => ( - - {item.getData()?.runStatus && ( - - - - )} - {!item.getData()?.runStatus &&
 
} - {item.getLabel()} + + + + + {item.getLabel()}
))} @@ -61,7 +57,6 @@ const DefaultTaskGroupInner: React.FunctionComponent const spacerNodes = getSpacerNodes(updateNodes); + // find the parent of each spacer node + spacerNodes.forEach((spacerNode) => { + const nodeIds = spacerNode.id.split('|'); + if (nodeIds[0]) { + const parent = updateNodes.find((n) => n.children?.includes(nodeIds[0])); + if (parent) { + parent.children?.push(spacerNode.id); + } + } + }); + // Dagre likes the root nodes to be first in the order const renderNodes = [...spacerNodes, ...updateNodes].sort( (a, b) => (a.runAfterTasks?.length ?? 0) - (b.runAfterTasks?.length ?? 0),