Skip to content

Commit

Permalink
feat: updates to match API changes
Browse files Browse the repository at this point in the history
  • Loading branch information
carlbrugger committed Mar 6, 2024
1 parent aa2552b commit 9c79bee
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 69 deletions.
53 changes: 37 additions & 16 deletions plugins/foreign-db-extractor/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import api, { Flatfile } from '@flatfile/api'
import FlatfileListener, { FlatfileEvent } from '@flatfile/listener'
import sql from 'mssql'
import { generateSheets } from './generate.sheets'
import { restoreDatabase } from './restore.database'
import {
DBUser,
getUser,
pollDatabaseStatus,
restoreDatabase,
} from './restore.database'
import { s3Upload } from './upload.s3'

export const foreignDBExtractor = () => {
Expand Down Expand Up @@ -47,31 +53,47 @@ export const foreignDBExtractor = () => {
const job = await api.jobs.get(jobId)
const { fileName } = job.data.input

// Step 1: Upload file to S3
await tick(10, 'Uploading file to S3 bucket')
await s3Upload(fileId)

// Step 2: Create a Workbook
await tick(45, 'Creating workbook')

// Create a workbook so we can use the workbookId to name the database
// Step 2.1: Create a workbook so we can use the workbookId to name the database
await tick(5, 'Creating workbook')
const { data: workbook } = await api.workbooks.create({
name: `[file] ${fileName}`,
labels: ['file'],
spaceId,
environmentId,
})

// Step 3: Restore DB from Backup
// Step 2.2: Upload file to S3, this is required to restore to RDS
await tick(10, 'Uploading file to S3 bucket')
await s3Upload(workbook.id, fileId)

// Step 2.3: Restore DB from Backup on S3
await tick(50, 'Restoring database')
const connectionConfig = await restoreDatabase(workbook.id, fileId)
const connectionConfig: sql.config = await restoreDatabase(
workbook.id,
fileId
)

// Step 2.4: Poll for database availability
await tick(60, 'Polling for database availability')
await pollDatabaseStatus(connectionConfig)

// Step 2.5: Retrieve user credentials for the database
await tick(85, 'Retrieving database user credentials')
try {
const user = (await getUser(connectionConfig.database)) as DBUser
connectionConfig.user = user.username
connectionConfig.password = user.password
} catch (e) {
throw e
}

// Step 4: Create a Workbook
// Step 2.6: Create a Workbook
// Get column names for all tables, loop through them and create Sheets for each table
await tick(90, 'Creating workbook')
const sheets = await generateSheets(connectionConfig)

// Sheets need to be added before adding the connectionType to ensure the correct ephemeral driver is used
// Sheets need to be added to the workbook before adding the connectionType to ensure the correct ephemeral
// driver is used
await api.workbooks.update(workbook.id, {
sheets,
})
Expand All @@ -83,7 +105,7 @@ export const foreignDBExtractor = () => {
},
})

// Step 5: Update file with workbookId
// Step 2.7: Update file with workbookId
await tick(95, 'Updating file')
await api.files.update(fileId, {
workbookId: workbook.id,
Expand All @@ -96,9 +118,8 @@ export const foreignDBExtractor = () => {
},
})
} catch (e) {
console.log(`error ${e}`)
await api.jobs.fail(jobId, {
info: `Extraction failed ${e.message}`,
info: e.message,
})
}
}
Expand Down
203 changes: 171 additions & 32 deletions plugins/foreign-db-extractor/src/restore.database.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,188 @@
import sql from 'mssql'
import fetch from 'node-fetch'

/**
* The restoreDatabase function is responsible for initiating the database restore process. It sends a POST request to
* the foreigndb API to restore the database from the given fileId. The databaseName should be the workbookId for the
* Workbook that will be associated to the file being restored.
*
* @param databaseName
* @param fileId
* @returns
*/
export async function restoreDatabase(
databaseName: string,
fileId: string
): Promise<sql.config> {
const databaseRestoreResponse = await fetch(
`${process.env.FLATFILE_API_URL}/v1/database/restore`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${process.env.FLATFILE_API_KEY}`,
},
body: JSON.stringify({
databaseName,
fileId,
}),
): Promise<sql.config | Error> {
try {
const response = await fetch(
`${process.env.AGENT_INTERNAL_URL}/v1/foreigndb/${databaseName}`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${process.env.FLATFILE_BEARER_TOKEN}`,
},
body: JSON.stringify({
fileId,
}),
}
)

const jsonResponse = (await response.json()) as RestoreDatabaseResponse

if (response.status !== 200) {
throw new Error(
`Status: ${response.status} Message: ${jsonResponse.errors[0].message}`
)
}
const data = jsonResponse.data

return {
server: data.host,
database: data.dbname,
options: { port: data.port, trustServerCertificate: true },
connectionTimeout: 30000,
requestTimeout: 90000,
timeout: 15000,
}
)
} catch (e) {
throw new Error(`An error occurred during DB restore: ${e.message}`)
}
}

