diff --git a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/task-run-concurrency-limit-active-task-runs.tsx b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/task-run-concurrency-limit-active-task-runs.tsx index 05f2edb79e553..13a8050dc046d 100644 --- a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/task-run-concurrency-limit-active-task-runs.tsx +++ b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/task-run-concurrency-limit-active-task-runs.tsx @@ -4,8 +4,8 @@ import { RunCard } from "@/components/ui/run-card"; type Props = { data: Array<{ taskRun: components["schemas"]["TaskRun"]; - flowRun?: components["schemas"]["FlowRun"]; - flow?: components["schemas"]["Flow"]; + flowRun?: components["schemas"]["FlowRun"] | null; + flow?: components["schemas"]["Flow"] | null; }>; }; diff --git a/ui-v2/src/components/ui/run-card/run-card.tsx b/ui-v2/src/components/ui/run-card/run-card.tsx index 6775cd7b9cf14..f6042b38c6571 100644 --- a/ui-v2/src/components/ui/run-card/run-card.tsx +++ b/ui-v2/src/components/ui/run-card/run-card.tsx @@ -18,8 +18,8 @@ const getValues = ({ flowRun, taskRun, }: { - flowRun: undefined | components["schemas"]["FlowRun"]; - taskRun: undefined | components["schemas"]["TaskRun"]; + flowRun: null | undefined | components["schemas"]["FlowRun"]; + taskRun: null | undefined | components["schemas"]["TaskRun"]; }) => { if (taskRun) { const { state, start_time, tags, estimated_run_time } = taskRun; @@ -34,10 +34,10 @@ const getValues = ({ }; type Props = { - flow?: components["schemas"]["Flow"]; - flowRun?: components["schemas"]["FlowRun"]; + flow?: components["schemas"]["Flow"] | null; + flowRun?: components["schemas"]["FlowRun"] | null; /** If task run is included, uses fields from task run over flow run */ - taskRun?: components["schemas"]["TaskRun"]; + taskRun?: components["schemas"]["TaskRun"] | null; }; export const RunCard = ({ flow, flowRun, taskRun }: Props) => { diff --git a/ui-v2/src/hooks/task-run-concurrency-limits.ts b/ui-v2/src/hooks/task-run-concurrency-limits.ts index 4e4741b4151d8..66b84afcaa9d2 100644 --- a/ui-v2/src/hooks/task-run-concurrency-limits.ts +++ b/ui-v2/src/hooks/task-run-concurrency-limits.ts @@ -212,6 +212,110 @@ export const useResetTaskRunConcurrencyLimitTag = () => { }; }; +const fetchTaskRunConcurrencyLimit = async (id: string) => { + // GET task-run-concurrency-limit by id + const res = await getQueryService().GET("/concurrency_limits/{id}", { + params: { path: { id } }, + }); + if (!res.data) { + throw new Error("'data' expected"); + } + return res.data; +}; + +const fetchActiveTaskRunDetails = async (activeSlots: Array) => { + const taskRuns = await getQueryService().POST("/task_runs/filter", { + body: { + task_runs: { + id: { any_: activeSlots }, + operator: "or_", + }, + sort: "NAME_DESC", + offset: 0, + }, + }); + if (!taskRuns.data) { + throw new Error("'data' expected"); + } + const taskRunsWithFlows: Array = []; + const taskRunsOnly: Array = []; + + taskRuns.data.forEach((taskRun) => { + if (taskRun.flow_run_id) { + taskRunsWithFlows.push(taskRun); + } else { + taskRunsOnly.push(taskRun); + } + }); + + const flowRunsIds = taskRunsWithFlows.map( + (taskRun) => taskRun.flow_run_id as string, + ); + + // Get Flow Runs info + const flowRuns = await getQueryService().POST("/flow_runs/filter", { + body: { + flow_runs: { + id: { any_: flowRunsIds }, + operator: "or_", + }, + sort: "NAME_DESC", + offset: 0, + }, + }); + if (!flowRuns.data) { + throw new Error("'data' expected"); + } + const hasSameFlowID = flowRuns.data.every( + (flowRun) => flowRun.flow_id === flowRuns.data[0].flow_id, + ); + if (!hasSameFlowID) { + throw new Error("Flow runs has mismatching 'flow_id'"); + } + const flowID = flowRuns.data[0].flow_id; + + // Get Flow info + const flow = await getQueryService().GET("/flows/{id}", { + params: { path: { id: flowID } }, + }); + + if (!flow.data) { + throw new Error("'data' expected"); + } + + // Normalize data per active slot : + /** + * + * -> active_slot (task_run_id 1) -> flow_run (flow_run_id 1) + * concurrencyLimit -> active_slot (task_run_id 2) -> flow_run (flow_run_id 2) -> flow (flow_id) + * -> active_slot (task_run_id 3) -> flow_run (flow_run_id 3) + * + */ + const activeTaskRunsWithFlows = taskRunsWithFlows.map((taskRunsWithFlow) => { + const flowRun = flowRuns.data.find( + (flowRun) => flowRun.id === taskRunsWithFlow.flow_run_id, + ); + + if (!flowRun) { + throw new Error('"Expected to find flowRun'); + } + + return { + taskRun: taskRunsWithFlow, + flowRun, + flow: flow.data, + }; + }); + + const activeTaskRunsWithoutFlows = taskRunsOnly.map((taskRun) => ({ + taskRun, + flowRun: null, + flow: null, + })); + + return [...activeTaskRunsWithFlows, ...activeTaskRunsWithoutFlows]; +}; + /** * * @param id @@ -221,98 +325,23 @@ export const buildConcurrenyLimitDetailsActiveRunsQuery = (id: string) => queryOptions({ queryKey: queryKeyFactory.activeTaskRun(id), queryFn: async () => { - // GET task-run-concurrency-limit by id - const taskRunConcurrencyLimit = await getQueryService().GET( - "/concurrency_limits/{id}", - { params: { path: { id } } }, - ); - if (!taskRunConcurrencyLimit.data?.active_slots) { + const taskRunConcurrencyLimit = await fetchTaskRunConcurrencyLimit(id); + if (!taskRunConcurrencyLimit.active_slots) { throw new Error("'active_slots' expected"); } // Early exit because there are no active task runs - if (taskRunConcurrencyLimit.data.active_slots.length === 0) { + if (taskRunConcurrencyLimit.active_slots.length === 0) { return { - taskRunConcurrencyLimit: taskRunConcurrencyLimit.data, + taskRunConcurrencyLimit: taskRunConcurrencyLimit, activeTaskRuns: [], }; } - - // Get Task Runs info - const taskRuns = await getQueryService().POST("/task_runs/filter", { - body: { - task_runs: { - id: { any_: taskRunConcurrencyLimit.data.active_slots }, - operator: "or_", - }, - sort: "NAME_DESC", - offset: 0, - }, - }); - if (!taskRuns.data) { - throw new Error("'data' expected"); - } - - const flowRunsIds = taskRuns.data - .filter((taskRun) => Boolean(taskRun.flow_run_id)) - .map((taskRun) => taskRun.flow_run_id) as Array; - - // Get Flow Runs info - const flowRuns = await getQueryService().POST("/flow_runs/filter", { - body: { - flow_runs: { - id: { any_: flowRunsIds }, - operator: "or_", - }, - sort: "NAME_DESC", - offset: 0, - }, - }); - if (!flowRuns.data) { - throw new Error("'data' expected"); - } - const hasSameFlowID = flowRuns.data.every( - (flowRun) => flowRun.flow_id === flowRuns.data[0].flow_id, + const activeTaskRuns = await fetchActiveTaskRunDetails( + taskRunConcurrencyLimit.active_slots, ); - if (!hasSameFlowID) { - throw new Error("Flow runs has mismatching 'flow_id'"); - } - const flowID = flowRuns.data[0].flow_id; - - // Get Flow info - const flow = await getQueryService().GET("/flows/{id}", { - params: { path: { id: flowID } }, - }); - - if (!flow.data) { - throw new Error("'data' expected"); - } - - // Normalize data per active slot : - /** - * - * -> active_slot (task_run_id 1) -> flow_run (flow_run_id 1) - * concurrencyLimit -> active_slot (task_run_id 2) -> flow_run (flow_run_id 2) -> flow (flow_id) - * -> active_slot (task_run_id 3) -> flow_run (flow_run_id 3) - * - */ - const activeTaskRuns = taskRuns.data.map((taskRun) => { - const flowRun = flowRuns.data.find( - (flowRun) => flowRun.id === taskRun.flow_run_id, - ); - - if (!flowRun) { - throw new Error('"Expected to find flowRun'); - } - - return { - taskRun, - flowRun, - flow: flow.data, - }; - }); return { - taskRunConcurrencyLimit: taskRunConcurrencyLimit.data, + taskRunConcurrencyLimit, activeTaskRuns, }; },