From 95355658ed8a0ad189c960bfc61acf78444a729e Mon Sep 17 00:00:00 2001 From: Carl Brugger Date: Mon, 11 Mar 2024 09:38:40 -0500 Subject: [PATCH] feat: Merge plugin updates (#258) * feat: add strict typing * feat: bulk delete records * feat: refactor to use openapi plugin and improve readability * feat: dynamically retrieve category * Update package-lock.json * chore: update comments * feat: camelcase field keys * chore: add comment * chore: update package.json * chore: update deps * chore: update version * Update package-lock.json * chore: update package.json * Create nine-rats-relate.md --- .changeset/nine-rats-relate.md | 5 + package-lock.json | 41 +- plugins/merge-connection/package.json | 5 +- plugins/merge-connection/src/config.ts | 102 ++++ .../merge-connection/src/create.workbook.ts | 193 +++++++ plugins/merge-connection/src/index.ts | 543 +----------------- plugins/merge-connection/src/merge.plugin.ts | 25 + .../src/poll.for.merge.sync.ts | 44 ++ plugins/merge-connection/src/sync.data.ts | 52 ++ .../merge-connection/src/sync.status.check.ts | 27 + .../merge-connection/src/sync.status.fetch.ts | 19 + plugins/merge-connection/src/sync.workbook.ts | 72 +++ plugins/merge-connection/src/utils.ts | 52 ++ 13 files changed, 614 insertions(+), 566 deletions(-) create mode 100644 .changeset/nine-rats-relate.md create mode 100644 plugins/merge-connection/src/config.ts create mode 100644 plugins/merge-connection/src/create.workbook.ts create mode 100644 plugins/merge-connection/src/merge.plugin.ts create mode 100644 plugins/merge-connection/src/poll.for.merge.sync.ts create mode 100644 plugins/merge-connection/src/sync.data.ts create mode 100644 plugins/merge-connection/src/sync.status.check.ts create mode 100644 plugins/merge-connection/src/sync.status.fetch.ts create mode 100644 plugins/merge-connection/src/sync.workbook.ts create mode 100644 plugins/merge-connection/src/utils.ts diff --git a/.changeset/nine-rats-relate.md b/.changeset/nine-rats-relate.md new file mode 100644 index 000000000..1c291f5b2 --- /dev/null +++ b/.changeset/nine-rats-relate.md @@ -0,0 +1,5 @@ +--- +'@flatfile/plugin-connect-via-merge': minor +--- + +This release provides support for additional Merge.dev integrations. diff --git a/package-lock.json b/package-lock.json index a5ad65c15..e42743d21 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2494,6 +2494,18 @@ "node": ">=6 <7 || >=8" } }, + "node_modules/@mergeapi/merge-node-client": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/@mergeapi/merge-node-client/-/merge-node-client-1.0.5.tgz", + "integrity": "sha512-cDjdG0+waWzhx19aS/jnS4+PtzY7OznVvpWsikYtA9cfn0ZnCn5oVSWs62VjOvXs1pYWEh11tNQ6E16s2k1eWA==", + "dependencies": { + "form-data": "4.0.0", + "js-base64": "3.7.2", + "node-fetch": "2.7.0", + "qs": "6.11.2", + "url-join": "4.0.1" + } + }, "node_modules/@mischnic/json-sourcemap": { "version": "0.1.1", "dev": true, @@ -4946,10 +4958,6 @@ "version": "2.0.3", "license": "MIT" }, - "node_modules/@types/url-join": { - "version": "4.0.1", - "license": "MIT" - }, "node_modules/@types/yargs": { "version": "17.0.32", "license": "MIT", @@ -4961,10 +4969,6 @@ "version": "21.0.3", "license": "MIT" }, - "node_modules/@ungap/url-search-params": { - "version": "0.2.2", - "license": "ISC" - }, "node_modules/@vercel/ncc": { "version": "0.36.1", "resolved": "https://registry.npmjs.org/@vercel/ncc/-/ncc-0.36.1.tgz", @@ -14307,6 +14311,7 @@ "https://github.com/sponsors/broofa", "https://github.com/sponsors/ctavan" ], + "license": "MIT", "bin": { "uuid": "dist/bin/uuid" } @@ -14790,7 +14795,7 @@ }, "plugins/foreign-db-extractor": { "name": "@flatfile/plugin-foreign-db-extractor", - "version": "0.0.0", + "version": "0.0.1", "license": "ISC", "dependencies": { "@flatfile/api": "^1.7.1", @@ -14891,27 +14896,19 @@ "dependencies": { "@flatfile/api": "^1.7.4", "@flatfile/listener": "^1.0.1", + "@flatfile/plugin-convert-openapi-schema": "^0.1.2", "@flatfile/plugin-job-handler": "^0.3.3", - "@flatfile/util-common": "^1.0.0", - "@mergeapi/merge-node-client": "^0.1.6", - "axios": "^1.6.5" + "@mergeapi/merge-node-client": "^1.0.4" }, "engines": { "node": ">= 16" } }, - "plugins/merge-connection/node_modules/@mergeapi/merge-node-client": { - "version": "0.1.8", - "dependencies": { - "@types/url-join": "4.0.1", - "@ungap/url-search-params": "0.2.2", - "axios": "0.27.2", - "js-base64": "3.7.2", - "url-join": "4.0.1" - } - }, "plugins/merge-connection/node_modules/@mergeapi/merge-node-client/node_modules/axios": { "version": "0.27.2", + "resolved": "https://registry.npmjs.org/axios/-/axios-0.27.2.tgz", + "integrity": "sha512-t+yRIyySRTp/wua5xEr+z1q60QmLq8ABsS5O9Me1AsE5dfKqgnCFzwiCZZ/cGNd1lq4/7akDWMxdhVlucjmnOQ==", + "extraneous": true, "license": "MIT", "dependencies": { "follow-redirects": "^1.14.9", diff --git a/plugins/merge-connection/package.json b/plugins/merge-connection/package.json index 0c7246365..4280cd472 100644 --- a/plugins/merge-connection/package.json +++ b/plugins/merge-connection/package.json @@ -29,9 +29,8 @@ "dependencies": { "@flatfile/api": "^1.7.4", "@flatfile/listener": "^1.0.1", + "@flatfile/plugin-convert-openapi-schema": "^0.1.2", "@flatfile/plugin-job-handler": "^0.3.3", - "@flatfile/util-common": "^1.0.0", - "@mergeapi/merge-node-client": "^0.1.6", - "axios": "^1.6.5" + "@mergeapi/merge-node-client": "^1.0.4" } } diff --git a/plugins/merge-connection/src/config.ts b/plugins/merge-connection/src/config.ts new file mode 100644 index 000000000..127f02c1b --- /dev/null +++ b/plugins/merge-connection/src/config.ts @@ -0,0 +1,102 @@ +export const MERGE_ACCESS_KEY = 'MERGE_ACCESS_KEY' +export const MAX_SYNC_ATTEMPTS = 30 // Thirty cycles is equates to approx. 5 minutes +export const SYNC_RETRY_INTERVAL_MS = 10_000 // 10 seconds +export const CATEGORY_MODELS = { + accounting: { + Account: 'accounts', + Address: 'addresses', + Attachment: 'attachments', + BalanceSheet: 'balanceSheets', + CashFlowStatement: 'cashFlowStatements', + CompanyInfo: 'companyInfo', + CreditNote: 'creditNotes', + Expense: 'expenses', + IncomeStatement: 'incomeStatements', + Invoice: 'invoices', + Item: 'items', + JournalEntry: 'journalEntries', + Payment: 'payments', + PhoneNumber: 'phoneNumbers', + PurchaseOrder: 'purchaseOrders', + TaxRate: 'taxRates', + TrackingCategory: 'trackingCategories', + Transaction: 'transactions', + VendorCredit: 'vendorCredits', + }, + ats: { + Activity: 'activities', + Application: 'applications', + Attachment: 'attachments', + Candidate: 'candidates', + Department: 'departments', + EEOC: 'eeocs', + ScheduledInterview: 'interviews', + JobInterviewStage: 'jobInterviewStages', + Job: 'jobs', + Offer: 'offers', + Office: 'offices', + RejectReason: 'rejectReasons', + Scorecard: 'scorecards', + ScreeningQuestion: 'screeningQuestions', + Tag: 'tags', + RemoteUser: 'users', + }, + crm: { + Account: 'accounts', + Contact: 'contacts', + EngagementType: 'engagementTypes', + Engagement: 'engagements', + Lead: 'leads', + Note: 'notes', + Opportunity: 'opportunities', + Stage: 'stages', + Task: 'tasks', + User: 'users', + }, + filestorage: { + Drive: 'drives', + File: 'files', + Folder: 'folders', + Group: 'groups', + User: 'users', + }, + hris: { + BankInfo: 'bankInfo', + Benefit: 'benefits', + Company: 'companies', + Dependent: 'dependents', + EmployeePayrollRun: 'employeePayrollRuns', + Employee: 'employees', + EmployerBenefit: 'employerBenefits', + Employment: 'employments', + Group: 'groups', + Location: 'locations', + PayGroup: 'payGroups', + PayrollRun: 'payrollRuns', + TimeOff: 'timeOff', + TimeOffBalance: 'timeOffBalances', + }, + mktg: { + Action: 'actions', + Automation: 'automations', + Campaign: 'campaigns', + Contact: 'contacts', + MarketingEmail: 'marketingEmails', + Event: 'events', + List: 'lists', + Message: 'messages', + Template: 'templates', + User: 'users', + }, + ticketing: { + Ticket: 'tickets', + Comment: 'comments', + Attachment: 'attachments', + Collection: 'collections', + Issue: 'issues', + Project: 'projects', + Team: 'teams', + User: 'users', + Tag: 'tags', + }, +} diff --git a/plugins/merge-connection/src/create.workbook.ts b/plugins/merge-connection/src/create.workbook.ts new file mode 100644 index 000000000..468e4e976 --- /dev/null +++ b/plugins/merge-connection/src/create.workbook.ts @@ -0,0 +1,193 @@ +import api, { Flatfile } from '@flatfile/api' +import { FlatfileEvent } from '@flatfile/listener' +import { + PartialSheetConfig, + SetupFactory, + generateSetup, +} from '@flatfile/plugin-convert-openapi-schema' +import { MergeClient } from '@mergeapi/merge-node-client' +import { + CATEGORY_MODELS, + MERGE_ACCESS_KEY, + SYNC_RETRY_INTERVAL_MS, +} from './config' +import { checkAllSyncsComplete } from './sync.status.check' +import { getMergeClient, getSecret, handleError, snakeToCamel } from './utils' + +export function handleCreateConnectedWorkbooks() { + return async ( + event: FlatfileEvent, + tick: (progress: number, message?: string) => Promise + ) => { + try { + const { spaceId, environmentId, jobId } = event.context + + const job = await api.jobs.get(jobId) + const jobInput = job.data.input + const publicToken = jobInput?.publicToken + const apiKey = await getSecret(spaceId, environmentId, MERGE_ACCESS_KEY) + + if (!apiKey) { + throw new Error('Missing Merge API key') + } + + // This MergeClient is used to retrieve the account token + const tmpMergeClient: MergeClient = getMergeClient(apiKey) + + // Since we don't know what categories the Merge integration belongs to, we need to try each one to find + // an account token. However an integration can belong to multiple categories, so the category that + // provides the account token may not be the same category that the user selected. We'll just use the + // first one that works. + let accountTokenObj + const mergeCategories = Object.keys(CATEGORY_MODELS) + for (const categoryAttempt of mergeCategories) { + try { + accountTokenObj = await tmpMergeClient[ + categoryAttempt as keyof typeof mergeClient + ].accountToken.retrieve(publicToken) + if (accountTokenObj) { + break // break out of the loop as soon as a valid account token is retrieved + } + } catch (e) {} // ignore and keep trying + } + + if (!accountTokenObj) { + throw new Error('Error retrieving account token') + } + + // The accountToken is tied to the category selected during the Merge integration setup. + // The `integration.categories` is just a list of all the categories the integration belongs to. + const { + accountToken, + integration: { name, categories }, + } = accountTokenObj + + // This MergeClient is provided the `accountToken` and can be used to retrieve the category + const mergeClient: MergeClient = getMergeClient(apiKey, accountToken) + + // We can retrieve the category off of the modelId of the first synced model + let results + do { + results = await checkAllSyncsComplete( + mergeClient, + // Merge doesn't seem to care what category is used here, it will return the same results regardless + // And since we are searching for the category, we'll just use the first one the integration has listed + categories[0] + ) + await new Promise((resolve) => + setTimeout(resolve, SYNC_RETRY_INTERVAL_MS) + ) + } while (results.syncedModels.length === 0) + + // The modelId is prefix with the category for the given accountToken + const category = results.syncedModels[0].modelId.split('.')[0] + if (!category) { + throw new Error('Error retrieving category') + } + + await tick(20, 'Retrieved account token...') + + // Using the category, we can fetch Merge's schema provided through their OpenAPI spec and convert + // it to a Flatfile sheet config + const models = CATEGORY_MODELS[category] + const sheets: PartialSheetConfig[] = Object.keys(models).map((key) => { + const model = models[key] + return { + name: key, + slug: model, + model: key, + } + }) + + const setup: SetupFactory = await generateSetup({ + workbooks: [ + { + source: `https://api.merge.dev/api/${category}/v1/schema`, + sheets, + }, + ], + }) + const config = typeof setup === 'function' ? await setup(event) : setup + config.workbooks.map((workbook) => { + workbook.sheets.map((sheet) => { + sheet.fields.map((field) => { + // Merge's OpenAPI spec uses snake_case, but Merge's API uses camelCase + field.key = snakeToCamel(field.key) + delete field.description + }) + }) + }) + + await tick(40, 'Retrieved sheet config...') + + const workbookIds = await Promise.all( + config.workbooks.map(async (workbookConfig) => { + const request: Flatfile.CreateWorkbookConfig = { + ...workbookConfig, + spaceId, + environmentId, + name, + labels: ['connection'], + actions: [ + { + operation: 'syncConnectedWorkbook', + mode: 'foreground', + label: 'Sync', + type: 'string', + description: `Sync data from ${name}.`, + }, + ], + metadata: { + connections: [ + { + source: 'Merge', + service: category, + lastSyncedAt: new Date().toISOString(), // TODO: is set for UI purposes, but should be updated after sync + category, + }, + ], + }, + } + const { data: workbook } = await api.workbooks.create(request) + return workbook.id + }) + ) + + await tick(60, 'Created connected workbook...') + + await api.secrets.upsert({ + name: `${workbookIds[0]}:MERGE_X_ACCOUNT_TOKEN`, + value: accountToken, + environmentId, + spaceId, + }) + + await tick(80, 'Created MERGE_X_ACCOUNT_TOKEN secret...') + + // Create a job to sync the workbook immediately + await api.jobs.create({ + type: 'workbook', + operation: 'syncConnectedWorkbook', + status: 'ready', + source: workbookIds[0], + trigger: 'immediate', + mode: 'foreground', + }) + + await tick(90, 'Created workbook sync job...') + + return { + outcome: { + next: { + type: 'id', + id: workbookIds[0], //pick first + label: 'Go to workbook...', + }, + message: `We've created a connected Workbook that perfectly matches the Merge.dev schema for ${name}, ensuring a seamless connection and easy synchronization going forward.`, + }, + } as Flatfile.JobCompleteDetails + } catch (e) { + handleError(e, 'Error creating connected workbook') + } + } +} diff --git a/plugins/merge-connection/src/index.ts b/plugins/merge-connection/src/index.ts index a9eafd8b7..03d675a8b 100644 --- a/plugins/merge-connection/src/index.ts +++ b/plugins/merge-connection/src/index.ts @@ -1,542 +1,3 @@ -import api, { Flatfile } from '@flatfile/api' -import { FlatfileEvent, FlatfileListener } from '@flatfile/listener' -import { jobHandler } from '@flatfile/plugin-job-handler' -import { asyncBatch } from '@flatfile/util-common' -import { - Merge, - MergeClient, - MergeEnvironment, -} from '@mergeapi/merge-node-client' -import axios from 'axios' +import mergePlugin from './merge.plugin' -const MERGE_ACCESS_KEY = 'MERGE_ACCESS_KEY' -const MAX_SYNC_ATTEMPTS = 30 // Thirty cycles is equates to approx. 5 minutes -const SYNC_RETRY_INTERVAL_MS = 10000 // 10 seconds -// TODO: add common models to all categories -const CATEGORY_MODELS = { - accounting: { - Account: 'accounts', - Address: 'addresses', - Attachment: 'attachments', - BalanceSheet: 'balanceSheets', - CashFlowStatement: 'cashFlowStatements', - CompanyInfo: 'companyInfo', - CreditNote: 'creditNotes', - Expense: 'expenses', - IncomeStatement: 'incomeStatements', - Invoice: 'invoices', - Item: 'items', - JournalEntry: 'journalEntries', - Payment: 'payments', - PhoneNumber: 'phoneNumbers', - PurchaseOrder: 'purchaseOrders', - TaxRate: 'taxRates', - TrackingCategory: 'trackingCategories', - Transaction: 'transactions', - VendorCredit: 'vendorCredits', - }, - ats: [], - crm: [], - filestorage: [], - hris: { - BankInfo: 'bankInfo', - Benefit: 'benefits', - Company: 'companies', - Dependent: 'dependents', - EmployeePayrollRun: 'employeePayrollRuns', - Employee: 'employees', - EmployerBenefit: 'employerBenefits', - Employment: 'employments', - Group: 'groups', - Location: 'locations', - PayGroup: 'payGroups', - PayrollRun: 'payrollRuns', - TimeOff: 'timeOff', - TimeOffBalance: 'timeOffBalances', - }, - ticketing: [], -} - -export default function mergePlugin() { - return (listener: FlatfileListener) => { - // The `space:createConnectedWorkbook` job is fired when a Merge connection has been made in the UI. `handleCreateConnectedWorkbooks()` creates the connected workbook mirroring the Merge schema. - listener.use( - jobHandler( - 'space:createConnectedWorkbook', - handleCreateConnectedWorkbooks() - ) - ) - // The `workbook:syncConnectedWorkbook` job syncs the connected workbook and can be triggered by clicking the sync button. - listener.use( - jobHandler( - 'workbook:syncConnectedWorkbook', - handleConnectedWorkbookSync() - ) - ) - } -} - -function handleCreateConnectedWorkbooks() { - return async ( - event: FlatfileEvent, - tick: (progress: number, message?: string) => Promise - ) => { - try { - const { spaceId, environmentId, jobId } = event.context - - const job = await api.jobs.get(jobId) - const jobInput = job.data.input - const publicToken = jobInput.publicToken - const apiKey = await getSecret(spaceId, environmentId, MERGE_ACCESS_KEY) - - if (!apiKey) { - throw new Error('Missing Merge API key') - } - - const mergeClient = getMergeClient(apiKey) - - let accountTokenObj - let category - - // Since we don't know what category the Merge integration belongs to, we need to try each one - const categories = Object.keys(CATEGORY_MODELS) - for (let categoryAttempt of categories) { - try { - accountTokenObj = await mergeClient[ - categoryAttempt - ].accountToken.retrieve(publicToken) - if (accountTokenObj) { - category = categoryAttempt - break // break out of the loop as soon as a valid category is found - } - } catch (e) {} // ignore and keep trying - } - if (!category || !accountTokenObj) { - throw new Error('Error retrieving account token') - } - - const { accountToken, integration } = accountTokenObj - - await tick(20, 'Retrieved account token...') - - // Using the category, we can fetch Merge's schema provided through their OpenAPI spec and convert it to a Flatfile sheet config - const sheets = await openApiSchemaToSheetConfig(category) - - await tick(40, 'Retrieved sheet config...') - - const { data: workbook } = await api.workbooks.create({ - spaceId, - environmentId, - name: integration.name, - labels: ['connection'], - sheets, - actions: [ - { - operation: 'syncConnectedWorkbook', - mode: 'foreground', - label: 'Sync', - type: 'string', - description: `Sync data from ${integration.name}.`, - }, - ], - metadata: { - connections: [ - { - source: 'Merge', - service: category, - lastSyncedAt: new Date().toISOString(), // TODO: is set for UI purposes, but should be updated after sync - category, - }, - ], - }, - }) - - await tick(60, 'Created connected workbook...') - - await api.secrets.upsert({ - name: `${workbook.id}:MERGE_X_ACCOUNT_TOKEN`, - value: accountToken, - environmentId, - spaceId, - }) - - await tick(80, 'Created account token secret...') - - // Create a job to sync the workbook immediately - await api.jobs.create({ - type: 'workbook', - operation: 'syncConnectedWorkbook', - status: 'ready', - source: workbook.id, - trigger: 'immediate', - mode: 'foreground', - }) - - return { - outcome: { - next: { - type: 'id', - id: workbook.id, - label: 'Go to workbook...', - }, - message: `We've created a connected Workbook that perfectly matches the Merge.dev schema for ${integration.name}, ensuring a seamless connection and easy synchronization going forward.`, - }, - } as Flatfile.JobCompleteDetails - } catch (e) { - handleError(e, 'Error creating connected workbook') - } - } -} - -function handleConnectedWorkbookSync() { - return async ( - event: FlatfileEvent, - tick: (progress: number, message?: string) => Promise - ) => { - try { - const { spaceId, workbookId, environmentId } = event.context - const apiKey = await getSecret(spaceId, environmentId, MERGE_ACCESS_KEY) - const accountToken = await getSecret( - spaceId, - environmentId, - `${workbookId}:MERGE_X_ACCOUNT_TOKEN` - ) - - if (!apiKey || !accountToken) { - throw new Error('Missing Merge API key or account token') - } - - const mergeClient = getMergeClient(apiKey, accountToken) - const { data: workbook }: Flatfile.WorkbookResponse = - await api.workbooks.get(workbookId) - const connections = workbook.metadata.connections - const category = connections[0].category // TODO: handle multiple connections??? - const { data: sheets } = await api.sheets.list({ workbookId }) - - await tick(10, `${workbook.name} syncing to Merge...}`) - // Merge may not have synced with the integration, so we need to check and wait for Merge's sync to complete - await waitForMergeSync(mergeClient, category, tick) - await tick(40, 'Syncing data from Merge...') - - // Sync data from Merge to Flatfile - let processedSheets = 0 - for (const sheet of sheets) { - await syncData(mergeClient, sheet.id, category, sheet.config.slug) - processedSheets++ - await tick( - Math.min(90, Math.round(40 + (50 * processedSheets) / sheets.length)), - `Synced ${sheet.config.name}` - ) - } - - // Finally, update the lastSyncedAt date - await api.workbooks.update(workbookId, { - metadata: { - connections: [ - { - ...workbook.metadata.connections[0], - lastSyncedAt: new Date().toISOString(), - }, - ], - }, - }) - await tick(95, 'Updating workbook...') - - return { - outcome: { - message: `${workbook.name} data has been successfully synced from ${workbook.name} to Merge.dev and from Merge.dev to Flatfile.`, - }, - } as Flatfile.JobCompleteDetails - } catch (e) { - handleError(e, 'Error syncing connected workbook') - } - } -} - -async function fetchAllSyncStatuses( - mergeClient: MergeClient, - category: string -) { - let cursor: string | undefined - const allResults = [] - - do { - const paginatedSyncList = await mergeClient[category].syncStatus.list({ - cursor, - }) - allResults.push(...paginatedSyncList.results) - cursor = paginatedSyncList.next - } while (cursor) - - return allResults -} - -async function checkAllSyncsComplete( - mergeClient: MergeClient, - category: string -) { - try { - const allSyncs = await fetchAllSyncStatuses(mergeClient, category) - const completedSyncs = allSyncs.filter( - (syncStatus) => - syncStatus.status === Merge[category].SyncStatusStatusEnum.Done - ).length - const totalModels = allSyncs.length - - return { - allComplete: completedSyncs === totalModels, - completedSyncs, - totalModels, - } - } catch (e) { - handleError(e, 'Error checking sync status') - } -} - -async function waitForMergeSync( - mergeClient: MergeClient, - category: string, - tick: (progress: number, message?: string) => Promise -): Promise { - try { - let attempts = 0 - let syncStatusComplete = false - - while (attempts <= MAX_SYNC_ATTEMPTS && !syncStatusComplete) { - const { allComplete, completedSyncs, totalModels } = - await checkAllSyncsComplete(mergeClient, category) - syncStatusComplete = allComplete - - await tick( - Math.min(40, Math.round(10 + (30 * completedSyncs) / totalModels)), - 'Merge syncing with Integration...' - ) - - if (!syncStatusComplete) { - attempts++ - console.log('Waiting for Merge to sync...') - await new Promise((resolve) => - setTimeout(resolve, SYNC_RETRY_INTERVAL_MS) - ) - } - } - - if (!syncStatusComplete) { - throw new Error('Merge sync timed out') - } - } catch (e) { - handleError(e, 'Error waiting for Merge sync') - } -} - -async function deleteSheetRecords(sheetId: string) { - try { - // TODO: change this to a bulk delete job. `api.records.get` only gets 10k records at a time - const { data: records } = await api.records.get(sheetId) - if (records.records.length > 0) { - const recordIds = records.records.map((record) => { - return record.id - }) - - const options = { chunkSize: 100, parallel: 5, debug: true } - await asyncBatch( - recordIds, - async (chunk) => { - await api.records.delete(sheetId, { ids: chunk }) - }, - options - ) - } - } catch (e) { - handleError(e, `Error deleting records from sheet ${sheetId}`) - } -} - -async function syncData( - mergeClient: MergeClient, - sheetId: string, - category: string, - slug: string -) { - try { - await deleteSheetRecords(sheetId) - - const model = mergeClient[category][slug] - let paginatedList - do { - paginatedList = await model.list({ cursor: paginatedList?.next }) - const records = mapRecords(paginatedList.results) - if (records.length > 0) { - await api.records.insert(sheetId, records, { - compressRequestBody: true, - }) - } - } while (paginatedList.next) - } catch (e) { - console.error(e) - // Don't fail here, this will fail the entire sync - // throw new Error(`Error syncing ${slug} sheet`) - } -} - -function handleError(error: any, message: string) { - console.error(error) - throw new Error(`${message}: ${error.message}`) -} - -function getMergeClient(apiKey: string, accountToken?: string) { - return new MergeClient({ - environment: MergeEnvironment.Production, - apiKey, - ...(accountToken ? { accountToken } : {}), - }) -} - -async function getSecret( - spaceId: string, - environmentId: string, - name: string -): Promise { - try { - const secrets = await api.secrets.list({ spaceId, environmentId }) - return secrets.data.find((secret) => secret.name === name)?.value - } catch (e) { - handleError(e, `Error fetching secret ${name}`) - } -} - -function mapRecords(records: Record): Flatfile.RecordData[] { - return Object.values(records).map((record) => { - const mappedRecord: Flatfile.RecordData = {} - for (let key in record) { - if (record[key]) { - mappedRecord[key] = { value: record[key]?.toString() } - } - } - return mappedRecord - }) -} - -interface OpenApiSchema { - type: string - properties?: Record - enum?: string[] - description?: string - readOnly?: boolean - $ref?: string - [key: string]: any -} - -interface ApiSchemas { - [key: string]: OpenApiSchema -} - -async function openApiSchemaToSheetConfig( - category: string -): Promise { - try { - const response = await axios({ - url: `https://api.merge.dev/api/${category}/v1/schema`, - validateStatus: () => true, - responseType: 'json', - }) - - if (response.status !== 200) { - throw new Error( - `API returned status ${response.status}: ${response.statusText}` - ) - } - - const schemas: ApiSchemas = response.data.components.schemas - - function convertPropertyToField( - key: string, - property: OpenApiSchema - ): Flatfile.BaseProperty | Flatfile.Property { - let field: Flatfile.BaseProperty = { - key: key, - label: property.title || key, - description: `${property.description}`, - readonly: property.readOnly || false, - } - - if (property.$ref) { - const refName = property.$ref.split('/').pop()! - property = schemas[refName] - } else if (property.allOf) { - // If allOf is present, assume the first item in the array is a reference to the actual schema - const refName = property.allOf[0].$ref.split('/').pop()! - property = schemas[refName] - } - - switch (property.type) { - case 'string': - if (property.enum) { - return { - ...field, - type: 'enum', - config: { - options: property.enum.map((e) => ({ - label: e, - value: e, - })), - }, - } as Flatfile.EnumProperty - } else { - return { - ...field, - type: 'string', - } as Flatfile.Property.String - } - case 'integer': - case 'number': - return { - ...field, - type: 'number', - config: { - decimalPlaces: 0, // default decimal places - }, - } as Flatfile.Property.Number - case 'boolean': - return { - ...field, - type: 'boolean', - } as Flatfile.Property.Boolean - default: - console.log( - `Unhandled field '${field.label}' with property type '${property.type}'.` - ) - } - } - - const sheetConfigs: Flatfile.SheetConfig[] = [] - for (const [name, schema] of Object.entries(schemas)) { - if ( - CATEGORY_MODELS[category] && - CATEGORY_MODELS[category].hasOwnProperty(name) - ) { - const fields: Flatfile.Property[] = [] - - for (const [key, property] of Object.entries(schema.properties || {})) { - const field = convertPropertyToField( - key, - property - ) as Flatfile.Property - if (field) { - fields.push(field) - } - } - - sheetConfigs.push({ - name: name, - slug: CATEGORY_MODELS[category][name], - fields: fields, - }) - } - } - - return sheetConfigs - } catch (error) { - handleError(error, 'Error fetching or processing schema') - } -} +export default mergePlugin diff --git a/plugins/merge-connection/src/merge.plugin.ts b/plugins/merge-connection/src/merge.plugin.ts new file mode 100644 index 000000000..45a88afbe --- /dev/null +++ b/plugins/merge-connection/src/merge.plugin.ts @@ -0,0 +1,25 @@ +import { FlatfileListener } from '@flatfile/listener' +import { jobHandler } from '@flatfile/plugin-job-handler' +import { handleCreateConnectedWorkbooks } from './create.workbook' +import { handleConnectedWorkbookSync } from './sync.workbook' + +export default function mergePlugin() { + return (listener: FlatfileListener) => { + // The `space:createConnectedWorkbook` job is fired when a Merge connection has been made in the UI. + // `handleCreateConnectedWorkbooks()` creates the connected workbook mirroring the Merge schema. + listener.use( + jobHandler( + 'space:createConnectedWorkbook', + handleCreateConnectedWorkbooks() + ) + ) + // The `workbook:syncConnectedWorkbook` job syncs the connected workbook and can be triggered by clicking + // the sync button. + listener.use( + jobHandler( + 'workbook:syncConnectedWorkbook', + handleConnectedWorkbookSync() + ) + ) + } +} diff --git a/plugins/merge-connection/src/poll.for.merge.sync.ts b/plugins/merge-connection/src/poll.for.merge.sync.ts new file mode 100644 index 000000000..c70da147c --- /dev/null +++ b/plugins/merge-connection/src/poll.for.merge.sync.ts @@ -0,0 +1,44 @@ +import { Flatfile } from '@flatfile/api' +import { MergeClient } from '@mergeapi/merge-node-client' +import { MAX_SYNC_ATTEMPTS, SYNC_RETRY_INTERVAL_MS } from './config' +import { checkAllSyncsComplete } from './sync.status.check' +import { handleError } from './utils' + +export async function waitForMergeSync( + mergeClient: MergeClient, + category: string, + tick: (progress: number, message?: string) => Promise +): Promise { + try { + let attempts = 0 + let syncStatusComplete = false + + while (attempts <= MAX_SYNC_ATTEMPTS && !syncStatusComplete) { + const { allComplete, completedSyncs, totalModels, syncedModels } = + await checkAllSyncsComplete(mergeClient, category) + + if (!syncStatusComplete) { + syncStatusComplete = allComplete + + await tick( + Math.min(40, Math.round(10 + (30 * completedSyncs) / totalModels)), + 'Merge syncing with Integration...' + ) + } + + if (!syncStatusComplete) { + attempts++ + console.log('Waiting for Merge to sync...') + await new Promise((resolve) => + setTimeout(resolve, SYNC_RETRY_INTERVAL_MS) + ) + } + } + + if (!syncStatusComplete) { + throw new Error('Merge sync timed out') + } + } catch (e) { + handleError(e, 'Error waiting for Merge sync') + } +} diff --git a/plugins/merge-connection/src/sync.data.ts b/plugins/merge-connection/src/sync.data.ts new file mode 100644 index 000000000..a9df8ba3a --- /dev/null +++ b/plugins/merge-connection/src/sync.data.ts @@ -0,0 +1,52 @@ +import api from '@flatfile/api' +import { MergeClient } from '@mergeapi/merge-node-client' +import { handleError, mapRecords } from './utils' + +export async function syncData( + mergeClient: MergeClient, + sheetId: string, + category: string, + slug: string +) { + try { + await deleteSheetRecords(sheetId) + + const model = mergeClient[category as keyof MergeClient] + let paginatedList: { results: any[]; next?: string } | null = null + do { + paginatedList = (await (model[slug as keyof typeof model] as any).list({ + cursor: paginatedList?.next, + })) as { results: any[]; next?: string } + const records = mapRecords(paginatedList.results) + if (records.length > 0) { + await api.records.insert(sheetId, records, { + compressRequestBody: true, + }) + } + } while (paginatedList?.next) + } catch (e) { + console.error(e) + // Don't fail here, this will fail the entire sync + // throw new Error(`Error syncing ${slug} sheet`) + } +} + +async function deleteSheetRecords(sheetId: string) { + try { + const { data: sheet } = await api.sheets.get(sheetId) + if (sheet.countRecords.total > 0) { + await api.jobs.create({ + type: 'workbook', + operation: 'delete-records', + trigger: 'immediate', + source: sheet.workbookId, + config: { + sheet: sheetId, + filter: 'all', + }, + }) + } + } catch (e) { + handleError(e, `Error deleting records from sheet ${sheetId}`) + } +} diff --git a/plugins/merge-connection/src/sync.status.check.ts b/plugins/merge-connection/src/sync.status.check.ts new file mode 100644 index 000000000..ae252fb17 --- /dev/null +++ b/plugins/merge-connection/src/sync.status.check.ts @@ -0,0 +1,27 @@ +import { Merge, MergeClient } from '@mergeapi/merge-node-client' +import { fetchAllSyncStatuses } from './sync.status.fetch' +import { handleError } from './utils' + +export async function checkAllSyncsComplete( + mergeClient: MergeClient, + category: string +) { + try { + const allSyncs = await fetchAllSyncStatuses(mergeClient, category) + const syncedModels = allSyncs.filter( + (syncStatus) => + syncStatus.status !== Merge[category].SyncStatusStatusEnum.Syncing + ) + const completedSyncs = syncedModels.length + const totalModels = allSyncs.length + + return { + allComplete: completedSyncs === totalModels, + completedSyncs, + totalModels, + syncedModels, + } + } catch (e) { + handleError(e, 'Error checking sync status') + } +} diff --git a/plugins/merge-connection/src/sync.status.fetch.ts b/plugins/merge-connection/src/sync.status.fetch.ts new file mode 100644 index 000000000..5036953a9 --- /dev/null +++ b/plugins/merge-connection/src/sync.status.fetch.ts @@ -0,0 +1,19 @@ +import { MergeClient } from '@mergeapi/merge-node-client' + +export async function fetchAllSyncStatuses( + mergeClient: MergeClient, + category: string +) { + let cursor: string | undefined + const allResults = [] + + do { + const paginatedSyncList = await mergeClient[category].syncStatus.list({ + cursor, + }) + allResults.push(...paginatedSyncList.results) + cursor = paginatedSyncList.next + } while (cursor) + + return allResults +} diff --git a/plugins/merge-connection/src/sync.workbook.ts b/plugins/merge-connection/src/sync.workbook.ts new file mode 100644 index 000000000..710720b47 --- /dev/null +++ b/plugins/merge-connection/src/sync.workbook.ts @@ -0,0 +1,72 @@ +import api, { Flatfile } from '@flatfile/api' +import { FlatfileEvent } from '@flatfile/listener' +import { MERGE_ACCESS_KEY } from './config' +import { waitForMergeSync } from './poll.for.merge.sync' +import { syncData } from './sync.data' +import { getMergeClient, getSecret, handleError } from './utils' + +export function handleConnectedWorkbookSync() { + return async ( + event: FlatfileEvent, + tick: (progress: number, message?: string) => Promise + ) => { + try { + const { spaceId, workbookId, environmentId } = event.context + const apiKey = await getSecret(spaceId, environmentId, MERGE_ACCESS_KEY) + const accountToken = await getSecret( + spaceId, + environmentId, + `${workbookId}:MERGE_X_ACCOUNT_TOKEN` + ) + + if (!apiKey || !accountToken) { + throw new Error('Missing Merge API key or account token') + } + + const mergeClient = getMergeClient(apiKey, accountToken) + const { data: workbook }: Flatfile.WorkbookResponse = + await api.workbooks.get(workbookId) + const connections = workbook.metadata.connections + const category = connections[0].category // TODO: handle multiple connections??? + const { data: sheets } = await api.sheets.list({ workbookId }) + + await tick(10, `${workbook.name} syncing to Merge...}`) + // Merge may not have synced with the integration, so we need to check and wait for Merge's sync to complete + await waitForMergeSync(mergeClient, category, tick) + await tick(40, 'Syncing data from Merge...') + + // Sync data from Merge to Flatfile + let processedSheets = 0 + for (const sheet of sheets) { + const slug = sheet.config.slug! + await syncData(mergeClient, sheet.id, category, slug) + processedSheets++ + await tick( + Math.min(90, Math.round(40 + (50 * processedSheets) / sheets.length)), + `Synced ${sheet.config.name}` + ) + } + + // Finally, update the lastSyncedAt date + await api.workbooks.update(workbookId, { + metadata: { + connections: [ + { + ...workbook.metadata.connections[0], + lastSyncedAt: new Date().toISOString(), + }, + ], + }, + }) + await tick(95, 'Updating workbook...') + + return { + outcome: { + message: `${workbook.name} data has been successfully synced from ${workbook.name} to Merge.dev and from Merge.dev to Flatfile.`, + }, + } as Flatfile.JobCompleteDetails + } catch (e) { + handleError(e, 'Error syncing connected workbook') + } + } +} diff --git a/plugins/merge-connection/src/utils.ts b/plugins/merge-connection/src/utils.ts new file mode 100644 index 000000000..1c824c3c6 --- /dev/null +++ b/plugins/merge-connection/src/utils.ts @@ -0,0 +1,52 @@ +import api, { Flatfile } from '@flatfile/api' +import { MergeClient, MergeEnvironment } from '@mergeapi/merge-node-client' + +export function getMergeClient(apiKey: string, accountToken?: string) { + return new MergeClient({ + environment: MergeEnvironment.Production, + apiKey, + ...(accountToken ? { accountToken } : {}), + }) +} + +export function handleError(error: any, message: string) { + console.error(error) + throw new Error(`${message}: ${error.message}`) +} + +export async function getSecret( + spaceId: string, + environmentId: string, + name: string +): Promise { + try { + const secrets = await api.secrets.list({ spaceId, environmentId }) + return secrets.data.find((secret) => secret.name === name)?.value + } catch (e) { + handleError(e, `Error fetching secret ${name}`) + } +} + +export function mapRecords( + records: Record +): Flatfile.RecordData[] { + return Object.values(records).map((record) => { + const mappedRecord: Flatfile.RecordData = {} + for (let key in record) { + mappedRecord[key] = { value: record[key]?.toString() } + } + return mappedRecord + }) +} + +export function snakeToCamel(snakeCaseString: string): string { + return snakeCaseString + .split('_') + .map((word, index) => { + if (index === 0) { + return word + } + return word.charAt(0).toUpperCase() + word.slice(1) + }) + .join('') +}