diff --git a/momento-cli-opts/src/lib.rs b/momento-cli-opts/src/lib.rs
index 7b63c15..8c42b3a 100644
--- a/momento-cli-opts/src/lib.rs
+++ b/momento-cli-opts/src/lib.rs
@@ -229,6 +229,11 @@ to help find opportunities for optimizations with Momento.
             help = "Opt in to check whether ddb tables have ttl enabled. If there are lots of tables, could slow down data collection"
         )]
         enable_ddb_ttl_check: bool,
+        #[arg(
+            long = "enable-gsi",
+            help = "Opt in to check metrics on dynamodb gsi's. If there are lots of tables with gsi's, could slow down data collection"
+        )]
+        enable_gsi: bool,
         #[arg(
            value_enum,
            long = "resource",
@@ -238,7 +243,7 @@ to help find opportunities for optimizations with Momento.
         #[arg(
             long = "metric-collection-rate",
             help = "tps at which to invoke the aws `get-metric-data` api",
-            default_value = "20"
+            default_value = "10"
         )]
         metric_collection_rate: u32,
     },
diff --git a/momento/src/commands/cloud_linter/api_gateway.rs b/momento/src/commands/cloud_linter/api_gateway.rs
index 0bcdecd..4931581 100644
--- a/momento/src/commands/cloud_linter/api_gateway.rs
+++ b/momento/src/commands/cloud_linter/api_gateway.rs
@@ -125,8 +125,9 @@ async fn process_apis(
     let mut resources: Vec<Resource> = Vec::with_capacity(apis.len());
     let get_apis_bar =
         ProgressBar::new((apis.len() * 2) as u64).with_message("Processing API Gateway resources");
-    get_apis_bar
-        .set_style(ProgressStyle::with_template(" {msg} {bar} {eta}").expect("invalid template"));
+    get_apis_bar.set_style(
+        ProgressStyle::with_template(" {pos:>7}/{len:7} {msg}").expect("invalid template"),
+    );
     for api in apis {
         let the_api = apig_client
             .get_rest_api()
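
The progress bars in api_gateway.rs, dynamodb.rs, and s3.rs all switch from a "{msg} {bar} {eta}" template to a "{pos:>7}/{len:7} {msg}" counter, so the CLI reports how many resources have been processed rather than an estimated completion time. A minimal standalone sketch of the new style, assuming an indicatif 0.17-style API (the message text is just an example):

    use indicatif::{ProgressBar, ProgressStyle};

    fn main() {
        let total = 120;
        let bar = ProgressBar::new(total).with_message("Processing API Gateway resources");
        // Renders lines like "      3/120     Processing API Gateway resources".
        bar.set_style(
            ProgressStyle::with_template(" {pos:>7}/{len:7} {msg}").expect("invalid template"),
        );
        for _ in 0..total {
            bar.inc(1);
        }
        bar.finish();
    }
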
{ "Sum" => &[ "ConsumedReadCapacityUnits", @@ -132,12 +150,28 @@ pub(crate) struct GsiMetadata { impl ResourceWithMetrics for DynamoDbResource { fn create_metric_targets(&self) -> Result, CliError> { match self.resource_type { - ResourceType::DynamoDbTable => Ok(vec![MetricTarget { - namespace: "AWS/DynamoDB".to_string(), - expression: "".to_string(), - dimensions: HashMap::from([("TableName".to_string(), self.id.clone())]), - targets: DDB_TABLE_METRICS, - }]), + ResourceType::DynamoDbTable => { + if self + .metadata + .billing_mode + .clone() + .unwrap_or_default() + .eq("PAY_PER_REQUEST") + { + return Ok(vec![MetricTarget { + namespace: "AWS/DynamoDB".to_string(), + expression: "".to_string(), + dimensions: HashMap::from([("TableName".to_string(), self.id.clone())]), + targets: DDB_TABLE_PAY_PER_USE_METRICS, + }]); + } + Ok(vec![MetricTarget { + namespace: "AWS/DynamoDB".to_string(), + expression: "".to_string(), + dimensions: HashMap::from([("TableName".to_string(), self.id.clone())]), + targets: DDB_TABLE_PROVISIONED_METRICS, + }]) + } ResourceType::DynamoDbGsi => { let gsi_name = self .metadata @@ -179,6 +213,7 @@ pub(crate) async fn process_ddb_resources( describe_ttl_limiter: Arc, sender: Sender, enable_ddb_ttl_check: bool, + enable_gsi: bool, ) -> Result<(), CliError> { let ddb_client = aws_sdk_dynamodb::Client::new(config); let metrics_client = aws_sdk_cloudwatch::Client::new(config); @@ -190,47 +225,56 @@ pub(crate) async fn process_ddb_resources( let describe_ddb_tables_bar = ProgressBar::new(table_names.len() as u64).with_message("Processing Dynamo DB tables"); - describe_ddb_tables_bar - .set_style(ProgressStyle::with_template(" {msg} {bar} {eta}").expect("invalid template")); - - let futures = FuturesUnordered::new(); - - for table_name in table_names { - let sender_clone = sender.clone(); - let ddb_client_clone = ddb_client.clone(); - let metrics_client_clone = metrics_client.clone(); - let table_name_clone = table_name.clone(); - let control_plane_limiter_clone = control_plane_limiter.clone(); - let metrics_limiter_clone = metrics_limiter.clone(); - let describe_ttl_limiter_clone = describe_ttl_limiter.clone(); - let progress_bar_clone = describe_ddb_tables_bar.clone(); - let spawn = tokio::spawn(async move { - let res = process_table_resources( - &ddb_client_clone, - &metrics_client_clone, - &table_name_clone, - control_plane_limiter_clone, - metrics_limiter_clone, - describe_ttl_limiter_clone, - sender_clone, - enable_ddb_ttl_check, - ) - .await; - progress_bar_clone.inc(1); - res - }); - futures.push(spawn); - } + describe_ddb_tables_bar.set_style( + ProgressStyle::with_template(" {pos:>7}/{len:7} {msg}").expect("invalid template"), + ); - let all_results = futures::future::join_all(futures).await; - for result in all_results { - match result { - // bubble up any cli errors that we came across - Ok(res) => res?, - Err(_) => { - return Err(CliError { - msg: "failed to wait for all dynamo resources to collect data".to_string(), - }) + // we don't want to spawn 1 million processes at the same time. 
+    // we don't want to spawn 1 million processes at the same time. We chunk the tables into chunks of 10, complete 10
+    // at a time, describing tables as well as getting all table metrics, and then move on to the next 10
+    let table_chunks = table_names.chunks(10);
+    let vec_tables: Vec<&[String]> = table_chunks.into_iter().collect();
+
+    for table_batch in vec_tables {
+        let futures = FuturesUnordered::new();
+        for table_name in table_batch {
+            let sender_clone = sender.clone();
+            let ddb_client_clone = ddb_client.clone();
+            let metrics_client_clone = metrics_client.clone();
+            let table_name_clone = table_name.clone();
+            let control_plane_limiter_clone = Arc::clone(&control_plane_limiter);
+            let metrics_limiter_clone = Arc::clone(&metrics_limiter);
+            let describe_ttl_limiter_clone = Arc::clone(&describe_ttl_limiter);
+            let progress_bar_clone = describe_ddb_tables_bar.clone();
+            let spawn = tokio::spawn(async move {
+                let res = process_table_resources(
+                    &ddb_client_clone,
+                    &metrics_client_clone,
+                    &table_name_clone,
+                    control_plane_limiter_clone,
+                    metrics_limiter_clone,
+                    describe_ttl_limiter_clone,
+                    sender_clone,
+                    enable_ddb_ttl_check,
+                    enable_gsi,
+                )
+                .await;
+                progress_bar_clone.inc(1);
+                res
+            });
+            futures.push(spawn);
+        }
+
+        let all_results = futures::future::join_all(futures).await;
+        for result in all_results {
+            match result {
+                // bubble up any cli errors that we came across
+                Ok(res) => res?,
+                Err(_) => {
+                    println!("failed to wait for all dynamodb tables");
+                    return Err(CliError {
+                        msg: "failed to wait for all dynamo resources to collect data".to_string(),
+                    });
+                }
             }
         }
     }
@@ -323,6 +367,7 @@ async fn process_table_resources(
     describe_ttl_limiter: Arc<DefaultDirectRateLimiter>,
     sender: Sender<Resource>,
     enable_ddb_ttl_check: bool,
+    enable_gsi: bool,
 ) -> Result<(), CliError> {
     let region = ddb_client
         .config()
@@ -481,6 +526,16 @@ async fn process_table_resources(
     });
 
     for mut resource in resources {
+        // if we have disabled collecting gsi metrics, then forward the gsi to the sender and continue
+        if resource.resource_type == ResourceType::DynamoDbGsi && !enable_gsi {
+            sender
+                .send(Resource::DynamoDb(resource))
+                .await
+                .map_err(|err| CliError {
+                    msg: format!("Failed to stream dynamodb resource to file: {}", err),
+                })?;
+            continue;
+        }
         resource
             .append_metrics(metrics_client, Arc::clone(&metrics_limiter))
             .await?;
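
The comment above describes the main behavioral change in process_ddb_resources: instead of spawning a task for every table up front, the linter now works through the tables in batches of 10 and waits for each batch to finish before starting the next one. A self-contained sketch of that batching pattern, assuming the tokio and futures crates (process_one is a hypothetical stand-in for process_table_resources):

    use futures::future::join_all;

    // Hypothetical stand-in for process_table_resources: any fallible async job per table.
    async fn process_one(table_name: String) -> Result<(), String> {
        println!("processing {table_name}");
        Ok(())
    }

    // Work through the tables in batches of 10, waiting for each batch to finish
    // (and surfacing its errors) before the next batch is spawned.
    async fn process_in_batches(table_names: Vec<String>) -> Result<(), String> {
        for batch in table_names.chunks(10) {
            let mut handles = Vec::with_capacity(batch.len());
            for table_name in batch {
                handles.push(tokio::spawn(process_one(table_name.clone())));
            }
            for joined in join_all(handles).await {
                // The outer error is a task that failed to complete; the inner one is a per-table failure.
                joined.map_err(|e| format!("task failed to complete: {e}"))??;
            }
        }
        Ok(())
    }

    #[tokio::main]
    async fn main() -> Result<(), String> {
        process_in_batches(vec!["users".to_string(), "orders".to_string()]).await
    }
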
diff --git a/momento/src/commands/cloud_linter/linter_cli.rs b/momento/src/commands/cloud_linter/linter_cli.rs
index 69518c4..a785ef6 100644
--- a/momento/src/commands/cloud_linter/linter_cli.rs
+++ b/momento/src/commands/cloud_linter/linter_cli.rs
@@ -1,8 +1,10 @@
 use std::io::{copy, BufReader};
 use std::path::Path;
 use std::sync::Arc;
+use std::time::Duration;
 
 use crate::commands::cloud_linter::api_gateway::process_api_gateway_resources;
+use aws_config::retry::RetryConfig;
 use aws_config::{BehaviorVersion, Region};
 use flate2::write::GzEncoder;
 use flate2::Compression;
@@ -24,6 +26,7 @@ use super::resource::Resource;
 pub async fn run_cloud_linter(
     region: String,
     enable_ddb_ttl_check: bool,
+    enable_gsi: bool,
     only_collect_for_resource: Option,
     metric_collection_rate: u32,
 ) -> Result<(), CliError> {
@@ -44,6 +47,7 @@ pub async fn run_cloud_linter(
             region,
             tx,
             enable_ddb_ttl_check,
+            enable_gsi,
             only_collect_for_resource,
             metric_collection_rate,
         )
@@ -73,11 +77,17 @@ async fn process_data(
     region: String,
     sender: Sender<Resource>,
     enable_ddb_ttl_check: bool,
+    enable_gsi: bool,
     only_collect_for_resource: Option,
     metric_collection_rate: u32,
 ) -> Result<(), CliError> {
+    let retry_config = RetryConfig::adaptive()
+        .with_initial_backoff(Duration::from_secs(1))
+        .with_max_attempts(20)
+        .with_max_backoff(Duration::from_secs(15));
     let config = aws_config::defaults(BehaviorVersion::latest())
         .region(Region::new(region))
+        .retry_config(retry_config)
         .load()
         .await;
     check_aws_credentials(&config).await?;
@@ -88,7 +98,7 @@ async fn process_data(
     let control_plane_limiter = Arc::new(RateLimiter::direct(control_plane_quota));
 
     let describe_ttl_quota = Quota::per_second(
-        core::num::NonZeroU32::new(3).expect("should create non-zero describe_ttl_quota"),
+        core::num::NonZeroU32::new(1).expect("should create non-zero describe_ttl_quota"),
     );
     let describe_ttl_limiter = Arc::new(RateLimiter::direct(describe_ttl_quota));
 
@@ -127,6 +137,7 @@ async fn process_data(
             Arc::clone(&describe_ttl_limiter),
             sender.clone(),
             enable_ddb_ttl_check,
+            enable_gsi,
         )
         .await?;
         return Ok(());
@@ -175,6 +186,7 @@ async fn process_data(
         Arc::clone(&describe_ttl_limiter),
         sender.clone(),
         enable_ddb_ttl_check,
+        enable_gsi,
     )
     .await?;
 
diff --git a/momento/src/commands/cloud_linter/metrics.rs b/momento/src/commands/cloud_linter/metrics.rs
index e44771b..006afc1 100644
--- a/momento/src/commands/cloud_linter/metrics.rs
+++ b/momento/src/commands/cloud_linter/metrics.rs
@@ -45,6 +45,29 @@ pub(crate) trait AppendMetrics {
     ) -> Result<(), CliError>;
 }
 
+// impl<T> AppendMetrics for T
+// where
+//     T: ResourceWithMetrics,
+// {
+//     async fn append_metrics(
+//         &mut self,
+//         metrics_client: &Client,
+//         limiter: Arc<DefaultDirectRateLimiter>,
+//     ) -> Result<(), CliError> {
+//         let metric_targets = self.create_metric_targets()?;
+//         let mut metrics: Vec<Vec<Metric>> = Vec::new();
+//         for target in metric_targets {
+//             metrics.push(
+//                 query_metrics_for_target(metrics_client, Arc::clone(&limiter), target).await?,
+//             );
+//         }
+//         self.set_metrics(metrics.into_iter().flatten().collect());
+//         self.set_metric_period_seconds(60 * 60 * 24);
+
+//         Ok(())
+//     }
+// }
+
 impl<T> AppendMetrics for T
 where
     T: ResourceWithMetrics,
 {
@@ -153,7 +176,15 @@ async fn query_metrics_for_target(
         .send();
 
     while let Some(result) = rate_limit(Arc::clone(&limiter), || metric_stream.next()).await {
-        let result = result?;
+        let result = match result {
+            Ok(res) => res,
+            Err(e) => {
+                println!("get_metric_data_error: {:?}", e);
+                return Err(CliError {
+                    msg: "error from aws api while querying metrics".to_string(),
+                });
+            }
+        };
         if let Some(mdr_vec) = result.metric_data_results {
             for mdr in mdr_vec {
                 let name = mdr.id.ok_or_else(|| CliError {
diff --git a/momento/src/commands/cloud_linter/s3.rs b/momento/src/commands/cloud_linter/s3.rs
index 0499626..296e993 100644
--- a/momento/src/commands/cloud_linter/s3.rs
+++ b/momento/src/commands/cloud_linter/s3.rs
@@ -244,8 +244,10 @@ async fn process_buckets(
 ) -> Result<(), CliError> {
     let process_buckets_bar =
         ProgressBar::new((buckets.len() * 2) as u64).with_message("Processing S3 Buckets");
-    process_buckets_bar
-        .set_style(ProgressStyle::with_template(" {msg} {bar} {eta}").expect("invalid template"));
+    process_buckets_bar.set_style(
+        ProgressStyle::with_template(" {pos:>7}/{len:7} {msg}").expect("invalid template"),
+    );
+
     let futures = FuturesUnordered::new();
     for bucket in buckets {
         let s3_client_clone = s3client.clone();
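
linter_cli.rs now wires an adaptive retry policy into the shared AWS config and tightens the DescribeTimeToLive quota from 3 requests per second to 1. A minimal sketch of the governor rate-limiter pattern those quotas feed, assuming the governor and tokio crates (check_ttl is a hypothetical stand-in for the real DescribeTimeToLive call):

    use std::num::NonZeroU32;
    use std::sync::Arc;

    use governor::{Quota, RateLimiter};

    // Hypothetical stand-in for the real aws_sdk_dynamodb describe_time_to_live call.
    async fn check_ttl(table: &str) {
        println!("describing ttl for {table}");
    }

    #[tokio::main]
    async fn main() {
        // One permit per second, mirroring the new describe_ttl_quota.
        let quota = Quota::per_second(NonZeroU32::new(1).expect("non-zero quota"));
        let limiter = Arc::new(RateLimiter::direct(quota));

        for table in ["users", "orders", "sessions"] {
            // Each iteration waits for the limiter before touching the AWS API.
            limiter.until_ready().await;
            check_ttl(table).await;
        }
    }
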
diff --git a/momento/src/main.rs b/momento/src/main.rs
index 8b84b57..3b20cea 100644
--- a/momento/src/main.rs
+++ b/momento/src/main.rs
@@ -268,10 +268,12 @@ async fn run_momento_command(args: momento_cli_opts::Momento) -> Result<(), CliE
             enable_ddb_ttl_check,
             resource,
             metric_collection_rate,
+            enable_gsi,
         } => {
             commands::cloud_linter::linter_cli::run_cloud_linter(
                 region,
                 enable_ddb_ttl_check,
+                enable_gsi,
                 resource,
                 metric_collection_rate,
             )
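
Beyond the flag plumbing shown in main.rs, the dynamodb.rs changes above make the table metric set depend on billing mode: on-demand (PAY_PER_REQUEST) tables use the DDB_TABLE_PAY_PER_USE_METRICS map, while everything else falls back to DDB_TABLE_PROVISIONED_METRICS. A simplified sketch of that selection, using hypothetical stand-ins for the crate's MetricTarget type and metric lists:

    use std::collections::HashMap;

    // Illustrative metric lists; the real ones are the DDB_TABLE_PROVISIONED_METRICS and
    // DDB_TABLE_PAY_PER_USE_METRICS maps above, keyed by statistic.
    const PROVISIONED_METRICS: &[&str] = &["ConsumedReadCapacityUnits", "ProvisionedReadCapacityUnits"];
    const PAY_PER_USE_METRICS: &[&str] = &["ConsumedReadCapacityUnits"];

    // Hypothetical, trimmed-down stand-in for the crate's MetricTarget.
    #[derive(Debug)]
    struct MetricTarget {
        namespace: String,
        dimensions: HashMap<String, String>,
        metric_names: &'static [&'static str],
    }

    // Anything other than "PAY_PER_REQUEST" (including a missing billing mode)
    // falls back to the provisioned set, matching the diff's unwrap_or_default().
    fn table_metric_target(table_name: &str, billing_mode: Option<String>) -> MetricTarget {
        let metric_names = if billing_mode.unwrap_or_default() == "PAY_PER_REQUEST" {
            PAY_PER_USE_METRICS
        } else {
            PROVISIONED_METRICS
        };
        MetricTarget {
            namespace: "AWS/DynamoDB".to_string(),
            dimensions: HashMap::from([("TableName".to_string(), table_name.to_string())]),
            metric_names,
        }
    }

    fn main() {
        println!("{:?}", table_metric_target("users", Some("PAY_PER_REQUEST".to_string())));
        println!("{:?}", table_metric_target("orders", None));
    }
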