Skip to content

Commit

Permalink
P-Queue implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Carl Brugger committed Sep 5, 2023
1 parent 3f546ba commit 0f7b0c7
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 72 deletions.
85 changes: 32 additions & 53 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion plugins/record-hook/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"@flatfile/hooks": "^1.3.1",
"@flatfile/listener": "^0.3.15",
"@flatfile/util-common": "^0.1.1",
"effect": "^2.0.0-next.29"
"p-queue": "^7.4.1"
},
"devDependencies": {
"axios": "^1.4.0"
Expand Down
32 changes: 14 additions & 18 deletions plugins/record-hook/src/RecordHook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { FlatfileRecord, FlatfileRecords } from '@flatfile/hooks'
import { Record_, Records } from '@flatfile/api/api'
import { RecordTranslater } from './record.translater'
import { asyncBatch } from '@flatfile/util-common'
import { Effect } from 'effect'
import PQueue from 'p-queue'

export const RecordHook = async (
event: FlatfileEvent,
Expand All @@ -13,16 +13,12 @@ export const RecordHook = async (
) => any | Promise<any>,
options: { concurrency?: number } = {}
) => {
await BulkRecordHook(event, async (records, event) => {
await BulkRecordHook(event, async (records, bulkEvent) => {
const { concurrency } = { concurrency: 10, ...options }
const handlers = records.map((record: FlatfileRecord) =>
Effect.promise(async () => {
await handler(record, event)
})
)
Effect.runPromise(
Effect.all(handlers, {
concurrency,
const queue = new PQueue({ concurrency })
await Promise.all(
records.map(async (record) => {
queue.add(() => handler(record, bulkEvent))
})
)
})
Expand Down Expand Up @@ -54,14 +50,14 @@ export const BulkRecordHook = async (

await event.cache.set('records', async () => recordsUpdates)

event.afterAll(async () => {
try {
const records = event.cache.get<Records>('records')
await event.update(records)
} catch (e) {
console.log(`Error updating records: ${e}`)
}
})
// event.afterAll(async () => {
try {
const records = event.cache.get<Records>('records')
await event.update(records)
} catch (e) {
console.log(`Error updating records: ${e}`)
}
// })
} catch (e) {
console.log(`Error getting records: ${e}`)
}
Expand Down

0 comments on commit 0f7b0c7

Please sign in to comment.