Skip to content

Commit

Permalink
feat: improving retries; consider restart of app
Browse files Browse the repository at this point in the history
  • Loading branch information
tiago-freire committed Sep 2, 2024
1 parent f320576 commit 45848b2
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 10 deletions.
4 changes: 2 additions & 2 deletions node/clients/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import type {
ImportExecution,
} from 'ssesandbox04.catalog-importer'

import { CURRENT_MD_SCHEMA, MD_ENTITIES } from '../helpers'
import { CURRENT_MD_SCHEMA, MAX_RETRIES, MD_ENTITIES } from '../helpers'
import Cosmos from './Cosmos'
import HttpClient from './HttpClient'
import PrivateClient from './PrivateClient'
Expand Down Expand Up @@ -59,7 +59,7 @@ export default {
exponentialTimeoutCoefficient: 2,
exponentialBackoffCoefficient: 2,
initialBackoffDelay: 100,
retries: 10,
retries: MAX_RETRIES,
timeout: 300000,
concurrency: 1,
},
Expand Down
1 change: 1 addition & 0 deletions node/helpers/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ export const IMPORT_STATUS: { [keyof in ImportStatus]: ImportStatus } = {

export const STEP_DELAY = 1000
export const DEFAULT_BATCH_CONCURRENCY = 500
export const MAX_RETRIES = 10
export const ONE_RESULT = { page: 1, pageSize: 1 }
export const COMMON_WHERE = `(status<>${IMPORT_STATUS.TO_BE_DELETED})AND(status<>${IMPORT_STATUS.DELETING})`
export const GET_DETAILS_CONCURRENCY = 25
Expand Down
33 changes: 30 additions & 3 deletions node/helpers/setupVerifyImports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,39 @@ import {
getFirstImportPending,
getFirstImportRunning,
getFirstImportToBeDeleted,
IMPORT_STATUS,
updateImportStatus,
} from '.'
import runImport from '../events'

let cachedContext: Context | undefined
const TIMEOUT = 10000
let hasImportRun = false

export const getCachedContext = () => {
export function getCachedContext() {
return cachedContext
}

export function setCachedContext(context: Context) {
cachedContext = context
}

export function setHasImportRun() {
hasImportRun = true
}

export function getHasImportRun() {
return hasImportRun
}

const verifyImports = async () => {
const context = getCachedContext()

if (!context || (await getFirstImportRunning(context))) return
if (!context) return

const importRunning = await getFirstImportRunning(context)

if (importRunning && getHasImportRun()) return

const nextImportToBeDeleted = await getFirstImportToBeDeleted(context)

Expand All @@ -30,12 +45,24 @@ const verifyImports = async () => {
return
}

const nextPendingImport = await getFirstImportPending(context)
const nextPendingImport =
importRunning && !getHasImportRun()
? importRunning
: await getFirstImportPending(context)

if (!nextPendingImport) return

if (nextPendingImport.status === IMPORT_STATUS.RUNNING) {
await updateImportStatus(
context,
nextPendingImport.id,
IMPORT_STATUS.PENDING
)
}

context.state.body = nextPendingImport
runImport(context)
setHasImportRun()
}

export const setupVerifyImports = () => {
Expand Down
34 changes: 31 additions & 3 deletions node/helpers/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
DEFAULT_BATCH_CONCURRENCY,
DEFAULT_VBASE_BUCKET,
IMPORT_STATUS,
MAX_RETRIES,
STEP_DELAY,
STEPS,
} from '.'
Expand All @@ -16,9 +17,34 @@ export const getCurrentSettings = async ({ clients: { apps } }: Context) =>
export const delay = (ms: number) =>
new Promise((resolve) => setTimeout(resolve, ms))

const promiseWithConditionalRetry = async <T, R = void>(
fn: (element: T) => Promise<Maybe<R>> | R,
arg: T,
retries = 0
): Promise<Maybe<R>> => {
return Promise.resolve(fn?.(arg))?.catch(async (e) => {
const message = e.message.toLowerCase()
const messageToRetry =
message.includes('500') ||
message.includes('429') ||
message.includes('network error') ||
message.includes('networkerror') ||
message.includes('genericerror') ||
message.includes('unhealthy')

if (messageToRetry && retries < MAX_RETRIES) {
await delay(STEP_DELAY * (retries + 1))

return promiseWithConditionalRetry(fn, arg, retries + 1)
}

throw e
})
}

export const batch = async <T, R = void>(
data: T[],
elementCallback: (element: T) => Maybe<Promise<R>> | R,
elementCallback: (element: T) => Promise<Maybe<R>> | R,
concurrency = DEFAULT_BATCH_CONCURRENCY
) => {
const cloneData = [...data]
Expand All @@ -28,7 +54,9 @@ export const batch = async <T, R = void>(
if (!cloneData.length) return

const result = ((await Promise.all(
cloneData.splice(0, concurrency).map(elementCallback)
cloneData.splice(0, concurrency).map(async (element) => {
return promiseWithConditionalRetry(elementCallback, element)
})
)) as unknown) as R

if (result) {
Expand All @@ -45,7 +73,7 @@ export const batch = async <T, R = void>(

export const sequentialBatch = async <T, R = void>(
data: T[],
elementCallback: (element: T) => Maybe<Promise<R>> | R
elementCallback: (element: T) => Promise<Maybe<R>> | R
) => {
return batch(data, elementCallback, 1)
}
Expand Down
4 changes: 2 additions & 2 deletions react/components/graphql/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type CustomQueryHookOptions<T, V> = QueryHookOptions<T, V> &
type CustomMutationHookOptions<T, V> = MutationHookOptions<T, V> &
CustomGraphQLOptions

const MAX_RETRIES = 5
const MAX_RETRIES = 10
const RETRY_DELAY = 500

export const getGraphQLMessageDescriptor = (error: GraphQLError) => ({
Expand All @@ -57,7 +57,7 @@ const useErrorRetry = <T = Query, V = undefined>(
message.includes('unhealthy')

if (messageToRetry && retries.current < MAX_RETRIES) {
setTimeout(() => refetch(), RETRY_DELAY)
setTimeout(() => refetch(), RETRY_DELAY * (retries.current + 1))
retries.current++
} else {
toastError &&
Expand Down

0 comments on commit 45848b2

Please sign in to comment.