Skip to content

Commit

Permalink
MIDRC-650 Cancel Nextflow jobs on workspace termination (#101)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulineribeyre authored Mar 25, 2024
1 parent f42198a commit 4466544
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 27 deletions.
15 changes: 7 additions & 8 deletions hatchery/gen3licenseusermaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package hatchery
import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -52,31 +51,31 @@ var validateContainerLicenseInfo = func(containerName string, licenseInfo Licens
var ok = true
// print any items that are missing from LicenseInfo
if !licenseInfo.Enabled {
fmt.Printf("Warning: License is not enabled for container %s\n", containerName)
Config.Logger.Printf("Warning: License is not enabled for container %s\n", containerName)
ok = false
}
if licenseInfo.LicenseType == "" {
fmt.Printf("Error in container config. Empty LicenseType for container %s\n", containerName)
Config.Logger.Printf("Error in container config. Empty LicenseType for container %s\n", containerName)
ok = false
}
if licenseInfo.MaxLicenseIds == 0 {
fmt.Printf("Error in container config. Empty or 0 MaxLicenseIds for container %s\n", containerName)
Config.Logger.Printf("Error in container config. Empty or 0 MaxLicenseIds for container %s\n", containerName)
ok = false
}
if licenseInfo.G3autoName == "" {
fmt.Printf("Error in container config. Empty G3autoName for container %s\n", containerName)
Config.Logger.Printf("Error in container config. Empty G3autoName for container %s\n", containerName)
ok = false
}
if licenseInfo.G3autoKey == "" {
fmt.Printf("Error in container config. Empty G3autoKey for container %s\n", containerName)
Config.Logger.Printf("Error in container config. Empty G3autoKey for container %s\n", containerName)
ok = false
}
if licenseInfo.FilePath == "" {
fmt.Printf("Error in container config. Empty FilePath for container %s\n", containerName)
Config.Logger.Printf("Error in container config. Empty FilePath for container %s\n", containerName)
ok = false
}
if licenseInfo.WorkspaceFlavor == "" {
fmt.Printf("Error in container config. Empty WorkspaceFlavor for container %s\n", containerName)
Config.Logger.Printf("Error in container config. Empty WorkspaceFlavor for container %s\n", containerName)
ok = false
}
if ok {
Expand Down
2 changes: 1 addition & 1 deletion hatchery/hatchery.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func terminate(w http.ResponseWriter, r *http.Request) {
// delete nextflow resources. There is no way to know if the actual workspace being
// terminated is a nextflow workspace or not, so always attempt to delete
Config.Logger.Printf("Info: Deleting Nextflow resources in AWS...")
err := cleanUpNextflowResources(userName)
err := cleanUpNextflowResources(userName, nil, nil, nil, nil)
if err != nil {
Config.Logger.Printf("Unable to delete AWS resources for Nextflow... continuing anyway")
}
Expand Down
5 changes: 3 additions & 2 deletions hatchery/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,9 @@ func stringArrayContains(s []string, e string) bool {
}

func getLatestImageBuilderAmi(imageBuilderReaderRoleArn string, imagePipelineArn string, imagebuilderListImagePipelineImages func(*imagebuilder.ListImagePipelineImagesInput) (*imagebuilder.ListImagePipelineImagesOutput, error)) (string, error) {
// the `imagebuilderListImagePipelineImages` parameter should not be provided in production. It allows
// us to test this function by mocking `imagebuilder.ListImagePipelineImages` in the tests.
/* The `imagebuilderListImagePipelineImages` parameter should not be provided in production. It allows
us to test this function by mocking the AWS SDK in the tests.
*/
if imagebuilderListImagePipelineImages == nil {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String("us-east-1"),
Expand Down
129 changes: 113 additions & 16 deletions hatchery/nextflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func createNextflowResources(userName string, nextflowGlobalConfig NextflowGloba
"Name": &tag,
}
tags := []*iam.Tag{
&iam.Tag{
{
Key: aws.String("Name"),
Value: &tag,
},
Expand Down Expand Up @@ -359,7 +359,7 @@ func createNextflowResources(userName string, nextflowGlobalConfig NextflowGloba

// delete any existing access keys to avoid `LimitExceeded: Cannot exceed
// quota for AccessKeysPerUser: 2` error
err = deleteUserAccessKeys(nextflowUserName, iamSvc)
err = deleteUserAccessKeys(nextflowUserName, iamSvc.ListAccessKeys, iamSvc.DeleteAccessKey)
if err != nil {
Config.Logger.Printf("Unable to delete access keys for user '%s': %v", nextflowUserName, err)
return "", "", err
Expand Down Expand Up @@ -400,7 +400,7 @@ func createNextflowResources(userName string, nextflowGlobalConfig NextflowGloba
return keyId, keySecret, nil
}

func getNextflowAwsSettings(sess *session.Session, payModel *PayModel, userName string, action string) (string, aws.Config, error) {
var getNextflowAwsSettings = func(sess *session.Session, payModel *PayModel, userName string, action string) (string, aws.Config, error) {
// credentials and AWS services init
var awsConfig aws.Config
var awsAccountId string
Expand Down Expand Up @@ -530,7 +530,6 @@ func setupVpcAndSquid(ec2Svc *ec2.EC2, userName string, hostname string) (*strin
// Function to make sure launch template is created, and configured correctly
// We need a launch template since we need a user data script to authenticate with private ECR repositories
func ensureLaunchTemplate(ec2Svc *ec2.EC2, userName string, hostname string, jobImageWhitelist []string) (*string, error) {

// user data script to authenticate with private ECR repositories
userData := generateEcrLoginUserData(jobImageWhitelist, userName)

Expand Down Expand Up @@ -1354,14 +1353,11 @@ func getLatestAmazonLinuxAmi(ec2svc *ec2.EC2) (*string, error) {
latestTimeStamp := time.Unix(0, 0).UTC()

for _, image := range ami.Images {

creationTimeStamp, _ := time.Parse(time.RFC3339, *image.CreationDate)

if creationTimeStamp.After(latestTimeStamp) {
latestTimeStamp = creationTimeStamp
latestImage = image
}

}

Config.Logger.Printf("Info: Found latest amazonlinux AMI: '%s'", *latestImage.ImageId)
Expand All @@ -1370,9 +1366,14 @@ func getLatestAmazonLinuxAmi(ec2svc *ec2.EC2) (*string, error) {
return nil, errors.New("no amazonlinux AMI found")
}

func getNextflowInstanceAmi(imageBuilderReaderRoleArn string, nextflowConfig NextflowConfig, imagebuilderListImagePipelineImages func(*imagebuilder.ListImagePipelineImagesInput) (*imagebuilder.ListImagePipelineImagesOutput, error)) (string, error) {
// the `imagebuilderListImagePipelineImages` parameter should not be provided in production. It allows
// us to test this function by mocking `imagebuilder.ListImagePipelineImages` in the tests.
func getNextflowInstanceAmi(
imageBuilderReaderRoleArn string,
nextflowConfig NextflowConfig,
imagebuilderListImagePipelineImages func(*imagebuilder.ListImagePipelineImagesInput) (*imagebuilder.ListImagePipelineImagesOutput, error),
) (string, error) {
/* The `imagebuilderListImagePipelineImages` parameter should not be provided in production. It allows
us to test this function by mocking the AWS SDK in the tests.
*/
var err error
ami := nextflowConfig.InstanceAmi
if ami != "" {
Expand All @@ -1388,8 +1389,72 @@ func getNextflowInstanceAmi(imageBuilderReaderRoleArn string, nextflowConfig Nex
return ami, err
}

// cancelBatchJobsInStatus terminates every job of the Batch job queue `batchJobQueueName`
// that is currently in the given `status`.
//
// `batchSvcListJobs` and `batchSvcTerminateJob` are the AWS Batch `ListJobs` and
// `TerminateJob` calls. They are passed in as parameters so that the tests can mock the
// AWS SDK.
//
// An error while listing jobs is returned immediately. An error while terminating a job is
// logged but does not prevent the remaining jobs from being terminated, so that a single
// failure does not leave the other jobs running; the first such error is returned once all
// the jobs have been attempted. Returns nil if all the listed jobs were terminated.
func cancelBatchJobsInStatus(
	batchJobQueueName string,
	status string,
	batchSvcListJobs func(*batch.ListJobsInput) (*batch.ListJobsOutput, error),
	batchSvcTerminateJob func(*batch.TerminateJobInput) (*batch.TerminateJobOutput, error),
) error {
	// list the jobs in this status. If the result is paginated, keep requesting pages
	// until no `NextToken` is returned (a nil `NextToken` requests the first page)
	var jobs []*batch.JobSummary
	var nextToken *string
	for {
		listJobsOutput, err := batchSvcListJobs(&batch.ListJobsInput{
			JobQueue:  &batchJobQueueName,
			JobStatus: aws.String(status),
			NextToken: nextToken,
		})
		if err != nil {
			Config.Logger.Printf("Error listing %s jobs in Batch queue '%s': %v", status, batchJobQueueName, err)
			return err
		}
		if listJobsOutput == nil {
			break
		}
		jobs = append(jobs, listJobsOutput.JobSummaryList...)
		nextToken = listJobsOutput.NextToken
		if nextToken == nil {
			break
		}
	}
	if len(jobs) == 0 {
		Config.Logger.Printf("Debug: No %s jobs to cancel", status)
	}

	// `TerminateJob` cancels jobs in SUBMITTED, PENDING or RUNNABLE state and terminates jobs
	// in STARTING or RUNNING state
	var firstErr error
	for _, job := range jobs {
		if job == nil {
			continue
		}
		// `aws.StringValue` returns "" for a nil pointer instead of panicking
		jobID := aws.StringValue(job.JobId)
		Config.Logger.Printf("Canceling %s job: ID '%s', name '%s'", status, jobID, aws.StringValue(job.JobName))
		_, err := batchSvcTerminateJob(&batch.TerminateJobInput{
			JobId:  job.JobId,
			Reason: aws.String("User's workspace was terminated"),
		})
		if err != nil {
			// keep going: the other jobs should still be terminated
			Config.Logger.Printf("Error terminating job '%s': %v", jobID, err)
			if firstErr == nil {
				firstErr = err
			}
		}
	}

	return firstErr
}

// delete the AWS resources created to launch nextflow workflows
func cleanUpNextflowResources(userName string) error {
func cleanUpNextflowResources(
userName string,
iamSvcListAccessKeys func(*iam.ListAccessKeysInput) (*iam.ListAccessKeysOutput, error),
iamSvcDeleteAccessKey func(*iam.DeleteAccessKeyInput) (*iam.DeleteAccessKeyOutput, error),
batchSvcListJobs func(*batch.ListJobsInput) (*batch.ListJobsOutput, error),
batchSvcTerminateJob func(*batch.TerminateJobInput) (*batch.TerminateJobOutput, error),
) error {
/* Clean up Nextflow resources when a workspace is terminated:
- delete IAM user access keys
- stop Squid instance
- (S3 bucket cleanup is disabled)
- cancel any running Batch jobs
The `iamSvcListAccessKeys`, `iamSvcDeleteAccessKey`, `batchSvcListJobs` and `batchSvcTerminateJob`
parameters should not be provided in production. They allow us to test this function by mocking
the AWS SDK in the tests.
*/
payModel, err := getCurrentPayModel(userName)
if err != nil {
return err
Expand All @@ -1406,13 +1471,20 @@ func cleanUpNextflowResources(userName string) error {
Config.Logger.Printf("Debug: AWS account ID: '%v'", awsAccountId)
iamSvc := iam.New(sess, &awsConfig)
ec2Svc := ec2.New(sess, &awsConfig)
batchSvc := batch.New(sess, &awsConfig)

userName = escapism(userName)
hostname := strings.ReplaceAll(os.Getenv("GEN3_ENDPOINT"), ".", "-")

// delete the user's access keys
nextflowUserName := fmt.Sprintf("%s-nf-%s", hostname, userName)
err = deleteUserAccessKeys(nextflowUserName, iamSvc)
if iamSvcListAccessKeys == nil {
iamSvcListAccessKeys = iamSvc.ListAccessKeys
}
if iamSvcDeleteAccessKey == nil {
iamSvcDeleteAccessKey = iamSvc.DeleteAccessKey
}
err = deleteUserAccessKeys(nextflowUserName, iamSvcListAccessKeys, iamSvcDeleteAccessKey)
if err != nil {
Config.Logger.Printf("Unable to delete access keys for user '%s': %v", nextflowUserName, err)
return err
Expand Down Expand Up @@ -1444,11 +1516,36 @@ func cleanUpNextflowResources(userName string) error {
// Config.Logger.Printf("Debug: Deleted objects in bucket '%s' at '%s'", bucketName, objectsKey)
// }

// cancel any Batch jobs that are still running (or about to run) for this user
batchJobQueueName := fmt.Sprintf("%s-nf-job-queue-%s", hostname, userName)
if batchSvcListJobs == nil {
batchSvcListJobs = batchSvc.ListJobs
}
if batchSvcTerminateJob == nil {
batchSvcTerminateJob = batchSvc.TerminateJob
}
// First cancel jobs that are already incurring cost ("running" status). Then cancel in the order a job goes
// through statuses (first submitted, then pending, etc) to ensure all jobs are deleted. Finally cancel any
// jobs that reached the "running" status in the meantime. Ignore jobs in "succeeded" or "failed" status.
statusToCancel := []string{batch.JobStatusRunning, batch.JobStatusSubmitted, batch.JobStatusPending, batch.JobStatusRunnable, batch.JobStatusStarting, batch.JobStatusRunning}
Config.Logger.Printf("Canceling user's jobs in Batch queue '%s'...", batchJobQueueName)
for _, status := range statusToCancel {
err = cancelBatchJobsInStatus(batchJobQueueName, status, batchSvcListJobs, batchSvcTerminateJob)
if err != nil {
Config.Logger.Printf("Error canceling user's Batch jobs: %v", err)
return err
}
}

return nil
}

func deleteUserAccessKeys(nextflowUserName string, iamSvc *iam.IAM) error {
listAccessKeysResult, err := iamSvc.ListAccessKeys(&iam.ListAccessKeysInput{
func deleteUserAccessKeys(
nextflowUserName string,
iamSvcListAccessKeys func(*iam.ListAccessKeysInput) (*iam.ListAccessKeysOutput, error),
iamSvcDeleteAccessKey func(*iam.DeleteAccessKeyInput) (*iam.DeleteAccessKeyOutput, error),
) error {
listAccessKeysResult, err := iamSvcListAccessKeys(&iam.ListAccessKeysInput{
UserName: &nextflowUserName,
})
if err != nil {
Expand All @@ -1457,7 +1554,7 @@ func deleteUserAccessKeys(nextflowUserName string, iamSvc *iam.IAM) error {
}
for _, key := range listAccessKeysResult.AccessKeyMetadata {
Config.Logger.Printf("Deleting access key '%s' for user '%s'", *key.AccessKeyId, nextflowUserName)
_, err := iamSvc.DeleteAccessKey(&iam.DeleteAccessKeyInput{
_, err := iamSvcDeleteAccessKey(&iam.DeleteAccessKeyInput{
UserName: &nextflowUserName,
AccessKeyId: key.AccessKeyId,
})
Expand All @@ -1469,7 +1566,7 @@ func deleteUserAccessKeys(nextflowUserName string, iamSvc *iam.IAM) error {
return nil
}

func stopSquidInstance(hostname string, userName string, ec2svc *ec2.EC2) error {
var stopSquidInstance = func(hostname string, userName string, ec2svc *ec2.EC2) error {
// check if instance already exists, if it does stop it and return
exinstance, err := ec2svc.DescribeInstances(&ec2.DescribeInstancesInput{
Filters: []*ec2.Filter{
Expand Down
Loading

0 comments on commit 4466544

Please sign in to comment.