Skip to content

Commit

Permalink
Merge pull request #390 from Klimatbyran/feat/divideSaveToApi
Browse files Browse the repository at this point in the history
Feat: divide save to api to separate workers
  • Loading branch information
Greenheart authored Dec 5, 2024
2 parents 814d2f3 + 26f9c10 commit d260479
Show file tree
Hide file tree
Showing 27 changed files with 710 additions and 560 deletions.
2 changes: 1 addition & 1 deletion k8s/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ spec:
app: garbo
spec:
containers:
- image: ghcr.io/klimatbyran/garbo:3.4.9 # {"$imagepolicy": "flux-system:garbo"}
- image: ghcr.io/klimatbyran/garbo:3.4.13 # {"$imagepolicy": "flux-system:garbo"}
resources: {}
name: garbo
ports:
Expand Down
4 changes: 2 additions & 2 deletions k8s/nlm-ingestor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ kind: Deployment
metadata:
name: nlm
spec:
replicas: 1
replicas: 5
selector:
matchLabels:
app: nlm
Expand All @@ -16,7 +16,7 @@ spec:
- image: ghcr.io/nlmatics/nlm-ingestor
resources:
limits:
cpu: 3000m
cpu: 1100m
memory: 2Gi
name: nlm
ports:
Expand Down
2 changes: 1 addition & 1 deletion k8s/pre-deploy/db-migrate.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ spec:
restartPolicy: Never
containers:
- name: migration
image: ghcr.io/klimatbyran/garbo:3.4.9 # {"$imagepolicy": "flux-system:garbo"}
image: ghcr.io/klimatbyran/garbo:3.4.13 # {"$imagepolicy": "flux-system:garbo"}
command: ['npm', 'run', 'migrate']
env:
- name: POSTGRES_PASSWORD
Expand Down
2 changes: 1 addition & 1 deletion k8s/worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ spec:
app: worker
spec:
containers:
- image: ghcr.io/klimatbyran/garbo:3.4.9 # {"$imagepolicy": "flux-system:garbo"}
- image: ghcr.io/klimatbyran/garbo:3.4.13 # {"$imagepolicy": "flux-system:garbo"}
resources: {}
command: ['npm', 'run', 'workers']
name: worker
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "garbo",
"version": "3.4.9",
"version": "3.4.13",
"description": "",
"type": "module",
"engines": {
Expand Down
2 changes: 1 addition & 1 deletion src/discord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import commands from './discord/commands'
import config from './config/discord'
import approve from './discord/interactions/approve'
import reject from './discord/interactions/reject'
import saveToAPI, { JobData as SaveToApiJob } from './workers/saveToAPI'
import saveToAPI, { SaveToApiJob } from './workers/saveToAPI'

const getJob = (jobId: string) => saveToAPI.queue.getJob(jobId)

Expand Down
7 changes: 4 additions & 3 deletions src/discord/interactions/approve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import { DiscordJob } from '../../lib/DiscordWorker'
export default {
async execute(interaction: ButtonInteraction, job: DiscordJob) {
await job.updateData({ ...job.data, approved: true })
await job.promote()

job.log(`Approving company edit: ${job.data.wikidataId}`)
interaction.reply({
await interaction.reply({
content: `Tack för din granskning, ${interaction?.user?.username}!`,
})
//discord.lockThread(threadId)

await job.promote()
},
}
3 changes: 1 addition & 2 deletions src/lib/DiscordWorker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Worker, WorkerOptions, Job, Queue, Processor } from 'bullmq'
import { Worker, WorkerOptions, Job, Queue } from 'bullmq'
import { Message, TextChannel } from 'discord.js'
import redis from '../config/redis'
import discord from '../discord'
Expand All @@ -8,7 +8,6 @@ export class DiscordJob extends Job {
url: string
threadId: string
channelId: string
wikidataId?: string
messageId?: string
}

Expand Down
52 changes: 51 additions & 1 deletion src/lib/prisma.ts
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ export async function updateIndustry(
})
}

export async function ensureReportingPeriodExists(
export async function upsertReportingPeriod(
company: Company,
metadata: Parameters<typeof prisma.metadata.create>[0]['data'],
{
Expand Down Expand Up @@ -590,3 +590,53 @@ export async function ensureReportingPeriodExists(
},
})
}

export async function upsertEmissions({
emissionsId,
year,
companyId,
}: {
emissionsId: number
year: string
companyId: string
}) {
return prisma.emissions.upsert({
where: { id: emissionsId },
update: {},
create: {
reportingPeriod: {
connect: {
reportingPeriodId: {
year,
companyId,
},
},
},
},
})
}

export async function upsertEconomy({
economyId,
companyId,
year,
}: {
economyId: number
companyId: string
year: string
}) {
return prisma.economy.upsert({
where: { id: economyId },
update: {},
create: {
reportingPeriod: {
connect: {
reportingPeriodId: {
year,
companyId,
},
},
},
},
})
}
72 changes: 72 additions & 0 deletions src/lib/saveUtils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { getReportingPeriodDates } from './reportingPeriodDates'

