Skip to content

Commit

Permalink
fix asyncBatching, add debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
Carl Brugger committed Sep 8, 2023
1 parent 0c72186 commit 7a0b7b3
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 55 deletions.
1 change: 1 addition & 0 deletions plugins/autocast/src/autocast.plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export function autocast(
options?: {
chunkSize?: number
parallel?: number
debug?: boolean
}
) {
return async (listener: FlatfileListener) => {
Expand Down
1 change: 1 addition & 0 deletions plugins/delimiter-extractor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ interface DelimiterOptions {
transform?: (value: any) => Flatfile.CellValueUnion
chunkSize?: number
parallel?: number
debug?: boolean
}

export const DelimiterExtractor = (
Expand Down
1 change: 1 addition & 0 deletions plugins/json-extractor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { parseBuffer } from './parser'
export const JSONExtractor = (options?: {
chunkSize?: number
parallel?: number
debug?: boolean
}) => {
return Extractor('.json', 'json', parseBuffer, options)
}
Expand Down
1 change: 1 addition & 0 deletions plugins/psv-extractor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export const PSVExtractor = (options?: {
transform?: (value: any) => Flatfile.CellValueUnion
chunkSize?: number
parallel?: number
debug?: boolean
}) => {
return DelimiterExtractor('.psv', { delimiter: '|', ...options })
}
Expand Down
24 changes: 16 additions & 8 deletions plugins/record-hook/src/RecordHook.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
import { FlatfileEvent } from '@flatfile/listener'
import { FlatfileRecord, FlatfileRecords } from '@flatfile/hooks'
import { Record_, Records } from '@flatfile/api/api'
import { RecordTranslater } from './record.translater'
import { FlatfileRecord, FlatfileRecords } from '@flatfile/hooks'
import { FlatfileEvent } from '@flatfile/listener'
import { asyncBatch } from '@flatfile/util-common'
import { RecordTranslater } from './record.translater'

export const RecordHook = async (
event: FlatfileEvent,
handler: (record: FlatfileRecord, event?: FlatfileEvent) => any | Promise<any>
handler: (
record: FlatfileRecord,
event?: FlatfileEvent
) => any | Promise<any>,
options: { debug?: boolean } = {}
) => {
return BulkRecordHook(event, async (records, event) => {
return records.map((record) => handler(record, event))
})
return BulkRecordHook(
event,
async (records, event) => {
return records.map((record) => handler(record, event))
},
options
)
}

export const BulkRecordHook = async (
Expand All @@ -19,7 +27,7 @@ export const BulkRecordHook = async (
records: FlatfileRecord[],
event?: FlatfileEvent
) => any | Promise<any>,
options: { chunkSize?: number; parallel?: number } = {}
options: { chunkSize?: number; parallel?: number; debug?: boolean } = {}
) => {
try {
const records = await event.cache.init<Records>(
Expand Down
13 changes: 7 additions & 6 deletions plugins/record-hook/src/record.hook.plugin.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import { FlatfileListener, FlatfileEvent } from '@flatfile/listener'
import { BulkRecordHook, RecordHook } from './RecordHook'
import type { FlatfileRecord } from '@flatfile/hooks'
import { FlatfileEvent, FlatfileListener } from '@flatfile/listener'
import { BulkRecordHook, RecordHook } from './RecordHook'

export const recordHookPlugin = (
sheetSlug: string,
callback: (
record: FlatfileRecord,
event?: FlatfileEvent
) => any | Promise<any>
) => any | Promise<any>,
options: { debug?: boolean } = {}
) => {
return (client: FlatfileListener) => {
client.on('commit:created', { sheetSlug }, (event: FlatfileEvent) => {
return RecordHook(event, callback)
return RecordHook(event, callback, options)
})
}
}
Expand All @@ -22,7 +23,7 @@ export const bulkRecordHookPlugin = (
records: FlatfileRecord[],
event?: FlatfileEvent
) => any | Promise<any>,
options: { chunkSize?: number; parallel?: number } = {}
options: { chunkSize?: number; parallel?: number; debug?: boolean } = {}
) => {
return (client: FlatfileListener) => {
client.on('commit:created', { sheetSlug }, (event: FlatfileEvent) => {
Expand All @@ -32,6 +33,6 @@ export const bulkRecordHookPlugin = (
}

export {
recordHookPlugin as recordHook,
bulkRecordHookPlugin as bulkRecordHook,
recordHookPlugin as recordHook,
}
1 change: 1 addition & 0 deletions plugins/tsv-extractor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export const TSVExtractor = (options?: {
transform?: (value: any) => Flatfile.CellValueUnion
chunkSize?: number
parallel?: number
debug?: boolean
}) => {
return DelimiterExtractor('.tsv', { delimiter: '\t', ...options })
}
1 change: 1 addition & 0 deletions plugins/xlsx-extractor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export const ExcelExtractor = (options?: {
rawNumbers?: boolean
chunkSize?: number
parallel?: number
debug?: boolean
}) => {
return Extractor(
/\.(xlsx?|xlsm|xlsb|xltx?|xltm)$/i,
Expand Down
1 change: 1 addition & 0 deletions plugins/xml-extractor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export const XMLExtractor = (options?: {
transform?: (row: Record<string, any>) => Record<string, any>
chunkSize?: number
parallel?: number
debug?: boolean
}) => {
return Extractor('.xml', 'xml', parseBuffer, options)
}
71 changes: 32 additions & 39 deletions utils/common/src/async.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,57 +3,50 @@ import { FlatfileEvent } from '@flatfile/listener'
export async function asyncBatch<T, R>(
arr: T[],
callback: (chunk: T[], event?: FlatfileEvent) => Promise<R>,
options: { chunkSize?: number; parallel?: number } = {},
options: { chunkSize?: number; parallel?: number; debug?: boolean } = {},
event?: FlatfileEvent
): Promise<R[]> {
const { chunkSize, parallel } = { chunkSize: 3000, parallel: 1, ...options }
const results: R[] = []

// Split the array into chunks
const chunks: T[][] = []
for (let i = 0; i < arr.length; i += chunkSize) {
chunks.push(arr.slice(i, i + chunkSize))
const { chunkSize = 3000, parallel = 1, debug = false } = options
const chunks = Array.from(
{ length: Math.ceil(arr.length / chunkSize) },
(_, i) => arr.slice(i * chunkSize, i * chunkSize + chunkSize)
)

if (debug) {
console.log(`${chunks.length} chunks to be processed`)
}

// Create a helper function to process a chunk
async function processChunk(chunk: T[]): Promise<void> {
const result = await callback(chunk, event)
results.push(result)
}
const results: Map<number, R> = new Map()

// Execute the chunks in parallel
const promises: Promise<void>[] = []
let running = 0
let currentIndex = 0

function processNext(): void {
if (currentIndex >= chunks.length) {
// All chunks have been processed
return
async function processChunk(
chunkIndex: number,
threadId: number
): Promise<void> {
if (debug) {
console.log(`Thread ${threadId} processing chunk ${chunkIndex}`)
}

const currentChunk = chunks[currentIndex]
const promise = processChunk(currentChunk).finally(() => {
running--
processNext() // Process next chunk
})

promises.push(promise)
currentIndex++
running++
const result = await callback(chunks[chunkIndex], event)
results.set(chunkIndex, result)
}

if (running < parallel) {
processNext() // Process another chunk if available
let currentIndex = 0
async function processChunks(threadId: number): Promise<void> {
while (currentIndex < chunks.length) {
const chunkIndex = currentIndex++
await processChunk(chunkIndex, threadId)
}
}

// Start processing the chunks
for (let i = 0; i < parallel && i < chunks.length; i++) {
processNext()
}
const promises: Promise<void>[] = Array.from({ length: parallel }, (_, i) =>
processChunks(i)
)

// Wait for all promises to resolve
await Promise.all(promises)

return results
if (debug) {
console.log('All chunks processed')
}

return Array.from(results.values())
}
5 changes: 3 additions & 2 deletions utils/extractor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ export const Extractor = (
progress: 50,
info: 'Adding records to Sheets',
})
const { chunkSize, parallel } = {
const { chunkSize, parallel, debug } = {
chunkSize: 3000,
parallel: 1,
debug: false,
...options,
}
for (const sheet of workbook.sheets) {
Expand All @@ -73,7 +74,7 @@ export const Extractor = (
async (chunk) => {
await api.records.insert(sheet.id, chunk)
},
{ chunkSize, parallel }
{ chunkSize, parallel, debug }
)
}
await api.files.update(file.id, {
Expand Down

0 comments on commit 7a0b7b3

Please sign in to comment.