Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run promises in order #115

Merged
merged 15 commits into from
Oct 18, 2023
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ driver:
projectNamespace: flowforge
cloudProvider: aws
privateCA: ff-ca-certs
k8sDelay: 1000
k8sRetries: 10
```

- `registry` is the Docker Registry to load Stack Containers from
Expand All @@ -26,6 +28,8 @@ should run on
- `cloudProvider` can be left unset for none `aws` deployments. This triggers the adding of
AWS EKS specific annotation for ALB Ingress.
- `privateCA` name of ConfigMap holding PEM CA Cert Bundle (file name `certs.pem`) Optional
- `k8sRetries` how many times to retry actions against the K8s API
- `k8sDelay` how long to wait (in ms) between retries to the K8s API

Expects to pick up K8s credentials from the environment

Expand Down
253 changes: 179 additions & 74 deletions kubernetes.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const got = require('got')
const k8s = require('@kubernetes/client-node')
const _ = require('lodash')

/**
* Kubernates Container driver
Expand Down Expand Up @@ -372,63 +373,119 @@ const createProject = async (project, options) => {
const localService = await createService(project, options)
const localIngress = await createIngress(project, options)

const promises = []
promises.push(this._k8sAppApi.createNamespacedDeployment(namespace, localDeployment).catch(err => {
this._app.log.error(`[k8s] Project ${project.id} - error creating deployment: ${err.toString()}`)
this._app.log.error(`[k8s] deployment ${JSON.stringify(localDeployment, undefined, 2)}`)
this._app.log.error(err)
// rethrow the error so the wrapper knows this hasn't worked
throw err
}))
/* eslint n/handle-callback-err: "off" */
promises.push(this._k8sApi.createNamespacedService(namespace, localService).catch(err => {
// TODO: This will fail if the service already exists. Which it okay if
// we're restarting a suspended project. As we don't know if we're restarting
// or not, we don't know if this is fatal or not.

// Once we can know if this is a restart or create, then we can decide
// whether to throw this error or not. For now, this will silently
// let it pass
//
if (project.state !== 'suspended') {
this._app.log.error(`[k8s] Project ${project.id} - error creating service: ${err.toString()}`)
}
// throw err
}))

// if (project.changedName) {
// promises.push(this._k8sNetApi.replaceNamespacedIngress(project.safeName,namespace, localIngress)).catch(err => {
// this._app.log.error(`[k8s] Project ${project.id} - error updating ingress: ${err.toString()}`)
// }).then (async () => {
// this._app.log.info(`[k8s] Ingress for project ${project.id} updated`)
// })
// } else {
promises.push(this._k8sNetApi.createNamespacedIngress(namespace, localIngress).catch(err => {
// TODO: This will fail if the service already exists. Which it okay if
// we're restarting a suspended project. As we don't know if we're restarting
// or not, we don't know if this is fatal or not.

// Once we can know if this is a restart or create, then we can decide
// whether to throw this error or not. For now, this will silently
// let it pass
//
if (project.state !== 'suspended') {
this._app.log.error(`[k8s] Project ${project.id} - error creating ingress: ${err.toString()}`)
}
// throw err
}).then(async () => {
this._app.log.info(`[k8s] Ingress creation completed for project ${project.id}`)
}))
// }
try {
await this._k8sAppApi.createNamespacedDeployment(namespace, localDeployment)
} catch (err) {
if (err.statusCode === 409) {
// If deployment exists, perform an upgrade
this._app.log.warn(`[k8s] Deployment for project ${project.id} already exists. Upgrading deployment`)
const result = await this._k8sAppApi.readNamespacedDeployment(project.safeName, namespace)

const existingDeployment = result.body
// Check if the metadata and spec are aligned. They won't be though (at minimal because we regenerate auth)
if (!_.isEqual(existingDeployment.metadata, localDeployment.metadata) || !_.isEqual(existingDeployment.spec, localDeployment.spec)) {
// If not aligned, replace the deployment
await this._k8sAppApi.replaceNamespacedDeployment(project.safeName, namespace, localDeployment)
}
} else {
// Log other errors and rethrow them for additional higher-level handling
this._app.log.error(`[k8s] Unexpected error creating deployment for project ${project.id}.`)
this._app.log.error(`[k8s] deployment ${JSON.stringify(localDeployment, undefined, 2)}`)
this._app.log.error(err)
// rethrow the error so the wrapper knows this hasn't worked
throw err
}
}

await project.updateSetting('k8sType', 'deployment')
await new Promise((resolve, reject) => {
let counter = 0
const pollInterval = setInterval(async () => {
try {
await this._k8sAppApi.readNamespacedDeployment(project.safeName, this._namespace)
clearInterval(pollInterval)
resolve()
} catch (err) {
// hmm
counter++
if (counter > this._k8sRetries) {
clearInterval(pollInterval)
this._app.log.error(`[k8s] Project ${project.id} - timeout waiting for Deployment`)
reject(new Error('Timed out to creating Deployment'))
}
}
}, this._k8sDelay)
})

try {
await this._k8sApi.createNamespacedService(namespace, localService)
} catch (err) {
if (err.statusCode === 409) {
this._app.log.warn(`[k8s] Service for project ${project.id} already exists, proceeding...`)
} else {
if (project.state !== 'suspended') {
this._app.log.error(`[k8s] Project ${project.id} - error creating service: ${err.toString()}`)
throw err
}
}
}

const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
await new Promise((resolve, reject) => {
let counter = 0
const pollInterval = setInterval(async () => {
try {
await this._k8sApi.readNamespacedService(prefix + project.safeName, this._namespace)
clearInterval(pollInterval)
resolve()
} catch (err) {
counter++
if (counter > this._k8sRetries) {
clearInterval(pollInterval)
this._app.log.error(`[k8s] Project ${project.id} - timeout waiting for Service`)
reject(new Error('Timed out to creating Service'))
}
}
}, this._k8sDelay)
})

return Promise.all(promises).then(async () => {
this._app.log.debug(`[k8s] Container ${project.id} started`)
project.state = 'running'
await project.save()
this._projects[project.id].state = 'starting'
try {
await this._k8sNetApi.createNamespacedIngress(namespace, localIngress)
} catch (err) {
if (err.statusCode === 409) {
this._app.log.warn(`[k8s] Ingress for project ${project.id} already exists, proceeding...`)
} else {
if (project.state !== 'suspended') {
this._app.log.error(`[k8s] Project ${project.id} - error creating ingress: ${err.toString()}`)
throw err
}
}
}

await new Promise((resolve, reject) => {
let counter = 0
const pollInterval = setInterval(async () => {
try {
await this._k8sNetApi.readNamespacedIngress(project.safeName, this._namespace)
clearInterval(pollInterval)
resolve()
} catch (err) {
counter++
if (counter > this._k8sRetries) {
clearInterval(pollInterval)
this._app.log.error(`[k8s] Project ${project.id} - timeout waiting for Ingress`)
reject(new Error('Timed out to creating Ingress'))
}
}
}, this._k8sDelay)
})

await project.updateSetting('k8sType', 'deployment')

this._app.log.debug(`[k8s] Container ${project.id} started`)
project.state = 'running'
await project.save()

this._projects[project.id].state = 'starting'
}

// eslint-disable-next-line no-unused-vars
Expand Down Expand Up @@ -528,6 +585,8 @@ module.exports = {
this._options = options

this._namespace = this._app.config.driver.options.projectNamespace || 'flowforge'
this._k8sDelay = this._app.config.driver.options.k8sDelay || 1000
this._k8sRetries = this._app.config.driver.options.k8sRetries || 10

const kc = new k8s.KubeConfig()

Expand Down Expand Up @@ -563,7 +622,7 @@ module.exports = {

this._initialCheckTimeout = setTimeout(() => {
this._app.log.debug('[k8s] Restarting projects')
const namespace = options.projectNamespace || 'flowforge'
const namespace = this._namespace
projects.forEach(async (project) => {
try {
if (project.state === 'suspended') {
Expand Down Expand Up @@ -602,7 +661,8 @@ module.exports = {
await this._k8sAppApi.readNamespacedDeployment(project.safeName, namespace)
this._app.log.info(`[k8s] deployment ${project.id} in ${namespace} found`)
} catch (err) {
this._app.log.debug(`[k8s] Project ${project.id} - recreating deployment`)
this._app.log.error(`[k8s] Error while reading namespaced deployment for project '${project.safeName}' ${project.id}. Error msg=${err.message}, stack=${err.stack}`)
this._app.log.info(`[k8s] Project ${project.id} - recreating deployment`)
const fullProject = await this._app.db.models.Project.byId(project.id)
await createProject(fullProject, options)
}
Expand Down Expand Up @@ -678,29 +738,69 @@ module.exports = {
* @param {Project} project - the project model instance
*/
stop: async (project) => {
// Stop the project, but don't remove all of its resources.
// Stop the project
this._projects[project.id].state = 'stopping'

try {
await this._k8sNetApi.deleteNamespacedIngress(project.safeName, this._namespace)
} catch (err) {
this._app.log.error(`[k8s] Project ${project.id} - error deleting ingress: ${err.toString()}`)
}
if (project.safeName.match(/^[0-9]/)) {
try {
await this._k8sApi.deleteNamespacedService('srv-' + project.safeName, this._namespace)
} catch (err) {
this._app.log.error(`[k8s] Project ${project.id} - error deleting service: ${err.toString()}`)
}
} else {
try {
await this._k8sApi.deleteNamespacedService(project.safeName, this._namespace)
} catch (err) {
this._app.log.error(`[k8s] Project ${project.id} - error deleting service: ${err.toString()}`)
}

// Note that, regardless, the main objective is to delete deployment (runnable)
// Even if some k8s resources like ingress or service are still not deleted (maybe because of
// k8s service latency), the most important thing is to get to deployment.
try {
await new Promise((resolve, reject) => {
let counter = 0
const pollInterval = setInterval(async () => {
try {
await this._k8sNetApi.readNamespacedIngress(project.safeName, this._namespace)
} catch (err) {
clearInterval(pollInterval)
resolve()
}
counter++
if (counter > this._k8sRetries) {
clearInterval(pollInterval)
this._app.log.error(`[k8s] Project ${project.id} - timed out deleting ingress`)
reject(new Error('Timed out to deleting Ingress'))
}
}, this._k8sDelay)
})
} catch (err) {
this._app.log.error(`[k8s] Project ${project.id} - Ingress was not deleted: ${err.toString()}`)
}

const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
try {
await this._k8sApi.deleteNamespacedService(prefix + project.safeName, this._namespace)
} catch (err) {
this._app.log.error(`[k8s] Project ${project.id} - error deleting service: ${err.toString()}`)
}

try {
await new Promise((resolve, reject) => {
let counter = 0
const pollInterval = setInterval(async () => {
try {
await this._k8sApi.readNamespacedService(prefix + project.safeName, this._namespace)
} catch (err) {
clearInterval(pollInterval)
resolve()
}
counter++
if (counter > this._k8sRetries) {
clearInterval(pollInterval)
this._app.log.error(`[k8s] Project ${project.id} - timed deleting service`)
reject(new Error('Timed out to deleting Service'))
}
}, this._k8sDelay)
})
} catch (err) {
this._app.log.error(`[k8s] Project ${project.id} - Service was not deleted: ${err.toString()}`)
}

// For now, we just want to remove the Pod/Deployment
const currentType = await project.getSetting('k8sType')
let pod = true
if (currentType === 'deployment') {
Expand All @@ -711,19 +811,26 @@ module.exports = {
}

this._projects[project.id].state = 'suspended'
return new Promise(resolve => {
return new Promise((resolve, reject) => {
let counter = 0
const pollInterval = setInterval(async () => {
try {
if (pod) {
await this._k8sApi.readNamespacedPodStatus(project.safeName, this._namespace)
} else {
await this._k8sAppApi.readNamespacedDeployment(project.safeName, this._namespace)
}
counter++
if (counter > this._k8sRetries) {
clearInterval(pollInterval)
this._app.log.error(`[k8s] Project ${project.id} - timed deleting ${pod ? 'Pod' : 'Deployment'}`)
reject(new Error('Timed out to deleting Deployment'))
}
} catch (err) {
clearInterval(pollInterval)
resolve()
}
}, 1000)
}, this._k8sDelay)
})
},

Expand All @@ -733,8 +840,6 @@ module.exports = {
* @return {Object}
*/
remove: async (project) => {
// let project = await this._app.db.models.Project.byId(id)

try {
await this._k8sNetApi.deleteNamespacedIngress(project.safeName, this._namespace)
} catch (err) {
Expand Down
13 changes: 12 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
"license": "Apache-2.0",
"dependencies": {
"@kubernetes/client-node": "^0.18.1",
"got": "^11.8.0"
"got": "^11.8.0",
"lodash": "^4.17.21"
},
"devDependencies": {
"eslint": "^8.25.0",
Expand Down