const jsonResponse =
(await databaseRestoreResponse.json()) as RestoreDatabaseResponse
/**
* The getDatabaseInfo function is responsible for retrieving the status of the database restore process. It sends a GET
* request to the foreigndb API to retrieve the status of the database restore process.
*
* @param databaseName
* @returns
*/
export async function getDatabaseInfo(
databaseName: string
): Promise<Task | Error> {
try {
const response = await fetch(
`${process.env.AGENT_INTERNAL_URL}/v1/foreigndb/${databaseName}`,
{
method: 'GET',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${process.env.FLATFILE_BEARER_TOKEN}`,
},
}
)
const jsonResponse = (await response.json()) as GetDatabaseResponse

if (databaseRestoreResponse.status !== 200) {
throw new Error(jsonResponse.errors[0].message)
if (response.status !== 200) {
throw new Error(
`Status: ${response.status} Message: ${jsonResponse.errors[0].message}`
)
}
return jsonResponse.data.task
} catch (e) {
throw new Error(`An error occurred retrieving DB info: ${e.message}`)
}
}

return {
user: jsonResponse.username,
password: jsonResponse.password,
server: jsonResponse.host,
database: jsonResponse.dbname,
options: { port: 1433, trustServerCertificate: true },
connectionTimeout: 30000,
requestTimeout: 90000,
timeout: 15000,
/**
* The getUser function is responsible for retrieving the user credentials for the database. It sends a GET request to
* the foreigndb API to retrieve the user credentials for the database. This must be called after the database restore
* is complete.
*
* @param databaseName
* @returns
*/
export async function getUser(databaseName: string): Promise<DBUser | Error> {
try {
const userResponse = await fetch(
`${process.env.AGENT_INTERNAL_URL}/v1/foreigndb/${databaseName}/user`,
{
method: 'GET',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${process.env.FLATFILE_BEARER_TOKEN}`,
},
}
)

const jsonResponse = (await userResponse.json()) as AddUserResponse

if (userResponse.status !== 200) {
throw new Error(
`Status: ${userResponse.status} Message: ${jsonResponse.errors[0].message}`
)
}
const data = jsonResponse.data

return { ...data }
} catch (e) {
throw new Error(`An error occurred adding user to database: ${e.message}`)
}
}

type RestoreDatabaseResponse = {
host: string
port: number
dbname: string
/**
* The pollDatabaseStatus function is responsible for polling the database status until it is complete. It sends a GET
* request to the foreigndb API to retrieve the status of the database restore process. It will continue to poll until
* the status is either 'SUCCESS' or 'ERROR'.
*
* @param connectionConfig
* @returns
*/
export async function pollDatabaseStatus(
connectionConfig: sql.config
): Promise<void> {
const maxAttempts = 36 // 3 minutes
const retryDelay = 5_000
let attempts = 0
let status
while (attempts < maxAttempts && !['SUCCESS', 'ERROR'].includes(status)) {
const task = (await getDatabaseInfo(connectionConfig.database)) as Task
status = task.status
await new Promise((resolve) => setTimeout(resolve, retryDelay))
attempts++
}

if (!status || status === 'ERROR') {
throw new Error('Database restore failed')
}
}

export type DBUser = {
username: string
password: string
errors: { message: string }[]
}

type RestoreDatabaseResponse = {
data: {
host: string
port: number
dbname: string
}
errors: [{ key: string; message: string }]
}

type GetDatabaseResponse = {
data: {
task: {
status: string
progress: number
type: string
}
}
errors: any[]
}

type AddUserResponse = {
data: {
username: string
password: string
}
errors: [{ key: string; message: string }]
}

type Task = {
status: string
progress: number
type: string
}
53 changes: 32 additions & 21 deletions plugins/foreign-db-extractor/src/upload.s3.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,40 @@
import fetch from 'node-fetch'

export async function s3Upload(fileId: string) {
const storageUploadResponse = await fetch(
`${process.env.FLATFILE_API_URL}/v1/storage`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${process.env.FLATFILE_API_KEY}`,
},
body: JSON.stringify({
fileId,
}),
}
)
export async function s3Upload(
workbookId: string,
fileId: string
): Promise<boolean | Error> {
try {
const storageUploadResponse = await fetch(
`${process.env.AGENT_INTERNAL_URL}/v1/foreigndb/${workbookId}/storage`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${process.env.FLATFILE_BEARER_TOKEN}`,
},
body: JSON.stringify({
fileId,
}),
}
)

const jsonResponse = (await storageUploadResponse.json()) as S3UploadResponse
if (storageUploadResponse.status !== 200) {
throw new Error(jsonResponse.errors[0].message)
const jsonResponse =
(await storageUploadResponse.json()) as S3UploadResponse
if (storageUploadResponse.status !== 200) {
throw new Error(
`Status: ${storageUploadResponse.status} Message: ${jsonResponse.errors[0].message}`
)
}
return jsonResponse.data.success
} catch (e) {
throw new Error(`An error occurred during S3 upload: ${e.message}`)
}

return jsonResponse.arn
}

type S3UploadResponse = {
arn: string
errors: { message: string }[]
data: {
success: boolean
}
errors: [{ key: string; message: string }]
}

0 comments on commit 9c79bee

Please sign in to comment.