diff --git a/frontend/src/concepts/pipelines/kfTypes.ts b/frontend/src/concepts/pipelines/kfTypes.ts index 8d5ac602de..d77c8e014d 100644 --- a/frontend/src/concepts/pipelines/kfTypes.ts +++ b/frontend/src/concepts/pipelines/kfTypes.ts @@ -175,15 +175,17 @@ export type InputOutputArtifactType = { }; export type InputOutputDefinition = { - artifacts?: Record< - string, - { - artifactType: InputOutputArtifactType; - } - >; + artifacts?: InputOutputDefinitionArtifacts; parameters?: ParametersKF; }; +export type InputOutputDefinitionArtifacts = Record< + string, + { + artifactType: InputOutputArtifactType; + } +>; + type GroupNodeComponent = { dag: DAG; }; @@ -234,17 +236,22 @@ export type TaskKF = { inputs?: { artifacts?: Record< string, - { - taskOutputArtifact?: { - /** Artifact node name */ - outputArtifactKey: string; - /** - * The task string for runAfter - * @see DAG.tasks - */ - producerTask: string; - }; - } + EitherNotBoth< + { + taskOutputArtifact?: { + /** Artifact node name */ + outputArtifactKey: string; + /** + * The task string for runAfter + * @see DAG.tasks + */ + producerTask: string; + }; + }, + { + componentInputArtifact: string; + } + > >; parameters?: Record< string, @@ -326,9 +333,7 @@ export type PipelineSpec = { }; root: { dag: DAG; - inputDefinitions?: { - parameters: ParametersKF; - }; + inputDefinitions?: InputOutputDefinition; }; schemaVersion: string; sdkVersion: string; diff --git a/frontend/src/concepts/pipelines/topology/__tests__/parseUtils.spec.ts b/frontend/src/concepts/pipelines/topology/__tests__/parseUtils.spec.ts index 416b33aa7e..bf40d9b958 100644 --- a/frontend/src/concepts/pipelines/topology/__tests__/parseUtils.spec.ts +++ b/frontend/src/concepts/pipelines/topology/__tests__/parseUtils.spec.ts @@ -417,11 +417,25 @@ describe('pipeline topology parseUtils', () => { taskInfo: { name: 'task-2' }, triggerPolicy: { strategy: TriggerStrategy.ALL_UPSTREAM_TASKS_SUCCEEDED }, }, + 'task-3': { + cachingOptions: { enableCache: true }, + componentRef: { name: 'comp-task-3' }, + dependentTasks: [], + inputs: { + artifacts: { + 'task-3-artifact-3': { + componentInputArtifact: 'test-artifact', + }, + }, + }, + taskInfo: { name: 'task-3' }, + triggerPolicy: { strategy: TriggerStrategy.ALL_UPSTREAM_TASKS_SUCCEEDED }, + }, }; const consoleWarnSpy = jest.spyOn(global.console, 'warn'); it('returns empty object when no task artifacts exist', () => { - const result = parseTasksForArtifactRelationship({ + const result = parseTasksForArtifactRelationship('root', { 'task-1': { ...testTasks['task-1'], inputs: {} }, 'task-2': { ...testTasks['task-2'], inputs: {} }, }); @@ -429,21 +443,18 @@ describe('pipeline topology parseUtils', () => { }); it('returns task artifact map when artifacts are provided', () => { - const result = parseTasksForArtifactRelationship(testTasks); + const result = parseTasksForArtifactRelationship('root', testTasks); expect(result).toEqual({ - 'some-dag-task-2': [ - { outputArtifactKey: 'task-1-artifact-name', artifactId: 'task-1-artifact-1' }, - ], - 'some-dag-task-1': [ - { outputArtifactKey: 'task-2-artifact-name', artifactId: 'task-2-artifact-2' }, - ], + 'task-1': [{ artifactNodeId: 'GROUP.root.ARTIFACT.some-dag-task-2.task-1-artifact-name' }], + 'task-2': [{ artifactNodeId: 'GROUP.root.ARTIFACT.some-dag-task-1.task-2-artifact-name' }], + 'task-3': [{ artifactNodeId: 'GROUP.root.ARTIFACT..test-artifact' }], }); }); describe('returns warning with unmapped artifact for a task when', () => { it('no producerTask is found', () => { - const result = parseTasksForArtifactRelationship({ + const result = parseTasksForArtifactRelationship('root', { ...testTasks, 'task-2': { ...testTasks['task-2'], @@ -464,17 +475,17 @@ describe('pipeline topology parseUtils', () => { taskOutputArtifact: { outputArtifactKey: 'task-2-artifact-name', producerTask: '' }, }); expect(result).toEqual({ - 'some-dag-task-2': [ + 'task-1': [ { - artifactId: 'task-1-artifact-1', - outputArtifactKey: 'task-1-artifact-name', + artifactNodeId: 'GROUP.root.ARTIFACT.some-dag-task-2.task-1-artifact-name', }, ], + 'task-3': [{ artifactNodeId: 'GROUP.root.ARTIFACT..test-artifact' }], }); }); it('no outputArtifactKey is found', () => { - const result = parseTasksForArtifactRelationship({ + const result = parseTasksForArtifactRelationship('root', { ...testTasks, 'task-2': { ...testTasks['task-2'], @@ -495,17 +506,17 @@ describe('pipeline topology parseUtils', () => { taskOutputArtifact: { outputArtifactKey: '', producerTask: 'some-dag-task-1' }, }); expect(result).toEqual({ - 'some-dag-task-2': [ + 'task-1': [ { - artifactId: 'task-1-artifact-1', - outputArtifactKey: 'task-1-artifact-name', + artifactNodeId: 'GROUP.root.ARTIFACT.some-dag-task-2.task-1-artifact-name', }, ], + 'task-3': [{ artifactNodeId: 'GROUP.root.ARTIFACT..test-artifact' }], }); }); it('no taskOutputArtifact is found', () => { - const result = parseTasksForArtifactRelationship({ + const result = parseTasksForArtifactRelationship('root', { 'task-1': { ...testTasks['task-1'], inputs: { diff --git a/frontend/src/concepts/pipelines/topology/__tests__/usePipelineTaskTopology.spec.ts b/frontend/src/concepts/pipelines/topology/__tests__/usePipelineTaskTopology.spec.ts index 47d90a7a90..edc1660cd8 100644 --- a/frontend/src/concepts/pipelines/topology/__tests__/usePipelineTaskTopology.spec.ts +++ b/frontend/src/concepts/pipelines/topology/__tests__/usePipelineTaskTopology.spec.ts @@ -6,6 +6,9 @@ import { ICON_TASK_NODE_TYPE } from '~/concepts/topology/utils'; import { EXECUTION_TASK_NODE_TYPE } from '~/concepts/topology/const'; describe('usePipelineTaskTopology', () => { + beforeEach(() => { + jest.spyOn(console, 'warn').mockImplementation(jest.fn()); + }); it('returns the correct number of nodes', () => { const renderResult = testHook(usePipelineTaskTopology)(mockLargePipelineSpec); const nodes = renderResult.result.current; @@ -14,9 +17,9 @@ describe('usePipelineTaskTopology', () => { const groups = nodes.filter((n) => n.type === EXECUTION_TASK_NODE_TYPE); const artifactNodes = nodes.filter((n) => n.type === ICON_TASK_NODE_TYPE); - expect(nodes).toHaveLength(86); + expect(nodes).toHaveLength(107); expect(tasks).toHaveLength(35); expect(groups).toHaveLength(5); - expect(artifactNodes).toHaveLength(46); + expect(artifactNodes).toHaveLength(67); }); }); diff --git a/frontend/src/concepts/pipelines/topology/parseUtils.ts b/frontend/src/concepts/pipelines/topology/parseUtils.ts index 79870ead5b..d7ef51de1d 100644 --- a/frontend/src/concepts/pipelines/topology/parseUtils.ts +++ b/frontend/src/concepts/pipelines/topology/parseUtils.ts @@ -44,33 +44,47 @@ export const parseComponentsForArtifactRelationship = ( ); export type TaskArtifactMap = { - [taskName: string]: { outputArtifactKey: string; artifactId: string }[] | undefined; + [taskName: string]: { artifactNodeId: string }[] | undefined; }; -export const parseTasksForArtifactRelationship = (tasks: DAG['tasks']): TaskArtifactMap => - Object.values(tasks).reduce( - (map, taskValue) => - Object.entries(taskValue.inputs?.artifacts ?? {}).reduce( - (artifactItems, [artifactId, value]) => { - const { producerTask: taskId, outputArtifactKey } = value.taskOutputArtifact || {}; - if (!taskId || !outputArtifactKey) { - // eslint-disable-next-line no-console - console.warn('Issue constructing artifact node', value); - return artifactItems; - } - +export const parseTasksForArtifactRelationship = ( + groupId: string | undefined, + tasks: DAG['tasks'], +): TaskArtifactMap => + Object.entries(tasks).reduce( + (map, [taskId, taskValue]) => + Object.entries(taskValue.inputs?.artifacts ?? {}).reduce((artifactItems, [, value]) => { + // artifact without inputs + if (value.componentInputArtifact) { return { ...artifactItems, [taskId]: [ ...(artifactItems[taskId] ?? []), { - outputArtifactKey, - artifactId, + artifactNodeId: idForTaskArtifact(groupId, '', value.componentInputArtifact), }, ], }; - }, - map, - ), + } + + // else, artifacts with inputs from tasks + const { producerTask, outputArtifactKey } = value.taskOutputArtifact || {}; + + if (!producerTask || !outputArtifactKey) { + // eslint-disable-next-line no-console + console.warn('Issue constructing artifact node', value); + return artifactItems; + } + + return { + ...artifactItems, + [taskId]: [ + ...(artifactItems[taskId] ?? []), + { + artifactNodeId: idForTaskArtifact(groupId, producerTask, outputArtifactKey), + }, + ], + }; + }, map), {}, ); @@ -314,3 +328,12 @@ export const parseVolumeMounts = ( name: pvc.taskOutputParameter?.producerTask ?? '', })); }; + +export const idForTaskArtifact = ( + groupId: string | undefined, + taskId: string, + artifactId: string, +): string => + groupId + ? `GROUP.${groupId}.ARTIFACT.${taskId}.${artifactId}` + : `ARTIFACT.${taskId}.${artifactId}`; diff --git a/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.tsx b/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.tsx index 708981d0d3..99a7133363 100644 --- a/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.tsx +++ b/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.tsx @@ -1,5 +1,8 @@ import * as React from 'react'; +import * as _ from 'lodash-es'; import { + InputOutputArtifactType, + InputOutputDefinitionArtifacts, PipelineComponentsKF, PipelineExecutorsKF, PipelineSpecVariable, @@ -13,6 +16,7 @@ import { Execution } from '~/third_party/mlmd'; import { ComponentArtifactMap, composeArtifactType, + idForTaskArtifact, parseComponentsForArtifactRelationship, parseInputOutput, parseRuntimeInfoFromExecutions, @@ -24,8 +28,40 @@ import { } from './parseUtils'; import { PipelineTask, PipelineTaskRunStatus } from './pipelineTaskTypes'; -const idForTaskArtifact = (groupId: string | undefined, artifactId: string) => - groupId ? `${groupId}-ARTIFACT-${artifactId}` : artifactId; +const getArtifactPipelineTask = ( + name: string, + artifactType: InputOutputArtifactType, +): PipelineTask => ({ + type: 'artifact', + name, + inputs: { + artifacts: [{ label: name, type: composeArtifactType(artifactType) }], + }, +}); + +/** + * Get the artifact nodes without the inputs from a task node + */ +const getInputArtifacts = ( + groupId: string | undefined, + status: PipelineTaskRunStatus | undefined, + inputArtifacts: InputOutputDefinitionArtifacts | undefined, +) => { + if (!inputArtifacts) { + return []; + } + + return Object.entries(inputArtifacts).map(([artifactKey, data]) => + createArtifactNode( + idForTaskArtifact(groupId, '', artifactKey), + artifactKey, + getArtifactPipelineTask(artifactKey, data.artifactType), + undefined, + translateStatusForNode(status?.state), + data.artifactType.schemaTitle, + ), + ); +}; const getTaskArtifacts = ( groupId: string | undefined, @@ -33,42 +69,22 @@ const getTaskArtifacts = ( status: PipelineTaskRunStatus | undefined, componentRef: string, componentArtifactMap: ComponentArtifactMap, - taskArtifactMap: TaskArtifactMap, ): PipelineNodeModelExpanded[] => { const artifactsInComponent = componentArtifactMap[componentRef]; - const artifactNodes: PipelineNodeModelExpanded[] = []; - if (artifactsInComponent) { - const artifactNodeData = taskArtifactMap[taskId]; - - Object.entries(artifactsInComponent).forEach(([artifactKey, data]) => { - const label = artifactKey; - const { artifactId } = - artifactNodeData?.find((a) => artifactKey === a.outputArtifactKey) ?? {}; - - // if no node needs it as an input, we don't really need a well known id, prepend taskId to ensure uniqueness - const id = idForTaskArtifact(groupId, artifactId ?? artifactKey); - - const artifactPipelineTask: PipelineTask = { - type: 'artifact', - name: label, - inputs: { - artifacts: [{ label, type: composeArtifactType(data) }], - }, - }; - - artifactNodes.push( - createArtifactNode( - id, - label, - artifactPipelineTask, - [taskId], - translateStatusForNode(status?.state), - data.schemaTitle, - ), - ); - }); + + if (!artifactsInComponent) { + return []; } - return artifactNodes; + return Object.entries(artifactsInComponent).map(([artifactKey, data]) => + createArtifactNode( + idForTaskArtifact(groupId, taskId, artifactKey), + artifactKey, + getArtifactPipelineTask(artifactKey, data), + [taskId], + translateStatusForNode(status?.state), + data.schemaTitle, + ), + ); }; const getNodesForTasks = ( @@ -81,6 +97,7 @@ const getNodesForTasks = ( taskArtifactMap: TaskArtifactMap, runDetails?: RunDetailsKF, executions?: Execution[] | null, + inputArtifacts?: InputOutputDefinitionArtifacts, ): [nestedNodes: PipelineNodeModelExpanded[], children: string[]] => { const nodes: PipelineNodeModelExpanded[] = []; const children: string[] = []; @@ -95,7 +112,9 @@ const getNodesForTasks = ( parseRuntimeInfoFromRunDetails(taskId, runDetails); const runStatus = translateStatusForNode(status?.state); - const runAfter: string[] = details.dependentTasks ?? []; + // add edges from one task to its parent tasks + const runAfter: string[] = + details.dependentTasks?.filter((t) => Object.keys(items).includes(t)) ?? []; const hasSubTask = Object.keys(components).find((task) => task === componentRef) && components[componentRef]?.dag; @@ -106,41 +125,40 @@ const getNodesForTasks = ( const pipelineTask: PipelineTask = { type: 'groupTask', name: taskName, - steps: executor ? [executor.container] : undefined, + steps: executor?.container ? [executor.container] : undefined, inputs: parseInputOutput(component?.inputDefinitions), outputs: parseInputOutput(component?.outputDefinitions), status, volumeMounts: parseVolumeMounts(spec.platform_spec, executorLabel), }; + // Build artifact nodes with inputs from task nodes const artifactNodes = getTaskArtifacts( groupId, taskId, status, componentRef, componentArtifactMap, - taskArtifactMap, ); if (artifactNodes.length) { nodes.push(...artifactNodes); children.push(...artifactNodes.map((n) => n.id)); } - if (details.dependentTasks) { - // This task's runAfters may need artifact relationships -- find those artifactIds - runAfter.push( - ...details.dependentTasks - .map((dependantTaskId) => { - const art = taskArtifactMap[dependantTaskId]; - return art ? art.map((v) => idForTaskArtifact(groupId, v.artifactId)) : null; - }) - .filter((v): v is string[] => !!v) - .flat(), - ); + // Build artifact nodes without inputs + const inputArtifactNodes = getInputArtifacts(groupId, status, inputArtifacts); + if (inputArtifactNodes.length) { + nodes.push(...inputArtifactNodes); + children.push(...inputArtifactNodes.map((n) => n.id)); } + // Read the artifact-task map we built before + // Add edge from artifact to task + const artifactToTaskEdges = taskArtifactMap[taskId]?.map((v) => v.artifactNodeId) ?? []; + runAfter.push(...artifactToTaskEdges); + if (hasSubTask && subTasks) { - const subTasksArtifactMap = parseTasksForArtifactRelationship(subTasks); + const subTasksArtifactMap = parseTasksForArtifactRelationship(taskId, subTasks); const [nestedNodes, taskChildren] = getNodesForTasks( taskId, @@ -152,6 +170,7 @@ const getNodesForTasks = ( subTasksArtifactMap, runDetails, executions, + component?.inputDefinitions?.artifacts, ); const itemNode = createGroupNode( @@ -188,27 +207,27 @@ export const usePipelineTaskTopology = ( deploymentSpec: { executors }, root: { dag: { tasks }, + inputDefinitions, }, } = pipelineSpec; const componentArtifactMap = parseComponentsForArtifactRelationship(components); - const taskArtifactMap = parseTasksForArtifactRelationship(tasks); + const taskArtifactMap = parseTasksForArtifactRelationship('root', tasks); - const nodes = getNodesForTasks( - 'root', - spec, - tasks, - components, - executors, - componentArtifactMap, - taskArtifactMap, - runDetails, - executions, - )[0]; - - // Since we have artifacts that are input only that do not get created, remove any dependencies on them - return nodes.map((n) => ({ - ...n, - runAfterTasks: n.runAfterTasks?.filter((t) => nodes.find((nextNode) => nextNode.id === t)), - })); + // There are some duplicated nodes, remove them + return _.uniqBy( + getNodesForTasks( + 'root', + spec, + tasks, + components, + executors, + componentArtifactMap, + taskArtifactMap, + runDetails, + executions, + inputDefinitions?.artifacts, + )[0], + (node) => node.id, + ); }, [executions, runDetails, spec]);