export function formatAsReportingPeriods(
entries: { year: number }[],
fiscalYear: { startMonth: number; endMonth: number },
category: 'emissions' | 'economy'
) {
return entries.map(({ year, ...data }) => {
const [startDate, endDate] = getReportingPeriodDates(
year,
fiscalYear.startMonth,
fiscalYear.endMonth
)
return {
startDate,
endDate,
[category]:
category === 'economy'
? (data as { economy: any }).economy
: {
...data,
},
}
})
}

import { askPrompt } from './openai'

export const defaultMetadata = (url: string) => ({
source: url,
comment: 'Parsed by Garbo AI',
})

/**
* Recusrively remove the provided keys from all levels of the object.
*
* Handles circular references.
* Source: https://stackoverflow.com/a/72493889
*/
const recursiveOmit = <T extends Object>(
obj: T,
keys: Set<string>,
visitedIn?: Set<any>
): T => {
if (obj == null || typeof obj !== 'object') return obj
const visited = visitedIn ?? new Set()
visited.add(obj)
Object.entries(obj).forEach(([key, val]) => {
if (keys.has(key)) {
delete obj[key as keyof T]
} else if (typeof val === 'object' && !visited.has(val)) {
recursiveOmit(val, keys, visited)
}
})
return obj
}

export const askDiff = async (before: any, after: any) => {
if (!before || !after) return 'NO_CHANGES'
return await askPrompt(
`What is changed between these two json values? Please respond in clear text with markdown formatting.
The purpose is to let an editor approve the changes or suggest changes in Discord.
Be as brief as possible. Never be technical - meaning no comments about structure changes, fields renames etc.
Focus only on the actual values that have changed.
When handling years and ambiguous dates, always use the last year in the period (e.g. startDate: 2020 - endDate: 2021 should be referred to as 2021).
NEVER REPEAT UNCHANGED VALUES OR UNCHANGED YEARS! If nothing important has changed, just write "NO_CHANGES".`,
JSON.stringify({
before: recursiveOmit(structuredClone(before), new Set(['metadata'])),
after: recursiveOmit(structuredClone(after), new Set(['metadata'])),
})
)
}
57 changes: 21 additions & 36 deletions src/routes/middlewares.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import { validateRequest, validateRequestBody } from './zod-middleware'
import { z, ZodError } from 'zod'
import cors, { CorsOptionsDelegate } from 'cors'

import { ensureReportingPeriodExists } from '../lib/prisma'
import {
upsertEconomy,
upsertEmissions,
upsertReportingPeriod,
} from '../lib/prisma'
import { GarboAPIError } from '../lib/garbo-api-error'
import apiConfig from '../config/api'

Expand Down Expand Up @@ -164,11 +168,12 @@ export const reportingPeriod =
if (req.method === 'POST' || req.method === 'PATCH') {
// TODO: Only allow creating a reporting period when updating other data
// TODO: Maybe throw 404 if the reporting period was not found and it is a GET request
const reportingPeriod = await ensureReportingPeriodExists(
company,
metadata,
{ startDate, endDate, reportURL, year }
)
const reportingPeriod = await upsertReportingPeriod(company, metadata, {
startDate,
endDate,
reportURL,
year,
})

res.locals.reportingPeriod = reportingPeriod
}
Expand All @@ -180,21 +185,11 @@ export const ensureEmissionsExists =
(prisma: PrismaClient) =>
async (req: Request, res: Response, next: NextFunction) => {
const reportingPeriod = res.locals.reportingPeriod
const emissionsId = res.locals.reportingPeriod.emissionsId ?? 0

const emissions = await prisma.emissions.upsert({
where: { id: emissionsId ?? 0 },
update: {},
create: {
reportingPeriod: {
connect: {
reportingPeriodId: {
year: reportingPeriod.year,
companyId: reportingPeriod.companyId,
},
},
},
},

const emissions = await upsertEmissions({
emissionsId: reportingPeriod.emissionsId ?? 0,
companyId: res.locals.company.wikidataId,
year: reportingPeriod.year,
})

res.locals.emissions = emissions
Expand All @@ -205,21 +200,11 @@ export const ensureEconomyExists =
(prisma: PrismaClient) =>
async (req: Request, res: Response, next: NextFunction) => {
const reportingPeriod = res.locals.reportingPeriod
const economyId = res.locals.reportingPeriod.economyId ?? 0

const economy = await prisma.economy.upsert({
where: { id: economyId },
update: {},
create: {
reportingPeriod: {
connect: {
reportingPeriodId: {
year: reportingPeriod.year,
companyId: reportingPeriod.companyId,
},
},
},
},

const economy = await upsertEconomy({
economyId: reportingPeriod.economyId ?? 0,
companyId: reportingPeriod.companyId,
year: reportingPeriod.year,
})

res.locals.economy = economy
Expand Down
Loading

0 comments on commit d260479

Please sign in to comment.