Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MULTI-321: Refactor logic to be cleaner and remove unnecessary config data #43

Merged
merged 2 commits into from
Nov 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 72 additions & 115 deletions src/adapters/ingresses/apig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,42 @@ impl AwsApiGateway {
})
}

pub async fn upload_lambda(&self, lambda_name: &str) -> Result<String> {
/// Helper function to convert an API Gateway's name to its auto-generated AWS ID
pub async fn get_api_id_by_name(&self, api_name: &str) -> Result<RestApi> {
let all_apis = self
.apig_client
.get_rest_apis()
.send()
.await
.into_diagnostic()?;

let api = all_apis
.items()
.iter()
.find(|api| api.name().unwrap() == api_name)
.ok_or(miette!(
"Could not find an API Gateway with the name: {}",
api_name
))?;

Ok(api.clone())
}
}

/// Given a path to a file, load it as an array of bytes.
async fn read_file(artifact_path: PathBuf) -> Result<Vec<u8>> {
let mut bytes = Vec::new();
// Load the lambda from file.
let mut artifact = File::open(artifact_path).await.into_diagnostic()?;
artifact.read_to_end(&mut bytes).await.into_diagnostic()?;
Ok(bytes)
}

#[async_trait]
impl Ingress for AwsApiGateway {
async fn deploy(&mut self) -> Result<()> {
// First, we need to deploy the new version of the lambda

// Parse the bytes into the format AWS wants
let code = Blob::from(self.lambda_artifact.clone());

Expand All @@ -67,35 +102,29 @@ impl AwsApiGateway {
let res = self
.lambda_client
.update_function_code()
.function_name(lambda_name)
.function_name(&self.lambda_name)
.zip_file(zip_file.clone())
.send()
.await
.into_diagnostic()?;

let lambda_arn = res
.function_arn()
.ok_or(miette!("Couldn't get ARN of deployed lambda"))?;

let version = res
.version()
.ok_or(miette!("Couldn't get version of deployed lambda"))?;

Ok(version.to_string())
}
let api = self.get_api_id_by_name(&self.gateway_name).await?;
let api_id = api.id().ok_or(miette!("Couldn't get ID of deployed API"))?;

pub async fn create_apig_deployment(
&self,
api_id: &str,
stage_name: &str,
lambda_name: &str,
lambda_version: &str,
traffic_percentage: WholePercent,
) -> Result<()> {
// Update the APIG with the new lambda version
// Next, we need to create a new deployment, pointing at our
// new lambda version with canary settings
self.apig_client
.put_integration()
.rest_api_id(api_id)
.uri(format!(
"arn:aws:lambda:us-east-2:471112630982:function:{}:{}",
lambda_name, lambda_version
))
.uri(format!("{}:{}", lambda_arn, version))
.send()
.await
.into_diagnostic()?;
Expand All @@ -104,10 +133,12 @@ impl AwsApiGateway {
self.apig_client
.create_deployment()
.rest_api_id(api_id)
.stage_name(stage_name)
.stage_name(&self.stage_name)
.canary_settings(
DeploymentCanarySettings::builder()
.percent_traffic(traffic_percentage.into_inner() as f64)
// This is set to 0 explicitly here since the first step of the pipeline
// is to increase traffic
.percent_traffic(0.0)
.build(),
)
.send()
Expand All @@ -117,45 +148,20 @@ impl AwsApiGateway {
Ok(())
}

pub async fn get_api_id_by_name(&self, api_name: &str) -> Result<RestApi> {
// Given an API Gateway's name, return its auto-generated AWS ID
let all_apis = self
.apig_client
.get_rest_apis()
.send()
.await
.into_diagnostic()?;

let api = all_apis
.items()
.iter()
.find(|api| api.name.clone().unwrap() == api_name)
.ok_or(miette!(
"Could not find an API Gateway with the name: {}",
api_name
))?;

Ok(api.clone())
}

pub async fn update_canary_traffic(
&self,
api_name: &str,
stage_name: &str,
traffic_percentage: WholePercent,
) -> Result<()> {
let api = self.get_api_id_by_name(api_name).await?;
async fn set_canary_traffic(&mut self, percent: WholePercent) -> Result<()> {
let api = self.get_api_id_by_name(&self.gateway_name).await?;
let api_id = api.id().ok_or(miette!("Couldn't get ID of deployed API"))?;

let patch_op = PatchOperation::builder()
.op(Op::Replace)
.path("/canarySettings/percentTraffic")
.value(traffic_percentage.to_string())
.value(percent.to_string())
.build();

self.apig_client
.update_stage()
.rest_api_id(api.id.unwrap_or_default())
.stage_name(stage_name)
.rest_api_id(api_id)
.stage_name(&self.stage_name)
.patch_operations(patch_op)
.send()
.await
Expand All @@ -164,8 +170,9 @@ impl AwsApiGateway {
Ok(())
}

async fn delete_canary(&self, api_name: &str, stage_name: &str) -> Result<()> {
let api = self.get_api_id_by_name(api_name).await?;
async fn rollback_canary(&mut self) -> Result<()> {
let api = self.get_api_id_by_name(&self.gateway_name).await?;
let api_id = api.id().ok_or(miette!("Couldn't get ID of deployed API"))?;

// Updates the stage to delete any canary settings from the API Gateway
let patch_op = PatchOperation::builder()
Expand All @@ -175,8 +182,8 @@ impl AwsApiGateway {

self.apig_client
.update_stage()
.rest_api_id(api.id.unwrap_or_default())
.stage_name(stage_name)
.rest_api_id(api_id)
.stage_name(&self.stage_name)
.patch_operations(patch_op)
.send()
.await
Expand All @@ -185,8 +192,9 @@ impl AwsApiGateway {
Ok(())
}

pub async fn promote_apig_canary(&self, api_name: &str, stage_name: &str) -> Result<()> {
let api = self.get_api_id_by_name(api_name).await?;
async fn promote_canary(&mut self) -> Result<()> {
let api = self.get_api_id_by_name(&self.gateway_name).await?;
let api_id = api.id().ok_or(miette!("Couldn't get ID of deployed API"))?;

// Overwrite the main deployment's ID with the canary's
let replace_deployment_op = PatchOperation::builder()
Expand All @@ -195,75 +203,24 @@ impl AwsApiGateway {
.path("/deploymentId")
.build();

// Reset canary traffic to 0% so we're ready for another release
let reset_traffic_op = PatchOperation::builder()
.op(Op::Replace)
.path("/canarySettings/percentTraffic")
// Note: this must be a string to pass into Value, but it's actually an f64 in AWS
.value("0.0")
// Deletes all canary settings from the API Gateway so we're ready for the next
// canary deployment
let delete_canary_op = PatchOperation::builder()
.op(Op::Remove)
.path("/canarySettings")
.build();

// Send request to update stage
self.apig_client
.update_stage()
.rest_api_id(api.id.unwrap_or_default())
.stage_name(stage_name)
.rest_api_id(api_id)
.stage_name(&self.stage_name)
.patch_operations(replace_deployment_op)
.patch_operations(reset_traffic_op)
.patch_operations(delete_canary_op)
.send()
.await
.into_diagnostic()?;

Ok(())
}
}

/// given a path to a file, load it as an array of bytes.
async fn read_file(artifact_path: PathBuf) -> Result<Vec<u8>> {
let mut bytes = Vec::new();
// Load the lambda from file.
let mut artifact = File::open(artifact_path).await.into_diagnostic()?;
artifact.read_to_end(&mut bytes).await.into_diagnostic()?;
Ok(bytes)
}

#[async_trait]
impl Ingress for AwsApiGateway {
async fn deploy(&mut self) -> Result<()> {
// First, we need to deploy the new version of the lambda
let lambda_version = self.upload_lambda(&self.lambda_name).await?;

// Next, we need to create a new deployment, pointing at our
// new lambda version with canary settings
self.create_apig_deployment(
&self.gateway_name,
&self.stage_name,
&self.lambda_name,
&lambda_version,
WholePercent::try_new(0).into_diagnostic()?,
)
.await?;

Ok(())
}

async fn set_canary_traffic(&mut self, percent: WholePercent) -> Result<()> {
self.update_canary_traffic(&self.gateway_name, &self.stage_name, percent)
.await?;

Ok(())
}

async fn rollback_canary(&mut self) -> Result<()> {
self.delete_canary(&self.gateway_name, &self.stage_name)
.await?;

Ok(())
}

async fn promote_canary(&mut self) -> Result<()> {
self.promote_apig_canary("Releases", "prod").await?;

Ok(())
}
}