diff --git a/src/adapters/ingresses/apig.rs b/src/adapters/ingresses/apig.rs index 2270ca4..e59db8b 100644 --- a/src/adapters/ingresses/apig.rs +++ b/src/adapters/ingresses/apig.rs @@ -53,7 +53,42 @@ impl AwsApiGateway { }) } - pub async fn upload_lambda(&self, lambda_name: &str) -> Result { + /// Helper function to convert an API Gateway's name to its auto-generated AWS ID + pub async fn get_api_id_by_name(&self, api_name: &str) -> Result { + let all_apis = self + .apig_client + .get_rest_apis() + .send() + .await + .into_diagnostic()?; + + let api = all_apis + .items() + .iter() + .find(|api| api.name.clone().unwrap() == api_name) + .ok_or(miette!( + "Could not find an API Gateway with the name: {}", + api_name + ))?; + + Ok(api.clone()) + } +} + +/// Given a path to a file, load it as an array of bytes. +async fn read_file(artifact_path: PathBuf) -> Result> { + let mut bytes = Vec::new(); + // Load the lambda from file. + let mut artifact = File::open(artifact_path).await.into_diagnostic()?; + artifact.read_to_end(&mut bytes).await.into_diagnostic()?; + Ok(bytes) +} + +#[async_trait] +impl Ingress for AwsApiGateway { + async fn deploy(&mut self) -> Result<()> { + // First, we need to deploy the new version of the lambda + // Parse the bytes into the format AWS wants let code = Blob::from(self.lambda_artifact.clone()); @@ -67,35 +102,29 @@ impl AwsApiGateway { let res = self .lambda_client .update_function_code() - .function_name(lambda_name) + .function_name(&self.lambda_name) .zip_file(zip_file.clone()) .send() .await .into_diagnostic()?; + let lambda_arn = res + .function_arn() + .ok_or(miette!("Couldn't get ARN of deployed lambda"))?; + let version = res .version() .ok_or(miette!("Couldn't get version of deployed lambda"))?; - Ok(version.to_string()) - } + let api = self.get_api_id_by_name(&self.gateway_name).await?; + let api_id = api.id.ok_or(miette!("Couldn't get ID of deployed API"))?; - pub async fn create_apig_deployment( - &self, - api_id: &str, - stage_name: &str, - lambda_name: &str, - lambda_version: &str, - traffic_percentage: WholePercent, - ) -> Result<()> { - // Update the APIG with the new lambda version + // Next, we need to create a new deployment, pointing at our + // new lambda version with canary settings self.apig_client .put_integration() - .rest_api_id(api_id) - .uri(format!( - "arn:aws:lambda:us-east-2:471112630982:function:{}:{}", - lambda_name, lambda_version - )) + .rest_api_id(&api_id) + .uri(format!("{}:{}", lambda_arn, version)) .send() .await .into_diagnostic()?; @@ -103,11 +132,13 @@ impl AwsApiGateway { // Create a deployment with canary settings to deploy our new lambda self.apig_client .create_deployment() - .rest_api_id(api_id) - .stage_name(stage_name) + .rest_api_id(&api_id) + .stage_name(&self.stage_name) .canary_settings( DeploymentCanarySettings::builder() - .percent_traffic(traffic_percentage.into_inner() as f64) + // This is set to 0 explicitly here since the first step of the pipeline + // is to increase traffic + .percent_traffic(0.0) .build(), ) .send() @@ -117,45 +148,20 @@ impl AwsApiGateway { Ok(()) } - pub async fn get_api_id_by_name(&self, api_name: &str) -> Result { - // Given an API Gateway's name, return its auto-generated AWS ID - let all_apis = self - .apig_client - .get_rest_apis() - .send() - .await - .into_diagnostic()?; - - let api = all_apis - .items() - .iter() - .find(|api| api.name.clone().unwrap() == api_name) - .ok_or(miette!( - "Could not find an API Gateway with the name: {}", - api_name - ))?; - - Ok(api.clone()) - } - - pub async fn update_canary_traffic( - &self, - api_name: &str, - stage_name: &str, - traffic_percentage: WholePercent, - ) -> Result<()> { - let api = self.get_api_id_by_name(api_name).await?; + async fn set_canary_traffic(&mut self, percent: WholePercent) -> Result<()> { + let api = self.get_api_id_by_name(&self.gateway_name).await?; + let api_id = api.id.ok_or(miette!("Couldn't get ID of deployed API"))?; let patch_op = PatchOperation::builder() .op(Op::Replace) .path("/canarySettings/percentTraffic") - .value(traffic_percentage.to_string()) + .value(percent.to_string()) .build(); self.apig_client .update_stage() - .rest_api_id(api.id.unwrap_or_default()) - .stage_name(stage_name) + .rest_api_id(api_id) + .stage_name(&self.stage_name) .patch_operations(patch_op) .send() .await @@ -164,8 +170,9 @@ impl AwsApiGateway { Ok(()) } - async fn delete_canary(&self, api_name: &str, stage_name: &str) -> Result<()> { - let api = self.get_api_id_by_name(api_name).await?; + async fn rollback_canary(&mut self) -> Result<()> { + let api = self.get_api_id_by_name(&self.gateway_name).await?; + let api_id = api.id.ok_or(miette!("Couldn't get ID of deployed API"))?; // Updates the stage to delete any canary settings from the API Gateway let patch_op = PatchOperation::builder() @@ -175,8 +182,8 @@ impl AwsApiGateway { self.apig_client .update_stage() - .rest_api_id(api.id.unwrap_or_default()) - .stage_name(stage_name) + .rest_api_id(api_id) + .stage_name(&self.stage_name) .patch_operations(patch_op) .send() .await @@ -185,8 +192,9 @@ impl AwsApiGateway { Ok(()) } - pub async fn promote_apig_canary(&self, api_name: &str, stage_name: &str) -> Result<()> { - let api = self.get_api_id_by_name(api_name).await?; + async fn promote_canary(&mut self) -> Result<()> { + let api = self.get_api_id_by_name(&self.gateway_name).await?; + let api_id = api.id.ok_or(miette!("Couldn't get ID of deployed API"))?; // Overwrite the main deployment's ID with the canary's let replace_deployment_op = PatchOperation::builder() @@ -195,21 +203,20 @@ impl AwsApiGateway { .path("/deploymentId") .build(); - // Reset canary traffic to 0% so we're ready for another release - let reset_traffic_op = PatchOperation::builder() - .op(Op::Replace) - .path("/canarySettings/percentTraffic") - // Note: this must be a string to pass into Value, but it's actually an f64 in AWS - .value("0.0") + // Deletes all canary settings from the API Gateway so we're ready for the next + // canary deployment + let delete_canary_op = PatchOperation::builder() + .op(Op::Remove) + .path("/canarySettings") .build(); // Send request to update stage self.apig_client .update_stage() - .rest_api_id(api.id.unwrap_or_default()) - .stage_name(stage_name) + .rest_api_id(api_id) + .stage_name(&self.stage_name) .patch_operations(replace_deployment_op) - .patch_operations(reset_traffic_op) + .patch_operations(delete_canary_op) .send() .await .into_diagnostic()?; @@ -217,53 +224,3 @@ impl AwsApiGateway { Ok(()) } } - -/// given a path to a file, load it as an array of bytes. -async fn read_file(artifact_path: PathBuf) -> Result> { - let mut bytes = Vec::new(); - // Load the lambda from file. - let mut artifact = File::open(artifact_path).await.into_diagnostic()?; - artifact.read_to_end(&mut bytes).await.into_diagnostic()?; - Ok(bytes) -} - -#[async_trait] -impl Ingress for AwsApiGateway { - async fn deploy(&mut self) -> Result<()> { - // First, we need to deploy the new version of the lambda - let lambda_version = self.upload_lambda(&self.lambda_name).await?; - - // Next, we need to create a new deployment, pointing at our - // new lambda version with canary settings - self.create_apig_deployment( - &self.gateway_name, - &self.stage_name, - &self.lambda_name, - &lambda_version, - WholePercent::try_new(0).into_diagnostic()?, - ) - .await?; - - Ok(()) - } - - async fn set_canary_traffic(&mut self, percent: WholePercent) -> Result<()> { - self.update_canary_traffic(&self.gateway_name, &self.stage_name, percent) - .await?; - - Ok(()) - } - - async fn rollback_canary(&mut self) -> Result<()> { - self.delete_canary(&self.gateway_name, &self.stage_name) - .await?; - - Ok(()) - } - - async fn promote_canary(&mut self) -> Result<()> { - self.promote_apig_canary("Releases", "prod").await?; - - Ok(()) - } -}