Skip to content

Commit

Permalink
breaking down api calls to seperate functions
Browse files Browse the repository at this point in the history
  • Loading branch information
devinvillarosa committed Jan 2, 2025
1 parent 7b1e792 commit 3e631fe
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { RunCard } from "@/components/ui/run-card";
type Props = {
data: Array<{
taskRun: components["schemas"]["TaskRun"];
flowRun?: components["schemas"]["FlowRun"];
flow?: components["schemas"]["Flow"];
flowRun?: components["schemas"]["FlowRun"] | null;
flow?: components["schemas"]["Flow"] | null;
}>;
};

Expand Down
10 changes: 5 additions & 5 deletions ui-v2/src/components/ui/run-card/run-card.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ const getValues = ({
flowRun,
taskRun,
}: {
flowRun: undefined | components["schemas"]["FlowRun"];
taskRun: undefined | components["schemas"]["TaskRun"];
flowRun: null | undefined | components["schemas"]["FlowRun"];
taskRun: null | undefined | components["schemas"]["TaskRun"];
}) => {
if (taskRun) {
const { state, start_time, tags, estimated_run_time } = taskRun;
Expand All @@ -34,10 +34,10 @@ const getValues = ({
};

type Props = {
flow?: components["schemas"]["Flow"];
flowRun?: components["schemas"]["FlowRun"];
flow?: components["schemas"]["Flow"] | null;
flowRun?: components["schemas"]["FlowRun"] | null;
/** If task run is included, uses fields from task run over flow run */
taskRun?: components["schemas"]["TaskRun"];
taskRun?: components["schemas"]["TaskRun"] | null;
};

export const RunCard = ({ flow, flowRun, taskRun }: Props) => {
Expand Down
193 changes: 111 additions & 82 deletions ui-v2/src/hooks/task-run-concurrency-limits.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,110 @@ export const useResetTaskRunConcurrencyLimitTag = () => {
};
};

const fetchTaskRunConcurrencyLimit = async (id: string) => {
// GET task-run-concurrency-limit by id
const res = await getQueryService().GET("/concurrency_limits/{id}", {
params: { path: { id } },
});
if (!res.data) {
throw new Error("'data' expected");
}
return res.data;
};

const fetchActiveTaskRunDetails = async (activeSlots: Array<string>) => {
const taskRuns = await getQueryService().POST("/task_runs/filter", {
body: {
task_runs: {
id: { any_: activeSlots },
operator: "or_",
},
sort: "NAME_DESC",
offset: 0,
},
});
if (!taskRuns.data) {
throw new Error("'data' expected");
}
const taskRunsWithFlows: Array<components["schemas"]["TaskRun"]> = [];
const taskRunsOnly: Array<components["schemas"]["TaskRun"]> = [];

taskRuns.data.forEach((taskRun) => {
if (taskRun.flow_run_id) {
taskRunsWithFlows.push(taskRun);
} else {
taskRunsOnly.push(taskRun);
}
});

const flowRunsIds = taskRunsWithFlows.map(
(taskRun) => taskRun.flow_run_id as string,
);

// Get Flow Runs info
const flowRuns = await getQueryService().POST("/flow_runs/filter", {
body: {
flow_runs: {
id: { any_: flowRunsIds },
operator: "or_",
},
sort: "NAME_DESC",
offset: 0,
},
});
if (!flowRuns.data) {
throw new Error("'data' expected");
}
const hasSameFlowID = flowRuns.data.every(
(flowRun) => flowRun.flow_id === flowRuns.data[0].flow_id,
);
if (!hasSameFlowID) {
throw new Error("Flow runs has mismatching 'flow_id'");
}
const flowID = flowRuns.data[0].flow_id;

// Get Flow info
const flow = await getQueryService().GET("/flows/{id}", {
params: { path: { id: flowID } },
});

if (!flow.data) {
throw new Error("'data' expected");
}

// Normalize data per active slot :
/**
*
* -> active_slot (task_run_id 1) -> flow_run (flow_run_id 1)
* concurrencyLimit -> active_slot (task_run_id 2) -> flow_run (flow_run_id 2) -> flow (flow_id)
* -> active_slot (task_run_id 3) -> flow_run (flow_run_id 3)
*
*/
const activeTaskRunsWithFlows = taskRunsWithFlows.map((taskRunsWithFlow) => {
const flowRun = flowRuns.data.find(
(flowRun) => flowRun.id === taskRunsWithFlow.flow_run_id,
);

if (!flowRun) {
throw new Error('"Expected to find flowRun');
}

return {
taskRun: taskRunsWithFlow,
flowRun,
flow: flow.data,
};
});

const activeTaskRunsWithoutFlows = taskRunsOnly.map((taskRun) => ({
taskRun,
flowRun: null,
flow: null,
}));

return [...activeTaskRunsWithFlows, ...activeTaskRunsWithoutFlows];
};

/**
*
* @param id
Expand All @@ -221,98 +325,23 @@ export const buildConcurrenyLimitDetailsActiveRunsQuery = (id: string) =>
queryOptions({
queryKey: queryKeyFactory.activeTaskRun(id),
queryFn: async () => {
// GET task-run-concurrency-limit by id
const taskRunConcurrencyLimit = await getQueryService().GET(
"/concurrency_limits/{id}",
{ params: { path: { id } } },
);
if (!taskRunConcurrencyLimit.data?.active_slots) {
const taskRunConcurrencyLimit = await fetchTaskRunConcurrencyLimit(id);
if (!taskRunConcurrencyLimit.active_slots) {
throw new Error("'active_slots' expected");
}
// Early exit because there are no active task runs
if (taskRunConcurrencyLimit.data.active_slots.length === 0) {
if (taskRunConcurrencyLimit.active_slots.length === 0) {
return {
taskRunConcurrencyLimit: taskRunConcurrencyLimit.data,
taskRunConcurrencyLimit: taskRunConcurrencyLimit,
activeTaskRuns: [],
};
}

// Get Task Runs info
const taskRuns = await getQueryService().POST("/task_runs/filter", {
body: {
task_runs: {
id: { any_: taskRunConcurrencyLimit.data.active_slots },
operator: "or_",
},
sort: "NAME_DESC",
offset: 0,
},
});
if (!taskRuns.data) {
throw new Error("'data' expected");
}

const flowRunsIds = taskRuns.data
.filter((taskRun) => Boolean(taskRun.flow_run_id))
.map((taskRun) => taskRun.flow_run_id) as Array<string>;

// Get Flow Runs info
const flowRuns = await getQueryService().POST("/flow_runs/filter", {
body: {
flow_runs: {
id: { any_: flowRunsIds },
operator: "or_",
},
sort: "NAME_DESC",
offset: 0,
},
});
if (!flowRuns.data) {
throw new Error("'data' expected");
}
const hasSameFlowID = flowRuns.data.every(
(flowRun) => flowRun.flow_id === flowRuns.data[0].flow_id,
const activeTaskRuns = await fetchActiveTaskRunDetails(
taskRunConcurrencyLimit.active_slots,
);
if (!hasSameFlowID) {
throw new Error("Flow runs has mismatching 'flow_id'");
}
const flowID = flowRuns.data[0].flow_id;

// Get Flow info
const flow = await getQueryService().GET("/flows/{id}", {
params: { path: { id: flowID } },
});

if (!flow.data) {
throw new Error("'data' expected");
}

// Normalize data per active slot :
/**
*
* -> active_slot (task_run_id 1) -> flow_run (flow_run_id 1)
* concurrencyLimit -> active_slot (task_run_id 2) -> flow_run (flow_run_id 2) -> flow (flow_id)
* -> active_slot (task_run_id 3) -> flow_run (flow_run_id 3)
*
*/
const activeTaskRuns = taskRuns.data.map((taskRun) => {
const flowRun = flowRuns.data.find(
(flowRun) => flowRun.id === taskRun.flow_run_id,
);

if (!flowRun) {
throw new Error('"Expected to find flowRun');
}

return {
taskRun,
flowRun,
flow: flow.data,
};
});

return {
taskRunConcurrencyLimit: taskRunConcurrencyLimit.data,
taskRunConcurrencyLimit,
activeTaskRuns,
};
},
Expand Down

0 comments on commit 3e631fe

Please sign in to comment.