Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
carlbrugger committed May 21, 2024
1 parent 8652661 commit af65b88
Show file tree
Hide file tree
Showing 14 changed files with 409 additions and 242 deletions.
139 changes: 70 additions & 69 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion plugins/webhook-egress/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
"@flatfile/plugin-job-handler": "^0.5.2",
"@flatfile/util-common": "^1.3.2",
"@flatfile/util-response-rejection": "^1.3.3",
"cross-fetch": "^4.0.0"
"cross-fetch": "^4.0.0",
"modern-async": "^2.0.0"
},
"peerDependencies": {
"@flatfile/api": "^1.8.9",
Expand Down
32 changes: 32 additions & 0 deletions plugins/webhook-egress/src/fetch.and.process.sheets.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type { Flatfile } from '@flatfile/api'
import api from '@flatfile/api'
import { asyncMap } from 'modern-async'

interface Part {
sheetId: string
pageNumber: number
pageSize: number
}

export async function prepareParts(
workbookId: string,
pageSize: number,
filter: Flatfile.Filter
): Promise<Array<Part>> {
const { data: sheets } = await api.sheets.list({ workbookId })

const partsArrays = await asyncMap(sheets, async (sheet) => {
const {
data: {
counts: { total },
},
} = await api.sheets.getRecordCounts(sheet.id, { filter })
return Array.from({ length: Math.ceil(total / pageSize) }, (_, index) => ({
sheetId: sheet.id,
pageNumber: index + 1,
pageSize,
}))
})

return partsArrays.flat()
}
35 changes: 35 additions & 0 deletions plugins/webhook-egress/src/post.to.webhook.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { getSecret, isValidUrl } from '@flatfile/util-common'
import { SheetExport } from './types'

export async function postToWebhook(
sheetExport: SheetExport,
url: string | URL,
urlParams: Array<{ key: string; value: unknown }>,
secretName: string,
environmentId: string,
spaceId: string
) {
const baseUrl = isValidUrl(url)
? url
: await getSecret(url as string, environmentId)
const queryParams = new URLSearchParams()
urlParams.forEach(({ key, value }) => {
Array.isArray(value)
? value.forEach((v) => queryParams.append(key, String(v)))
: queryParams.set(key, String(value))
})
const secret = secretName
? await getSecret(secretName, environmentId, spaceId)
: ''
const response = await fetch(`${baseUrl}?${queryParams}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(secret ? { Authorization: `Bearer ${secret}` } : {}),
},
body: JSON.stringify({ sheet: sheetExport }),
})
if (!response.ok)
throw new Error(`HTTP ${response.status} ${response.statusText}`)
return response.json()
}
13 changes: 13 additions & 0 deletions plugins/webhook-egress/src/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import type { Flatfile } from '@flatfile/api'

export interface SheetExport extends Flatfile.Sheet {
records: Flatfile.Record_[]
}

export interface WebhookEgressOptions {
secretName?: string
urlParams?: Array<{ key: string; value: unknown }>
pageSize?: number
filter?: Flatfile.Filter
debug?: boolean
}
18 changes: 9 additions & 9 deletions plugins/webhook-egress/src/webhook.egress.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ fetchMock.dontMock()
describe('webhookEgress() e2e', () => {
const listener = setupListener()

let spaceId
let workbookId
let sheetId
let spaceId: string
let workbookId: string
let sheetId: string

beforeAll(async () => {
const space = await setupSpace()
Expand All @@ -30,7 +30,7 @@ describe('webhookEgress() e2e', () => {
'notes',
])
workbookId = workbook.id
sheetId = workbook.sheets[0].id
sheetId = workbook.sheets![0].id
await createRecords(sheetId, [
{
name: 'John Doe',
Expand Down Expand Up @@ -76,7 +76,7 @@ describe('webhookEgress() e2e', () => {
await listener.waitFor('job:ready', 1, 'workbook:egressTestSuccess')

const response = await api.jobs.get(successfulJobId)
expect(response.data.outcome.message).toEqual(
expect(response.data.outcome?.message).toEqual(
`Data was successfully submitted to the provided webhook. Go check it out at example.com.`
)
})
Expand Down Expand Up @@ -157,10 +157,10 @@ describe('webhookEgress() e2e', () => {
await listener.waitFor('job:ready', 1, 'workbook:egressTestSuccess')

const response = await api.jobs.get(successfulJobId)
expect(response.data.outcome.message).toEqual(
expect(response.data.outcome?.message).toEqual(
'The data has been successfully submitted without any rejections. This task is now complete.'
)
expect(response.data.outcome.heading).toEqual('Success!')
expect(response.data.outcome?.heading).toEqual('Success!')
})

it('returns rejections', async () => {
Expand Down Expand Up @@ -208,10 +208,10 @@ describe('webhookEgress() e2e', () => {
await listener.waitFor('job:ready', 1, 'workbook:egressTestSuccess')

const response = await api.jobs.get(successfulJobId)
expect(response.data.outcome.message).toEqual(
expect(response.data.outcome?.message).toEqual(
'During the data submission process, 1 records were rejected. Please review and correct these records before resubmitting.'
)
expect(response.data.outcome.heading).toEqual('Rejected Records')
expect(response.data.outcome?.heading).toEqual('Rejected Records')
})
})
})
165 changes: 104 additions & 61 deletions plugins/webhook-egress/src/webhook.egress.ts
Original file line number Diff line number Diff line change
@@ -1,83 +1,126 @@
import { FlatfileClient } from '@flatfile/api'
import { FlatfileListener } from '@flatfile/listener'
import { jobHandler } from '@flatfile/plugin-job-handler'
import { logError } from '@flatfile/util-common'
import {
RejectionResponse,
responseRejectionHandler,
} from '@flatfile/util-response-rejection'
import { Flatfile, FlatfileClient } from '@flatfile/api'
import type FlatfileListener from '@flatfile/listener'
import { FlatfileEvent } from '@flatfile/listener'
import { deleteRecords, getRecordsRaw, logError } from '@flatfile/util-common'
import { responseRejectionHandler } from '@flatfile/util-response-rejection'
import { prepareParts } from './fetch.and.process.sheets'
import { postToWebhook } from './post.to.webhook'
import type { SheetExport, WebhookEgressOptions } from './types'

const api = new FlatfileClient()

export function webhookEgress(job: string, webhookUrl?: string) {
return function (listener: FlatfileListener) {
listener.use(
jobHandler(job, async (event, tick) => {
const { workbookId } = event.context
const { data: workbook } = await api.workbooks.get(workbookId)
const { data: workbookSheets } = await api.sheets.list({ workbookId })
export function webhookEgress(
job: string,
url: string | URL,
options: WebhookEgressOptions = {}
) {
const {
secretName = 'WEBHOOK_TOKEN',
urlParams = [],
pageSize = 10_000,
filter = Flatfile.Filter.Valid,
debug = false,
} = options
let deleteSubmitted = false

await tick(30, 'Getting workbook data')
return (listener: FlatfileListener) => {
listener.on(
'job:ready',
{ job, isPart: false },
async (event: FlatfileEvent) => {
const { jobId, workbookId } = event.context

const sheets = []
for (const [_, element] of workbookSheets.entries()) {
const { data: records } = await api.records.get(element.id)
sheets.push({
...element,
...records,
await api.jobs.ack(jobId, { info: 'Splitting Job', progress: 10 })

const parts = await prepareParts(workbookId, pageSize, filter)
if (parts.length > 0) {
await api.jobs.split(jobId, { parts })
await api.jobs.ack(jobId, {
info: `Job Split into ${parts.length} parts.`,
progress: 20,
})
} else {
await api.jobs.complete(jobId, {
outcome: { message: 'nothing to do' },
})
}
}
)

await tick(60, 'Posting data to webhook')

listener.on(
'job:ready',
{ job, isPart: true },
async (event: FlatfileEvent) => {
const { jobId, environmentId, spaceId } = event.context
try {
const webhookReceiver = webhookUrl || process.env.WEBHOOK_SITE_URL
const response = await fetch(webhookReceiver, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
workbook: {
...workbook,
sheets,
},
}),
const job = await api.jobs.get(jobId)
const { sheetId, pageNumber } = job.data.partData!

const records = await getRecordsRaw(sheetId, {
pageNumber,
pageSize,
filter,
})

if (response.status === 200) {
const responseData = await response.json()
const rejections: RejectionResponse = responseData.rejections
if (records instanceof Error) {
throw new Error(`Error fetching records: ${records.message}`)
}

if (rejections) {
return await responseRejectionHandler(rejections)
}
const { data: sheet } = await api.sheets.get(sheetId)
const sheetExport: SheetExport = { ...sheet, records }
const responseData = await postToWebhook(
sheetExport,
url,
urlParams,
secretName,
environmentId,
spaceId
)

return {
outcome: {
message: `Data was successfully submitted to the provided webhook. Go check it out at ${webhookReceiver}.`,
},
}
const { rejections } = responseData
if (rejections) {
const response = await responseRejectionHandler(rejections)
deleteSubmitted = rejections.deleteSubmitted

await api.jobs.complete(jobId, response.jobCompleteDetails)
} else {
logError(
'@flatfile/plugin-webhook-egress',
`Failed to submit data to ${webhookReceiver}. Status: ${response.status} ${response.statusText}`
)
return {
await api.jobs.complete(jobId, {
outcome: {
message: `Data was not successfully submitted to the provided webhook. Status: ${response.status} ${response.statusText}`,
message: `Data was successfully submitted to the provided webhook. Check it out at ${url}.`,
},
}
})
}
} catch (error) {
logError(
'@flatfile/plugin-webhook-egress',
JSON.stringify(error, null, 2)
)
// Throw error to fail job
throw new Error(`Error posting data to webhook`)
if (debug) {
logError('@flatfile/plugin-webhook-egress', error.message)
}
await api.jobs.fail(jobId, { outcome: { message: error.message } })
}
})
}
)

listener.on(
'job:parts-completed',
{ job, isPart: false },
async (event: FlatfileEvent) => {
const { jobId, workbookId } = event.context
if (deleteSubmitted) {
const { data: sheets } = await api.sheets.list({ workbookId })
for (const sheet of sheets) {
const {
data: {
counts: { valid },
},
} = await api.sheets.getRecordCounts(sheet.id)
if (valid > 0) await deleteRecords(sheet.id, { filter: 'valid' })
}
}
await api.jobs.complete(jobId, {
outcome: { message: 'This job is now complete.' },
})
}
)
}
}

export * from './types'
15 changes: 15 additions & 0 deletions utils/common/src/get.secret.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import api from '@flatfile/api'
import { handleError } from './logging.helper'

export async function getSecret(
name: string,
environmentId: string,
spaceId?: string
): Promise<string | undefined> {
try {
const secrets = await api.secrets.list({ spaceId, environmentId })
return secrets.data.find((secret) => secret.name === name)?.value
} catch (e) {
handleError(e, `Error fetching secret ${name}`)
}
}
2 changes: 2 additions & 0 deletions utils/common/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
export * from './all.records'
export * from './async.batch'
export * from './delete.records'
export * from './get.secret'
export * from './logging.helper'
export * from './slugify'
export * from './valid.url'
5 changes: 5 additions & 0 deletions utils/common/src/logging.helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,8 @@ export const logWarn = (packageName: string, msg: string): void => {
export const logError = (packageName: string, msg: string): void => {
log(packageName, msg, 'error')
}

export function handleError(error: any, message: string) {
console.error(error)
throw new Error(`${message}: ${error.message}`)
}
11 changes: 11 additions & 0 deletions utils/common/src/valid.url.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export function isValidUrl(url: string | URL) {
if (url instanceof URL) {
return true
}
try {
new URL(url)
return true
} catch (error) {
return false
}
}
Loading

0 comments on commit af65b88

Please sign in to comment.