diff --git a/packages/client/src/components/AdminPage.tsx b/packages/client/src/components/AdminPage.tsx index b6dd41e..16f5c19 100644 --- a/packages/client/src/components/AdminPage.tsx +++ b/packages/client/src/components/AdminPage.tsx @@ -42,37 +42,42 @@ export const AdminPage = () => { return
Loading...
; } return ( - - - - - - Username - - - Created At - - - Tasks - - - Actions - - - - - {sortBy(pipelines, (x) => dayjs(x.createdAt).valueOf()).map( - (pipeline) => ( - - ) - )} - -
-
+ <> + + + + + + + Username + + + Created At + + + Tasks + + + Actions + + + + + {sortBy(pipelines, (x) => dayjs(x.createdAt).valueOf()).map( + (pipeline) => ( + + ) + )} + +
+
+ ); }; diff --git a/packages/server/src/lib/getNumRunningPipelines.ts b/packages/server/src/lib/getNumRunningPipelines.ts index 5894b41..c8c282f 100644 --- a/packages/server/src/lib/getNumRunningPipelines.ts +++ b/packages/server/src/lib/getNumRunningPipelines.ts @@ -1,6 +1,7 @@ import { UserModel } from "shared/src/schemas"; import { z } from "zod"; import { prisma } from "../db"; +import { sortBy } from "remeda"; export async function getNumRunningPipelines( authenticatedUser: z.infer @@ -20,7 +21,11 @@ export async function getNumRunningPipelines( ) // TODO: check // slice(1) because the first task is the pipeline itself, doesn't get updated properly - .filter((p) => p.tasks.slice(1).some((t) => t.status === "running")); + .filter((p) => + sortBy(p.tasks, (x) => x.createdAt.valueOf()) + .slice(1) + .some((t) => t.status === "running") + ); return pipelines.length; } diff --git a/packages/server/src/lib/jobsPrisma.ts b/packages/server/src/lib/jobsPrisma.ts index 9f65b21..9960d18 100644 --- a/packages/server/src/lib/jobsPrisma.ts +++ b/packages/server/src/lib/jobsPrisma.ts @@ -33,6 +33,7 @@ interface Job { created_at: Date; updated_at: Date; key: string | null; + locked_by: string | null; } export async function getGraphileJobs(): Promise { @@ -40,3 +41,10 @@ export async function getGraphileJobs(): Promise { Prisma.sql`SELECT * FROM graphile_worker.jobs` ); } + +export async function getRunningWorkerIds(): Promise { + // unique list of locked_by ids + return await prisma.$queryRaw( + Prisma.sql`SELECT DISTINCT locked_by FROM graphile_worker.jobs` + ); +} diff --git a/packages/server/src/routers/adminRouter.ts b/packages/server/src/routers/adminRouter.ts index 816aeb8..7f26680 100644 --- a/packages/server/src/routers/adminRouter.ts +++ b/packages/server/src/routers/adminRouter.ts @@ -12,6 +12,7 @@ import { getGraphileJobs, getPipelineJobByKey, getPipelineTaskJobById, + getRunningWorkerIds, } from "../lib/jobsPrisma"; import { indexBy, omit } from "remeda"; @@ -206,4 +207,11 @@ export const adminRouter = router({ }, }); }), + + unlockWorkers: publicProcedure.mutation(async () => { + const utils = await workerUtils(); + const workerIds = await getRunningWorkerIds(); + await utils.forceUnlockWorkers(workerIds); + console.log("attempted to unlock workers"); + }), }); diff --git a/packages/server/src/tasks/worker.ts b/packages/server/src/tasks/worker.ts index 686c9d3..c6f1971 100644 --- a/packages/server/src/tasks/worker.ts +++ b/packages/server/src/tasks/worker.ts @@ -1,4 +1,10 @@ -import { Logger, TaskList, parseCronItem, run } from "graphile-worker"; +import { + JobHelpers, + Logger, + TaskList, + parseCronItem, + run, +} from "graphile-worker"; import dotenv from "dotenv"; import { workerUtils } from "./workerUtils"; import { prisma } from "../db"; @@ -18,7 +24,7 @@ dotenv.config(); // if they have < max pipelines, add a pipeline to get recommendations export const addPipelineCronJob = { id: "recurring-add-pipeline-cron", - handler: async () => { + handler: async (_: any, helpers: JobHelpers) => { const users = shuffle(await prisma.user.findMany()); for (const user of users) { const numActivePipelines = await getNumRunningPipelines(user); @@ -31,6 +37,14 @@ export const addPipelineCronJob = { runId: uuidv4(), emailResults: true, }); + } else { + helpers.logger.info( + "Skipping user " + + user.username + + " as they have " + + numActivePipelines + + " active pipelines" + ); } } }, @@ -100,6 +114,7 @@ export async function startWorker() { taskList: taskList as TaskList, connectionString: process.env.DATABASE_URL, }); + const getTaskOrPipeline = async (jobOrKeyId: string) => { const pipeline = await prisma.pipelineRun.findFirst({ where: { jobKeyId: jobOrKeyId },